feophantlib/engine/io/block_layer/file_manager2.rs

1use bytes::{Bytes, BytesMut};
2use moka::future::Cache;
3use std::convert::TryFrom;
4use std::io::SeekFrom;
5use std::num::TryFromIntError;
6use std::ops::DerefMut;
7use std::{
8    ffi::{OsStr, OsString},
9    path::{Path, PathBuf},
10    sync::{atomic::AtomicUsize, Arc},
11};
12use thiserror::Error;
13use tokio::fs::{read_dir, File};
14use tokio::io::{AsyncReadExt, AsyncSeekExt};
15use tokio::sync::{Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard};
16
17use crate::constants::{MAX_FILE_HANDLE_COUNT, MAX_PAGE_CACHE};
18use crate::engine::io::block_layer::ResourceFormatter;
19use crate::{
20    constants::PAGE_SIZE,
21    engine::io::page_formats::{PageId, PageOffset},
22};
23
24use super::file_operations::{FileOperations, FileOperationsError};
25use super::lock_manager::LockManager;
26
/// All-zero probe compared against the leading bytes of a page on disk: a page whose
/// first 16 bytes equal this value is treated as holding no data.
const EMPTY_BUFFER: [u8; 16] = [0; 16];
29
30/// Attempt to move away from channels for the FileManager Service.
31///
32/// This code has ended up tremendously simpler than the prior version!
33pub struct FileManager2 {
34    data_dir: PathBuf,
35    file_handles: Cache<(PageId, usize), Arc<Mutex<File>>>,
36    file_offsets: Cache<PageId, Arc<AtomicUsize>>,
37    lock_manager: LockManager,
38    page_cache: Cache<(PageId, PageOffset), Bytes>,
39}
40
41impl FileManager2 {
42    pub fn new(raw_path: OsString) -> Result<FileManager2, FileManager2Error> {
43        let data_dir = Path::new(&raw_path).to_path_buf();
44
45        if !data_dir.is_dir() {
46            return Err(FileManager2Error::NeedDirectory(
47                data_dir.to_string_lossy().to_string(),
48            ));
49        }
50
51        Ok(FileManager2 {
52            data_dir,
53            file_handles: Cache::new(MAX_FILE_HANDLE_COUNT),
54            file_offsets: Cache::new(10000),
55            lock_manager: LockManager::new(),
56            page_cache: Cache::new(MAX_PAGE_CACHE),
57        })
58    }
59
60    pub async fn get_next_offset(
61        &self,
62        page_id: &PageId,
63    ) -> Result<(PageOffset, OwnedRwLockWriteGuard<(PageId, PageOffset)>), FileManager2Error> {
64        let data_dir = self.data_dir.clone();
65        let page_id = *page_id;
66        let current_offset = self
67            .file_offsets
68            .get_or_try_insert_with(page_id, async move {
69                let po = Self::find_next_offset(&data_dir, &page_id).await?;
70                let start_atomic: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(po.0));
71                Ok::<Arc<AtomicUsize>, FileManager2Error>(start_atomic)
72            })
73            .await?;
74        let new_offset = current_offset.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
75        let new_po = PageOffset(new_offset);
76
77        let write_lock = self.lock_manager.write(page_id, new_po).await;
78        Ok((new_po, write_lock))
79    }
80
81    pub async fn add_page(
82        &self,
83        guard: OwnedRwLockWriteGuard<(PageId, PageOffset)>,
84        page: Bytes,
85    ) -> Result<(), FileManager2Error> {
86        let data_dir = self.data_dir.clone();
87        let page_id = guard.0;
88        let file_number = guard.1.get_file_number();
89        let file_handle = self
90            .file_handles
91            .get_or_try_insert_with((page_id, file_number), async move {
92                let handle = FileOperations::open_path(&data_dir, &page_id, file_number).await?;
93                Ok::<Arc<Mutex<File>>, FileManager2Error>(Arc::new(Mutex::const_new(handle)))
94            })
95            .await?;
96        let mut file = file_handle.lock().await;
97
98        self.page_cache
99            .insert((page_id, guard.1), page.clone())
100            .await;
101        let _ = FileOperations::add_chunk(file.deref_mut(), &guard.1, page).await?;
102        Ok(())
103    }
104
105    pub async fn get_page(
106        &self,
107        page_id: &PageId,
108        offset: &PageOffset,
109    ) -> Result<(Bytes, OwnedRwLockReadGuard<(PageId, PageOffset)>), FileManager2Error> {
110        let read_lock = self.lock_manager.read(*page_id, *offset).await;
111
112        let data_dir = self.data_dir.clone();
113        let page_id = *page_id;
114        let offset = *offset;
115        let file_number = offset.get_file_number();
116        let file_handles = self.file_handles.clone();
117
118        let chunk = match self
119            .page_cache
120            .get_or_try_insert_with((page_id, offset), async move {
121                let file_handle = file_handles
122                    .get_or_try_insert_with((page_id, file_number), async move {
123                        let handle =
124                            FileOperations::open_path(&data_dir, &page_id, file_number).await?;
125                        Ok::<Arc<Mutex<File>>, FileOperationsError>(Arc::new(Mutex::const_new(
126                            handle,
127                        )))
128                    })
129                    .await?;
130                let mut file = file_handle.lock().await;
131
132                let chunk = FileOperations::read_chunk(file.deref_mut(), &offset).await?;
133                Ok::<Bytes, FileOperationsError>(chunk)
134            })
135            .await
136        {
137            Ok(s) => s,
138            Err(e) => {
139                if let FileOperationsError::FileTooSmall(_, _) = e.as_ref() {
140                    return Err(FileManager2Error::PageDoesNotExist(offset));
141                } else {
142                    return Err(FileManager2Error::ArcFileOperationsError(e));
143                }
144            }
145        };
146
147        Ok((chunk, read_lock))
148    }
149
150    pub async fn get_page_for_update(
151        &self,
152        page_id: &PageId,
153        offset: &PageOffset,
154    ) -> Result<(Bytes, OwnedRwLockWriteGuard<(PageId, PageOffset)>), FileManager2Error> {
155        let write_lock = self.lock_manager.write(*page_id, *offset).await;
156
157        let data_dir = self.data_dir.clone();
158        let page_id = *page_id;
159        let offset = *offset;
160        let file_number = offset.get_file_number();
161        let file_handles = self.file_handles.clone();
162
163        let chunk = match self
164            .page_cache
165            .get_or_try_insert_with((page_id, offset), async move {
166                let file_handle = file_handles
167                    .get_or_try_insert_with((page_id, file_number), async move {
168                        let handle =
169                            FileOperations::open_path(&data_dir, &page_id, file_number).await?;
170                        Ok::<Arc<Mutex<File>>, FileOperationsError>(Arc::new(Mutex::const_new(
171                            handle,
172                        )))
173                    })
174                    .await?;
175                let mut file = file_handle.lock().await;
176
177                let chunk = FileOperations::read_chunk(file.deref_mut(), &offset).await?;
178                Ok::<Bytes, FileOperationsError>(chunk)
179            })
180            .await
181        {
182            Ok(s) => s,
183            Err(e) => {
184                if let FileOperationsError::FileTooSmall(_, _) = e.as_ref() {
185                    return Err(FileManager2Error::PageDoesNotExist(offset));
186                } else {
187                    return Err(FileManager2Error::ArcFileOperationsError(e));
188                }
189            }
190        };
191
192        Ok((chunk, write_lock))
193    }
194
195    pub async fn update_page(
196        &self,
197        guard: OwnedRwLockWriteGuard<(PageId, PageOffset)>,
198        page: Bytes,
199    ) -> Result<(), FileManager2Error> {
200        let data_dir = self.data_dir.clone();
201        let page_id = guard.0;
202        let file_number = guard.1.get_file_number();
203        let file_handle = self
204            .file_handles
205            .get_or_try_insert_with((page_id, file_number), async move {
206                let handle = FileOperations::open_path(&data_dir, &page_id, file_number).await?;
207                Ok::<Arc<Mutex<File>>, FileManager2Error>(Arc::new(Mutex::const_new(handle)))
208            })
209            .await?;
210        let mut file = file_handle.lock().await;
211
212        self.page_cache
213            .insert((page_id, guard.1), page.clone())
214            .await;
215        let _ = FileOperations::update_chunk(file.deref_mut(), &guard.1, page).await?;
216        Ok(())
217    }
218
219    async fn find_next_offset(
220        data_dir: &Path,
221        page_id: &PageId,
222    ) -> Result<PageOffset, FileManager2Error> {
223        let (path, count) = match Self::search_for_max_file(data_dir, page_id).await? {
224            Some((p, c)) => (p, c),
225            None => {
226                return Ok(PageOffset(0));
227            }
228        };
229
230        let mut file = File::open(path.clone()).await?;
231        let file_meta = file.metadata().await?;
232        let file_len = file_meta.len();
233
234        if file_len % PAGE_SIZE as u64 != 0 {
235            return Err(FileManager2Error::IncorrectPageSize(file_len, path));
236        }
237
238        // If this fails you are probably on a 32bit platform and
239        // have changed the PAGE_SIZE constant. I would reduce PAGE_SIZE.
240        let file_len = usize::try_from(file_len)?;
241
242        //Now we need to scan backwards in the file to make sure we find the last non-zero page.
243        let mut in_file_len = file_len;
244        while in_file_len != 0 {
245            //Move back to test a block
246            in_file_len = file_len.saturating_sub(PAGE_SIZE as usize);
247
248            let in_file_len_u64 = u64::try_from(in_file_len)?;
249            file.seek(SeekFrom::Start(in_file_len_u64)).await?;
250
251            //Each page should start with a non-zero number within the first 16 bytes, if it has data
252            let mut buffer = BytesMut::with_capacity(EMPTY_BUFFER.len());
253            file.read_buf(&mut buffer).await?;
254            let buffer = buffer.freeze();
255            if buffer == Bytes::from_static(&EMPTY_BUFFER) {
256                //Okay we keep going
257                continue;
258            } else {
259                //We can calucate our page offset now
260                in_file_len = file_len.saturating_add(PAGE_SIZE as usize);
261                let po = PageOffset::calculate_page_offset(count, in_file_len);
262                return Ok(po);
263            }
264        }
265
266        //Okay so the file is empty
267        let po = PageOffset::calculate_page_offset(count, in_file_len);
268        Ok(po)
269    }
270
271    /// This will search for the highest numbered file for the Uuid
272    async fn search_for_max_file(
273        data_dir: &Path,
274        page_id: &PageId,
275    ) -> Result<Option<(PathBuf, usize)>, FileManager2Error> {
276        let sub_path = FileOperations::make_sub_path(data_dir, page_id).await?;
277        let target_uuid = ResourceFormatter::format_uuid(&page_id.resource_key);
278        let target_type = page_id.page_type.to_string();
279        let target_filename = format!("{0}.{1}", target_uuid, target_type);
280
281        let mut max_file_count = 0;
282        let mut max_file_path = None;
283
284        let mut files = read_dir(sub_path).await?;
285        while let Some(entry) = files.next_entry().await? {
286            let path = entry.path();
287            let file_stem = match path.file_stem() {
288                Some(s) => Self::format_os_string(s),
289                None => {
290                    continue;
291                }
292            };
293            let file_ext = match path.extension() {
294                Some(s) => Self::format_os_string(s),
295                None => {
296                    continue;
297                }
298            };
299            if !file_stem.eq(&target_filename) {
300                continue;
301            }
302            let file_count = match file_ext.parse::<usize>() {
303                Ok(s) => s,
304                Err(_) => {
305                    continue;
306                }
307            };
308
309            if file_count >= max_file_count {
310                max_file_count = file_count;
311                max_file_path = Some(path);
312            }
313        }
314
315        match max_file_path {
316            Some(s) => Ok(Some((s, max_file_count))),
317            None => Ok(None),
318        }
319    }
320
321    fn format_os_string(input: &OsStr) -> String {
322        input.to_ascii_lowercase().to_string_lossy().into_owned()
323    }
324}
325
326#[derive(Debug, Error)]
327pub enum FileManager2Error {
328    #[error(transparent)]
329    FileManager2Error(#[from] Arc<FileManager2Error>),
330    #[error(transparent)]
331    FileOperationsError(#[from] FileOperationsError),
332    #[error(transparent)]
333    ArcFileOperationsError(#[from] Arc<FileOperationsError>),
334    #[error("Incorrect page size of {0} on file {1} found. System cannot function")]
335    IncorrectPageSize(u64, PathBuf),
336    #[error(transparent)]
337    IOError(#[from] std::io::Error),
338    #[error("Page {0} does not exist")]
339    PageDoesNotExist(PageOffset),
340    #[error("Need a directory to store the data. Got ({0}) may be stripped of non Unicode chars.")]
341    NeedDirectory(String),
342    #[error(transparent)]
343    TryFromIntError(#[from] TryFromIntError),
344}
345
#[cfg(test)]
mod tests {
    use bytes::Bytes;
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::{constants::PAGE_SIZE, engine::io::page_formats::PageType};

    use super::*;

    /// Builds a full page in which every byte equals `fill`.
    fn get_test_page(fill: u8) -> Bytes {
        Bytes::from(vec![fill; PAGE_SIZE as usize])
    }

    #[tokio::test]
    async fn test_roundtrips() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = TempDir::new()?;
        let data_dir = tmp.path().as_os_str().to_os_string();

        let manager = FileManager2::new(data_dir.clone())?;

        let page_id = PageId {
            resource_key: Uuid::new_v4(),
            page_type: PageType::Data,
        };

        // Allocate and write the very first page.
        let first_page = get_test_page(1);
        let (first_offset, add_guard) = manager.get_next_offset(&page_id).await?;
        manager.add_page(add_guard, first_page.clone()).await?;
        assert_eq!(first_offset, PageOffset(0));

        // Read it back for update, then overwrite it.
        let (read_back, update_guard) = manager
            .get_page_for_update(&page_id, &first_offset)
            .await?;
        assert_eq!(first_page, read_back);

        let updated_page = get_test_page(2);
        manager
            .update_page(update_guard, updated_page.clone())
            .await?;

        // Keep the read guard alive until the end of the test.
        let (reread, _reread_guard) = manager.get_page(&page_id, &first_offset).await?;
        assert_eq!(updated_page, reread);

        // A fresh manager over the same directory must discover the existing data on disk.
        let reopened = FileManager2::new(data_dir)?;
        let (next_offset, next_guard) = reopened.get_next_offset(&page_id).await?;
        reopened.add_page(next_guard, get_test_page(3)).await?;
        assert!(next_offset > first_offset);

        let (from_disk, _from_disk_guard) = reopened.get_page(&page_id, &first_offset).await?;
        assert_eq!(updated_page, from_disk);

        Ok(())
    }
}