feophantlib/engine/io/
index_manager.rs

1//! TODO #24 Fix the index implementation to use the locking layer
2
// After more thought, the approach needs to change to the following:
4/*
5    lock leaf
6
7    split _ cow
8
9    lock left and right leaves
10
11    check up splitting as we go
12
13    write down, unlocking as we go
14
15*/
16
17use super::block_layer::file_manager2::{FileManager2, FileManager2Error};
18use super::format_traits::Parseable;
19use super::index_formats::{
20    BTreeBranchError, BTreeFirstPage, BTreeFirstPageError, BTreeLeafError, BTreeNode,
21    BTreeNodeError,
22};
23use super::page_formats::PageOffset;
24use super::page_formats::{PageId, PageType};
25use super::row_formats::ItemPointer;
26use crate::engine::io::format_traits::Serializable;
27use crate::engine::io::index_formats::BTreeBranch;
28use crate::engine::objects::{Index, SqlTuple};
29use std::num::TryFromIntError;
30use std::sync::Arc;
31use thiserror::Error;
32
33mod find_leaf;
34use find_leaf::find_leaf;
35use find_leaf::FindLeafError;
36
37mod split_leaf;
38use split_leaf::split_leaf;
39use split_leaf::SplitLeafError;
40
41//TODO Support something other than btrees
42//TODO Support searching on a non primary index column
43
/// Entry point for adding keys to, and looking keys up in, btree indexes.
///
/// Cloning is cheap: the only field is an `Arc`, so clones share the same
/// underlying `FileManager2`.
#[derive(Clone)]
pub struct IndexManager {
    /// Shared handle used for every page read, page update and page allocation.
    file_manager: Arc<FileManager2>,
}
48
49impl IndexManager {
50    pub fn new(file_manager: Arc<FileManager2>) -> IndexManager {
51        IndexManager { file_manager }
52    }
53
54    pub async fn add(
55        &self,
56        index_def: &Index,
57        new_key: SqlTuple,
58        item_ptr: ItemPointer,
59    ) -> Result<(), IndexManagerError> {
60        let page_id = PageId {
61            resource_key: index_def.id,
62            page_type: PageType::Data,
63        };
64
65        //Find the target leaf
66        let (page_guard, page_offset, mut leaf) =
67            find_leaf(&self.file_manager, index_def, &new_key).await?;
68
69        //If the key fits in the leaf, we add it and are done
70        if leaf.can_fit(&new_key) {
71            leaf.add(new_key, item_ptr)?;
72
73            self.file_manager
74                .update_page(page_guard, leaf.serialize_and_pad())
75                .await?;
76            return Ok(());
77        }
78
79        //Lock the leafs left and right if they exist
80        let left_neighbor = leaf.left_node;
81        let left_page = match left_neighbor {
82            Some(s) => Some(self.file_manager.get_page_for_update(&page_id, &s).await?),
83            None => None,
84        };
85
86        let right_neighbor = leaf.right_node;
87        let right_page = match right_neighbor {
88            Some(s) => Some(self.file_manager.get_page_for_update(&page_id, &s).await?),
89            None => None,
90        };
91
92        //Doesn't fit so we have to split and work back up to the loop
93        let (mut split_key, mut parent_node_offset, new_left_offset, new_right_offset) =
94            split_leaf(&self.file_manager, index_def, leaf, new_key, item_ptr).await?;
95
96        if let Some((mut left_buffer, left_guard)) = left_page {
97            if let BTreeNode::Leaf(mut l) = BTreeNode::parse(&mut left_buffer, index_def)? {
98                l.right_node = Some(new_left_offset);
99
100                self.file_manager
101                    .update_page(left_guard, l.serialize_and_pad())
102                    .await?;
103            } else {
104                return Err(IndexManagerError::UnexpectedBranch(left_neighbor.unwrap()));
105            }
106        }
107
108        if let Some((mut right_buffer, right_guard)) = right_page {
109            if let BTreeNode::Leaf(mut l) = BTreeNode::parse(&mut right_buffer, index_def)? {
110                l.left_node = Some(new_right_offset);
111                self.file_manager
112                    .update_page(right_guard, l.serialize_and_pad())
113                    .await?;
114            } else {
115                return Err(IndexManagerError::UnexpectedBranch(right_neighbor.unwrap()));
116            }
117        }
118
119        //Now its time to fix the tree
120        loop {
121            let (mut parent_page, parent_guard) = self
122                .file_manager
123                .get_page_for_update(&page_id, &parent_node_offset)
124                .await?;
125            if parent_node_offset == PageOffset(0) {
126                //We've hit the top of the system so we'll have to remake the root page
127                let (new_root_offset, new_root_guard) =
128                    self.file_manager.get_next_offset(&page_id).await?;
129
130                let new_root =
131                    BTreeBranch::new(PageOffset(0), new_left_offset, split_key, new_right_offset);
132
133                self.file_manager
134                    .update_page(new_root_guard, new_root.serialize_and_pad())
135                    .await?;
136
137                let first_page = BTreeFirstPage {
138                    root_offset: new_root_offset,
139                };
140                self.file_manager
141                    .update_page(parent_guard, first_page.serialize_and_pad())
142                    .await?;
143
144                return Ok(());
145            }
146            if let BTreeNode::Branch(mut b) = BTreeNode::parse(&mut parent_page, index_def)? {
147                if b.can_fit(&split_key) {
148                    b.add(new_left_offset, split_key, new_right_offset)?;
149
150                    self.file_manager
151                        .update_page(parent_guard, b.serialize_and_pad())
152                        .await?;
153
154                    return Ok(());
155                } else {
156                    //Need to split the branch and move up a level
157                    let (new_right_offset, new_right_guard) =
158                        self.file_manager.get_next_offset(&page_id).await?;
159
160                    let (middle_key, new_right) =
161                        b.add_and_split(new_left_offset, split_key, new_right_offset)?;
162
163                    self.file_manager
164                        .update_page(new_right_guard, new_right.serialize_and_pad())
165                        .await?;
166
167                    self.file_manager
168                        .update_page(parent_guard, b.serialize_and_pad())
169                        .await?;
170
171                    parent_node_offset = b.parent_node;
172                    split_key = middle_key;
173
174                    continue;
175                }
176            } else {
177                return Err(IndexManagerError::UnexpectedLeaf(parent_node_offset));
178            }
179        }
180    }
181
182    pub async fn search_for_key(
183        &self,
184        index_def: &Index,
185        key: &SqlTuple,
186    ) -> Result<Option<Vec<ItemPointer>>, IndexManagerError> {
187        let page_id = PageId {
188            resource_key: index_def.id,
189            page_type: PageType::Data,
190        };
191
192        debug!("index searching for {:?}", key);
193
194        let (mut first_page, _first_guard) =
195            match self.file_manager.get_page(&page_id, &PageOffset(0)).await {
196                Ok(s) => s,
197                Err(FileManager2Error::PageDoesNotExist(_)) => {
198                    return Ok(None);
199                }
200                Err(e) => {
201                    return Err(IndexManagerError::FileManager2Error(e));
202                }
203            };
204        let first_node = BTreeFirstPage::parse(&mut first_page)?;
205
206        let mut current_offset = first_node.root_offset;
207        loop {
208            let (mut current_page, _current_guard) = self
209                .file_manager
210                .get_page(&page_id, &current_offset)
211                .await?;
212            let current_node = BTreeNode::parse(&mut current_page, index_def)?;
213
214            match current_node {
215                BTreeNode::Branch(b) => {
216                    current_offset = *b.search(key..key)?;
217                    continue;
218                }
219                BTreeNode::Leaf(l) => match l.nodes.get(key) {
220                    Some(s) => return Ok(Some(s.clone())),
221                    None => {
222                        return Ok(None);
223                    }
224                },
225            }
226        }
227    }
228}
229
230#[derive(Debug, Error)]
231pub enum IndexManagerError {
232    #[error(transparent)]
233    BTreeBranchError(#[from] BTreeBranchError),
234    #[error(transparent)]
235    BTreeFirstPageError(#[from] BTreeFirstPageError),
236    #[error(transparent)]
237    BTreeLeafError(#[from] BTreeLeafError),
238    #[error(transparent)]
239    BTreeNodeError(#[from] BTreeNodeError),
240    #[error(
241        "Another process made the root index page first, maybe the developer should make locking."
242    )]
243    ConcurrentCreationError(),
244    #[error(transparent)]
245    FindLeafError(#[from] FindLeafError),
246    #[error("Key too large size: {0}, maybe the developer should fix this.")]
247    KeyTooLarge(usize),
248    #[error(transparent)]
249    FileManager2Error(#[from] FileManager2Error),
250    #[error("Node does not exists {0}")]
251    NoSuchNode(PageOffset),
252    #[error("Node {0} empty")]
253    NodeEmpty(PageOffset),
254    #[error("Parent Node empty")]
255    ParentNodeEmpty(),
256    #[error("Root Node Empty")]
257    RootNodeEmpty(),
258    #[error(transparent)]
259    SplitLeafError(#[from] SplitLeafError),
260    #[error("Unable to search, the stack is empty")]
261    StackEmpty(),
262    #[error(transparent)]
263    TryFromIntError(#[from] TryFromIntError),
264    #[error("Unable to split a node of size {0}")]
265    UnableToSplit(usize),
266    #[error("Unexpect Branch at offset {0}")]
267    UnexpectedBranch(PageOffset),
268    #[error("Unexpect Leaf at offset {0}")]
269    UnexpectedLeaf(PageOffset),
270}
271
#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::{
        constants::Nullable,
        engine::{
            io::page_formats::UInt12,
            objects::{
                types::{BaseSqlTypes, BaseSqlTypesMapper, SqlTypeDefinition},
                Attribute,
            },
        },
    };

    use super::*;
    use log::LevelFilter;
    use simplelog::{ColorChoice, CombinedLogger, Config, TermLogger, TerminalMode};

    /// Builds a two column key `("test", num)` and an item pointer whose page
    /// offset is `num`, so every `num` yields a distinct key/pointer pair.
    fn get_key_and_ptr(num: usize) -> (SqlTuple, ItemPointer) {
        (
            SqlTuple(vec![
                Some(BaseSqlTypes::Text("test".to_string())),
                Some(BaseSqlTypes::Integer(num as u32)),
            ]),
            ItemPointer::new(PageOffset(num), UInt12::new(0).unwrap()),
        )
    }

    /// Inserts 1000 sequential keys and checks the last one can be found again.
    #[tokio::test]
    async fn test_roundtrip() -> Result<(), Box<dyn std::error::Error>> {
        // A global logger can be installed only once per process. Another test in
        // the same binary may already have done so, which must not fail this
        // test, so the result is deliberately ignored.
        let _ = CombinedLogger::init(vec![TermLogger::new(
            LevelFilter::Debug,
            Config::default(),
            TerminalMode::Mixed,
            ColorChoice::Auto,
        )]);

        let tmp = TempDir::new()?;
        let tmp_dir = tmp.path().as_os_str().to_os_string();

        let fm = Arc::new(FileManager2::new(tmp_dir)?);
        let im = IndexManager::new(fm);

        let index = Index {
            id: Uuid::new_v4(),
            name: "test".to_string(),
            columns: Arc::new(SqlTypeDefinition::new(&[
                Attribute::new(
                    "foo".to_string(),
                    BaseSqlTypesMapper::Text,
                    Nullable::NotNull,
                    None,
                ),
                Attribute::new(
                    "bar".to_string(),
                    BaseSqlTypesMapper::Integer,
                    Nullable::NotNull,
                    None,
                ),
            ])),
            unique: true,
        };

        // Enough keys that they do not all go through the "fits in the leaf" path.
        // NOTE(review): presumably this forces at least one leaf split — confirm
        // against the page size.
        for i in 0..1000 {
            let (key, ptr) = get_key_and_ptr(i);
            im.add(&index, key, ptr).await?;
        }

        let (key, ptr) = get_key_and_ptr(999);
        assert_eq!(Some(vec![ptr]), im.search_for_key(&index, &key).await?);

        Ok(())
    }
}
349}