1use super::block_layer::file_manager2::{FileManager2, FileManager2Error};
18use super::format_traits::Parseable;
19use super::index_formats::{
20 BTreeBranchError, BTreeFirstPage, BTreeFirstPageError, BTreeLeafError, BTreeNode,
21 BTreeNodeError,
22};
23use super::page_formats::PageOffset;
24use super::page_formats::{PageId, PageType};
25use super::row_formats::ItemPointer;
26use crate::engine::io::format_traits::Serializable;
27use crate::engine::io::index_formats::BTreeBranch;
28use crate::engine::objects::{Index, SqlTuple};
29use std::num::TryFromIntError;
30use std::sync::Arc;
31use thiserror::Error;
32
33mod find_leaf;
34use find_leaf::find_leaf;
35use find_leaf::FindLeafError;
36
37mod split_leaf;
38use split_leaf::split_leaf;
39use split_leaf::SplitLeafError;
40
/// Entry point for reading and updating B-tree index pages.
///
/// All page access goes through the shared [`FileManager2`] handle; cloning an
/// `IndexManager` clones only the `Arc`, so every clone talks to the same file manager.
#[derive(Clone)]
pub struct IndexManager {
    /// Shared page-level file manager used for every index page read and write.
    file_manager: Arc<FileManager2>,
}
48
49impl IndexManager {
50 pub fn new(file_manager: Arc<FileManager2>) -> IndexManager {
51 IndexManager { file_manager }
52 }
53
54 pub async fn add(
55 &self,
56 index_def: &Index,
57 new_key: SqlTuple,
58 item_ptr: ItemPointer,
59 ) -> Result<(), IndexManagerError> {
60 let page_id = PageId {
61 resource_key: index_def.id,
62 page_type: PageType::Data,
63 };
64
65 let (page_guard, page_offset, mut leaf) =
67 find_leaf(&self.file_manager, index_def, &new_key).await?;
68
69 if leaf.can_fit(&new_key) {
71 leaf.add(new_key, item_ptr)?;
72
73 self.file_manager
74 .update_page(page_guard, leaf.serialize_and_pad())
75 .await?;
76 return Ok(());
77 }
78
79 let left_neighbor = leaf.left_node;
81 let left_page = match left_neighbor {
82 Some(s) => Some(self.file_manager.get_page_for_update(&page_id, &s).await?),
83 None => None,
84 };
85
86 let right_neighbor = leaf.right_node;
87 let right_page = match right_neighbor {
88 Some(s) => Some(self.file_manager.get_page_for_update(&page_id, &s).await?),
89 None => None,
90 };
91
92 let (mut split_key, mut parent_node_offset, new_left_offset, new_right_offset) =
94 split_leaf(&self.file_manager, index_def, leaf, new_key, item_ptr).await?;
95
96 if let Some((mut left_buffer, left_guard)) = left_page {
97 if let BTreeNode::Leaf(mut l) = BTreeNode::parse(&mut left_buffer, index_def)? {
98 l.right_node = Some(new_left_offset);
99
100 self.file_manager
101 .update_page(left_guard, l.serialize_and_pad())
102 .await?;
103 } else {
104 return Err(IndexManagerError::UnexpectedBranch(left_neighbor.unwrap()));
105 }
106 }
107
108 if let Some((mut right_buffer, right_guard)) = right_page {
109 if let BTreeNode::Leaf(mut l) = BTreeNode::parse(&mut right_buffer, index_def)? {
110 l.left_node = Some(new_right_offset);
111 self.file_manager
112 .update_page(right_guard, l.serialize_and_pad())
113 .await?;
114 } else {
115 return Err(IndexManagerError::UnexpectedBranch(right_neighbor.unwrap()));
116 }
117 }
118
119 loop {
121 let (mut parent_page, parent_guard) = self
122 .file_manager
123 .get_page_for_update(&page_id, &parent_node_offset)
124 .await?;
125 if parent_node_offset == PageOffset(0) {
126 let (new_root_offset, new_root_guard) =
128 self.file_manager.get_next_offset(&page_id).await?;
129
130 let new_root =
131 BTreeBranch::new(PageOffset(0), new_left_offset, split_key, new_right_offset);
132
133 self.file_manager
134 .update_page(new_root_guard, new_root.serialize_and_pad())
135 .await?;
136
137 let first_page = BTreeFirstPage {
138 root_offset: new_root_offset,
139 };
140 self.file_manager
141 .update_page(parent_guard, first_page.serialize_and_pad())
142 .await?;
143
144 return Ok(());
145 }
146 if let BTreeNode::Branch(mut b) = BTreeNode::parse(&mut parent_page, index_def)? {
147 if b.can_fit(&split_key) {
148 b.add(new_left_offset, split_key, new_right_offset)?;
149
150 self.file_manager
151 .update_page(parent_guard, b.serialize_and_pad())
152 .await?;
153
154 return Ok(());
155 } else {
156 let (new_right_offset, new_right_guard) =
158 self.file_manager.get_next_offset(&page_id).await?;
159
160 let (middle_key, new_right) =
161 b.add_and_split(new_left_offset, split_key, new_right_offset)?;
162
163 self.file_manager
164 .update_page(new_right_guard, new_right.serialize_and_pad())
165 .await?;
166
167 self.file_manager
168 .update_page(parent_guard, b.serialize_and_pad())
169 .await?;
170
171 parent_node_offset = b.parent_node;
172 split_key = middle_key;
173
174 continue;
175 }
176 } else {
177 return Err(IndexManagerError::UnexpectedLeaf(parent_node_offset));
178 }
179 }
180 }
181
182 pub async fn search_for_key(
183 &self,
184 index_def: &Index,
185 key: &SqlTuple,
186 ) -> Result<Option<Vec<ItemPointer>>, IndexManagerError> {
187 let page_id = PageId {
188 resource_key: index_def.id,
189 page_type: PageType::Data,
190 };
191
192 debug!("index searching for {:?}", key);
193
194 let (mut first_page, _first_guard) =
195 match self.file_manager.get_page(&page_id, &PageOffset(0)).await {
196 Ok(s) => s,
197 Err(FileManager2Error::PageDoesNotExist(_)) => {
198 return Ok(None);
199 }
200 Err(e) => {
201 return Err(IndexManagerError::FileManager2Error(e));
202 }
203 };
204 let first_node = BTreeFirstPage::parse(&mut first_page)?;
205
206 let mut current_offset = first_node.root_offset;
207 loop {
208 let (mut current_page, _current_guard) = self
209 .file_manager
210 .get_page(&page_id, ¤t_offset)
211 .await?;
212 let current_node = BTreeNode::parse(&mut current_page, index_def)?;
213
214 match current_node {
215 BTreeNode::Branch(b) => {
216 current_offset = *b.search(key..key)?;
217 continue;
218 }
219 BTreeNode::Leaf(l) => match l.nodes.get(key) {
220 Some(s) => return Ok(Some(s.clone())),
221 None => {
222 return Ok(None);
223 }
224 },
225 }
226 }
227 }
228}
229
230#[derive(Debug, Error)]
231pub enum IndexManagerError {
232 #[error(transparent)]
233 BTreeBranchError(#[from] BTreeBranchError),
234 #[error(transparent)]
235 BTreeFirstPageError(#[from] BTreeFirstPageError),
236 #[error(transparent)]
237 BTreeLeafError(#[from] BTreeLeafError),
238 #[error(transparent)]
239 BTreeNodeError(#[from] BTreeNodeError),
240 #[error(
241 "Another process made the root index page first, maybe the developer should make locking."
242 )]
243 ConcurrentCreationError(),
244 #[error(transparent)]
245 FindLeafError(#[from] FindLeafError),
246 #[error("Key too large size: {0}, maybe the developer should fix this.")]
247 KeyTooLarge(usize),
248 #[error(transparent)]
249 FileManager2Error(#[from] FileManager2Error),
250 #[error("Node does not exists {0}")]
251 NoSuchNode(PageOffset),
252 #[error("Node {0} empty")]
253 NodeEmpty(PageOffset),
254 #[error("Parent Node empty")]
255 ParentNodeEmpty(),
256 #[error("Root Node Empty")]
257 RootNodeEmpty(),
258 #[error(transparent)]
259 SplitLeafError(#[from] SplitLeafError),
260 #[error("Unable to search, the stack is empty")]
261 StackEmpty(),
262 #[error(transparent)]
263 TryFromIntError(#[from] TryFromIntError),
264 #[error("Unable to split a node of size {0}")]
265 UnableToSplit(usize),
266 #[error("Unexpect Branch at offset {0}")]
267 UnexpectedBranch(PageOffset),
268 #[error("Unexpect Leaf at offset {0}")]
269 UnexpectedLeaf(PageOffset),
270}
271
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::{
        constants::Nullable,
        engine::{
            io::page_formats::UInt12,
            objects::{
                types::{BaseSqlTypes, BaseSqlTypesMapper, SqlTypeDefinition},
                Attribute,
            },
        },
    };

    use super::*;
    use log::LevelFilter;
    use simplelog::{ColorChoice, CombinedLogger, Config, TermLogger, TerminalMode};

    /// Builds a deterministic `(key, pointer)` pair for `num`.
    ///
    /// The key is the tuple `("test", num)` and the pointer targets page offset `num`,
    /// item 0. Callers only pass small values, so the `as u32` narrowing is lossless here.
    fn get_key_and_ptr(num: usize) -> (SqlTuple, ItemPointer) {
        (
            SqlTuple(vec![
                Some(BaseSqlTypes::Text("test".to_string())),
                Some(BaseSqlTypes::Integer(num as u32)),
            ]),
            ItemPointer::new(PageOffset(num), UInt12::new(0).unwrap()),
        )
    }

    /// Inserts 1000 keys into a fresh index and checks the last one can be found again.
    #[tokio::test]
    async fn test_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
        // The logger is process-global: if another test in this binary installed one first,
        // `init` returns an error. That must not fail this test, so the result is ignored.
        let _ = CombinedLogger::init(vec![TermLogger::new(
            LevelFilter::Debug,
            Config::default(),
            TerminalMode::Mixed,
            ColorChoice::Auto,
        )]);

        // Keep `tmp` alive for the whole test so the directory is not removed early.
        let tmp = TempDir::new()?;
        let tmp_dir = tmp.path().as_os_str().to_os_string();

        let fm = Arc::new(FileManager2::new(tmp_dir)?);
        let im = IndexManager::new(fm);

        let index = Index {
            id: Uuid::new_v4(),
            name: "test".to_string(),
            columns: Arc::new(SqlTypeDefinition::new(&[
                Attribute::new(
                    "foo".to_string(),
                    BaseSqlTypesMapper::Text,
                    Nullable::NotNull,
                    None,
                ),
                Attribute::new(
                    "bar".to_string(),
                    BaseSqlTypesMapper::Integer,
                    Nullable::NotNull,
                    None,
                ),
            ])),
            unique: true,
        };

        for i in 0..1000 {
            let (key, ptr) = get_key_and_ptr(i);
            im.add(&index, key, ptr).await?;
        }

        let (key, ptr) = get_key_and_ptr(999);
        assert_eq!(Some(vec![ptr]), im.search_for_key(&index, &key).await?);

        Ok(())
    }
}
349}