feophantlib/engine/io/block_layer/
free_space_manager.rs

//! This struct provides a lookup service that tells row_manager where (or if) there is free
//! space to add new tuples. It is designed to be extremely space efficient since it only uses
//! 1 bit per page to say the space is available. This means each page here can cover 134MB of
//! free space.
4
5use super::{
6    super::page_formats::{PageId, PageOffset, PageType},
7    file_manager2::{FileManager2, FileManager2Error},
8};
9use crate::constants::PAGE_SIZE;
10use bytes::{Buf, BytesMut};
11use std::sync::Arc;
12use thiserror::Error;
13
14#[derive(Clone)]
15pub struct FreeSpaceManager {
16    file_manager: Arc<FileManager2>,
17}
18
19impl FreeSpaceManager {
20    pub fn new(file_manager: Arc<FileManager2>) -> FreeSpaceManager {
21        FreeSpaceManager { file_manager }
22    }
23
24    pub async fn get_next_free_page(
25        &self,
26        page_id: PageId,
27    ) -> Result<PageOffset, FreeSpaceManagerError> {
28        let mut offset = PageOffset(0);
29        let free_id = PageId {
30            resource_key: page_id.resource_key,
31            page_type: PageType::FreeSpaceMap,
32        };
33        loop {
34            match self.file_manager.get_page(&free_id, &offset).await {
35                Ok((s, _read_guard)) => {
36                    let mut page_frozen = s.clone();
37                    match Self::find_first_free_page_in_page(&mut page_frozen) {
38                        Some(s) => {
39                            let full_offset = PageOffset(s)
40                                + offset * PageOffset(PAGE_SIZE as usize) * PageOffset(8);
41                            return Ok(full_offset);
42                        }
43                        None => {
44                            offset += PageOffset(1);
45                            continue;
46                        }
47                    }
48                }
49                Err(_) => {
50                    // Create the next offset page and loop again as a test.
51                    // Note: due to possible timing issues the next page might not be sequentially
52                    // next so we will check again on the next loop
53
54                    let (_, next_guard) = self.file_manager.get_next_offset(&free_id).await?;
55
56                    let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
57                    let new_page = vec![FreeStat::Free as u8; PAGE_SIZE as usize];
58                    buffer.extend_from_slice(&new_page);
59
60                    self.file_manager
61                        .add_page(next_guard, buffer.freeze())
62                        .await?;
63                }
64            }
65        }
66    }
67
68    pub async fn mark_page(
69        &self,
70        page_id: PageId,
71        po: PageOffset,
72        status: FreeStat,
73    ) -> Result<(), FreeSpaceManagerError> {
74        let free_id = PageId {
75            resource_key: page_id.resource_key,
76            page_type: PageType::FreeSpaceMap,
77        };
78        let (fs_po, inner_offset) = po.get_bitmask_offset();
79
80        let (page, page_guard) = self
81            .file_manager
82            .get_page_for_update(&free_id, &fs_po)
83            .await?;
84
85        let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
86        buffer.extend_from_slice(&page);
87
88        Self::set_status_inside_page(&mut buffer, inner_offset, status);
89
90        self.file_manager
91            .update_page(page_guard, buffer.freeze())
92            .await?;
93
94        Ok(())
95    }
96
97    fn find_first_free_page_in_page(buffer: &mut impl Buf) -> Option<usize> {
98        let mut i = 0;
99        while buffer.has_remaining() {
100            let mut val = buffer.get_u8();
101            if val == 0xFF {
102                i += 1;
103                continue;
104            }
105            for j in 0..8 {
106                if val & 0x1 == 0x0 {
107                    return Some(i * 8 + j);
108                }
109                val >>= 1;
110            }
111            i += 1;
112        }
113        None
114    }
115
116    /// Sets the status of a field inside a page, you MUST pass an offset
117    /// that fits in the buffer.
118    fn set_status_inside_page(buffer: &mut BytesMut, offset: usize, status: FreeStat) {
119        let offset_index = offset / 8;
120        let offset_subindex = offset % 8;
121
122        let current_value = buffer[offset_index];
123        let mut pre_load = 0x1 << offset_subindex;
124        let new_value;
125        match status {
126            FreeStat::Free => {
127                pre_load = !pre_load;
128                new_value = current_value & pre_load;
129            }
130            FreeStat::Full => {
131                new_value = current_value | pre_load;
132            }
133        }
134
135        buffer[offset_index] = new_value;
136    }
137}
138
/// Status stored for a page in the free space map.
///
/// The discriminants are the bit values written into the map: `0` for a page that still has
/// room and `1` for a page that is full.
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
pub enum FreeStat {
    /// The page has space available.
    Free = 0,
    /// The page has no space available.
    Full = 1,
}
144
145#[derive(Debug, Error)]
146pub enum FreeSpaceManagerError {
147    #[error(transparent)]
148    FileManager2Error(#[from] FileManager2Error),
149}
150
#[cfg(test)]
mod tests {
    use bytes::BufMut;
    use std::sync::Arc;
    use tempfile::TempDir;
    use uuid::Uuid;

    use super::*;

    /// Reads the status of a field inside a page, you MUST pass an offset
    /// that fits in the buffer.
    // This used to be part of the implementation; only the unit tests need it now.
    fn get_status_inside_page(buffer: &BytesMut, offset: usize) -> FreeStat {
        let byte = buffer[offset / 8];
        match (byte >> (offset % 8)) & 0x1 {
            0 => FreeStat::Free,
            _ => FreeStat::Full,
        }
    }

    /// Toggles every bit in turn and checks that each read reflects the last write.
    #[test]
    fn test_get_and_set() -> Result<(), Box<dyn std::error::Error>> {
        let mut bitmap = BytesMut::with_capacity(2);
        bitmap.put_u16(0x0);

        let bit_count = bitmap.len() * 8;
        for bit in 0..bit_count {
            assert_eq!(get_status_inside_page(&bitmap, bit), FreeStat::Free);

            FreeSpaceManager::set_status_inside_page(&mut bitmap, bit, FreeStat::Full);
            assert_eq!(get_status_inside_page(&bitmap, bit), FreeStat::Full);

            FreeSpaceManager::set_status_inside_page(&mut bitmap, bit, FreeStat::Free);
            assert_eq!(get_status_inside_page(&bitmap, bit), FreeStat::Free);
        }

        Ok(())
    }

    /// Fills the bitmap one bit at a time and checks that the search always reports the
    /// lowest bit that is still free, then `None` once nothing is left.
    #[test]
    fn test_find_and_fill_pages() -> Result<(), Box<dyn std::error::Error>> {
        let mut bitmap = BytesMut::with_capacity(2);
        bitmap.put_slice(&[0x0, 0x0]);

        let bit_count = bitmap.len() * 8;
        for expected in 0..bit_count {
            // The search consumes its buffer, so it is run against a throwaway copy.
            let mut scratch = bitmap.clone();
            let found = FreeSpaceManager::find_first_free_page_in_page(&mut scratch);
            assert_eq!(found, Some(expected));

            FreeSpaceManager::set_status_inside_page(&mut bitmap, expected, FreeStat::Full);
        }

        let exhausted = FreeSpaceManager::find_first_free_page_in_page(&mut bitmap);
        assert_eq!(exhausted, None);

        Ok(())
    }

    /// Runs against a real file manager in a temporary directory: the first free page is
    /// offset 0 and, once that is marked full, the next free page is offset 1.
    #[tokio::test]
    async fn test_get_next() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = TempDir::new()?;
        let tmp_dir = tmp.path().as_os_str().to_os_string();

        let file_manager = Arc::new(FileManager2::new(tmp_dir)?);
        let manager = FreeSpaceManager::new(file_manager);

        let page_id = PageId {
            resource_key: Uuid::new_v4(),
            page_type: PageType::Data,
        };

        let first_free = manager.get_next_free_page(page_id).await?;
        assert_eq!(first_free, PageOffset(0));

        manager
            .mark_page(page_id, first_free, FreeStat::Full)
            .await?;

        let second_free = manager.get_next_free_page(page_id).await?;
        assert_eq!(second_free, PageOffset(1));

        Ok(())
    }
}
236}