1use bytes::{Bytes, BytesMut};
2use moka::future::Cache;
3use std::convert::TryFrom;
4use std::io::SeekFrom;
5use std::num::TryFromIntError;
6use std::ops::DerefMut;
7use std::{
8 ffi::{OsStr, OsString},
9 path::{Path, PathBuf},
10 sync::{atomic::AtomicUsize, Arc},
11};
12use thiserror::Error;
13use tokio::fs::{read_dir, File};
14use tokio::io::{AsyncReadExt, AsyncSeekExt};
15use tokio::sync::{Mutex, OwnedRwLockReadGuard, OwnedRwLockWriteGuard};
16
17use crate::constants::{MAX_FILE_HANDLE_COUNT, MAX_PAGE_CACHE};
18use crate::engine::io::block_layer::ResourceFormatter;
19use crate::{
20 constants::PAGE_SIZE,
21 engine::io::page_formats::{PageId, PageOffset},
22};
23
24use super::file_operations::{FileOperations, FileOperationsError};
25use super::lock_manager::LockManager;
26
/// All-zero prefix used to decide whether a page slot on disk is unused:
/// a page whose first `EMPTY_BUFFER.len()` bytes are all zero is treated as empty.
const EMPTY_BUFFER: [u8; 16] = [0; 16];
29
30pub struct FileManager2 {
34 data_dir: PathBuf,
35 file_handles: Cache<(PageId, usize), Arc<Mutex<File>>>,
36 file_offsets: Cache<PageId, Arc<AtomicUsize>>,
37 lock_manager: LockManager,
38 page_cache: Cache<(PageId, PageOffset), Bytes>,
39}
40
41impl FileManager2 {
42 pub fn new(raw_path: OsString) -> Result<FileManager2, FileManager2Error> {
43 let data_dir = Path::new(&raw_path).to_path_buf();
44
45 if !data_dir.is_dir() {
46 return Err(FileManager2Error::NeedDirectory(
47 data_dir.to_string_lossy().to_string(),
48 ));
49 }
50
51 Ok(FileManager2 {
52 data_dir,
53 file_handles: Cache::new(MAX_FILE_HANDLE_COUNT),
54 file_offsets: Cache::new(10000),
55 lock_manager: LockManager::new(),
56 page_cache: Cache::new(MAX_PAGE_CACHE),
57 })
58 }
59
60 pub async fn get_next_offset(
61 &self,
62 page_id: &PageId,
63 ) -> Result<(PageOffset, OwnedRwLockWriteGuard<(PageId, PageOffset)>), FileManager2Error> {
64 let data_dir = self.data_dir.clone();
65 let page_id = *page_id;
66 let current_offset = self
67 .file_offsets
68 .get_or_try_insert_with(page_id, async move {
69 let po = Self::find_next_offset(&data_dir, &page_id).await?;
70 let start_atomic: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(po.0));
71 Ok::<Arc<AtomicUsize>, FileManager2Error>(start_atomic)
72 })
73 .await?;
74 let new_offset = current_offset.fetch_add(1, std::sync::atomic::Ordering::SeqCst);
75 let new_po = PageOffset(new_offset);
76
77 let write_lock = self.lock_manager.write(page_id, new_po).await;
78 Ok((new_po, write_lock))
79 }
80
81 pub async fn add_page(
82 &self,
83 guard: OwnedRwLockWriteGuard<(PageId, PageOffset)>,
84 page: Bytes,
85 ) -> Result<(), FileManager2Error> {
86 let data_dir = self.data_dir.clone();
87 let page_id = guard.0;
88 let file_number = guard.1.get_file_number();
89 let file_handle = self
90 .file_handles
91 .get_or_try_insert_with((page_id, file_number), async move {
92 let handle = FileOperations::open_path(&data_dir, &page_id, file_number).await?;
93 Ok::<Arc<Mutex<File>>, FileManager2Error>(Arc::new(Mutex::const_new(handle)))
94 })
95 .await?;
96 let mut file = file_handle.lock().await;
97
98 self.page_cache
99 .insert((page_id, guard.1), page.clone())
100 .await;
101 let _ = FileOperations::add_chunk(file.deref_mut(), &guard.1, page).await?;
102 Ok(())
103 }
104
105 pub async fn get_page(
106 &self,
107 page_id: &PageId,
108 offset: &PageOffset,
109 ) -> Result<(Bytes, OwnedRwLockReadGuard<(PageId, PageOffset)>), FileManager2Error> {
110 let read_lock = self.lock_manager.read(*page_id, *offset).await;
111
112 let data_dir = self.data_dir.clone();
113 let page_id = *page_id;
114 let offset = *offset;
115 let file_number = offset.get_file_number();
116 let file_handles = self.file_handles.clone();
117
118 let chunk = match self
119 .page_cache
120 .get_or_try_insert_with((page_id, offset), async move {
121 let file_handle = file_handles
122 .get_or_try_insert_with((page_id, file_number), async move {
123 let handle =
124 FileOperations::open_path(&data_dir, &page_id, file_number).await?;
125 Ok::<Arc<Mutex<File>>, FileOperationsError>(Arc::new(Mutex::const_new(
126 handle,
127 )))
128 })
129 .await?;
130 let mut file = file_handle.lock().await;
131
132 let chunk = FileOperations::read_chunk(file.deref_mut(), &offset).await?;
133 Ok::<Bytes, FileOperationsError>(chunk)
134 })
135 .await
136 {
137 Ok(s) => s,
138 Err(e) => {
139 if let FileOperationsError::FileTooSmall(_, _) = e.as_ref() {
140 return Err(FileManager2Error::PageDoesNotExist(offset));
141 } else {
142 return Err(FileManager2Error::ArcFileOperationsError(e));
143 }
144 }
145 };
146
147 Ok((chunk, read_lock))
148 }
149
150 pub async fn get_page_for_update(
151 &self,
152 page_id: &PageId,
153 offset: &PageOffset,
154 ) -> Result<(Bytes, OwnedRwLockWriteGuard<(PageId, PageOffset)>), FileManager2Error> {
155 let write_lock = self.lock_manager.write(*page_id, *offset).await;
156
157 let data_dir = self.data_dir.clone();
158 let page_id = *page_id;
159 let offset = *offset;
160 let file_number = offset.get_file_number();
161 let file_handles = self.file_handles.clone();
162
163 let chunk = match self
164 .page_cache
165 .get_or_try_insert_with((page_id, offset), async move {
166 let file_handle = file_handles
167 .get_or_try_insert_with((page_id, file_number), async move {
168 let handle =
169 FileOperations::open_path(&data_dir, &page_id, file_number).await?;
170 Ok::<Arc<Mutex<File>>, FileOperationsError>(Arc::new(Mutex::const_new(
171 handle,
172 )))
173 })
174 .await?;
175 let mut file = file_handle.lock().await;
176
177 let chunk = FileOperations::read_chunk(file.deref_mut(), &offset).await?;
178 Ok::<Bytes, FileOperationsError>(chunk)
179 })
180 .await
181 {
182 Ok(s) => s,
183 Err(e) => {
184 if let FileOperationsError::FileTooSmall(_, _) = e.as_ref() {
185 return Err(FileManager2Error::PageDoesNotExist(offset));
186 } else {
187 return Err(FileManager2Error::ArcFileOperationsError(e));
188 }
189 }
190 };
191
192 Ok((chunk, write_lock))
193 }
194
195 pub async fn update_page(
196 &self,
197 guard: OwnedRwLockWriteGuard<(PageId, PageOffset)>,
198 page: Bytes,
199 ) -> Result<(), FileManager2Error> {
200 let data_dir = self.data_dir.clone();
201 let page_id = guard.0;
202 let file_number = guard.1.get_file_number();
203 let file_handle = self
204 .file_handles
205 .get_or_try_insert_with((page_id, file_number), async move {
206 let handle = FileOperations::open_path(&data_dir, &page_id, file_number).await?;
207 Ok::<Arc<Mutex<File>>, FileManager2Error>(Arc::new(Mutex::const_new(handle)))
208 })
209 .await?;
210 let mut file = file_handle.lock().await;
211
212 self.page_cache
213 .insert((page_id, guard.1), page.clone())
214 .await;
215 let _ = FileOperations::update_chunk(file.deref_mut(), &guard.1, page).await?;
216 Ok(())
217 }
218
219 async fn find_next_offset(
220 data_dir: &Path,
221 page_id: &PageId,
222 ) -> Result<PageOffset, FileManager2Error> {
223 let (path, count) = match Self::search_for_max_file(data_dir, page_id).await? {
224 Some((p, c)) => (p, c),
225 None => {
226 return Ok(PageOffset(0));
227 }
228 };
229
230 let mut file = File::open(path.clone()).await?;
231 let file_meta = file.metadata().await?;
232 let file_len = file_meta.len();
233
234 if file_len % PAGE_SIZE as u64 != 0 {
235 return Err(FileManager2Error::IncorrectPageSize(file_len, path));
236 }
237
238 let file_len = usize::try_from(file_len)?;
241
242 let mut in_file_len = file_len;
244 while in_file_len != 0 {
245 in_file_len = file_len.saturating_sub(PAGE_SIZE as usize);
247
248 let in_file_len_u64 = u64::try_from(in_file_len)?;
249 file.seek(SeekFrom::Start(in_file_len_u64)).await?;
250
251 let mut buffer = BytesMut::with_capacity(EMPTY_BUFFER.len());
253 file.read_buf(&mut buffer).await?;
254 let buffer = buffer.freeze();
255 if buffer == Bytes::from_static(&EMPTY_BUFFER) {
256 continue;
258 } else {
259 in_file_len = file_len.saturating_add(PAGE_SIZE as usize);
261 let po = PageOffset::calculate_page_offset(count, in_file_len);
262 return Ok(po);
263 }
264 }
265
266 let po = PageOffset::calculate_page_offset(count, in_file_len);
268 Ok(po)
269 }
270
271 async fn search_for_max_file(
273 data_dir: &Path,
274 page_id: &PageId,
275 ) -> Result<Option<(PathBuf, usize)>, FileManager2Error> {
276 let sub_path = FileOperations::make_sub_path(data_dir, page_id).await?;
277 let target_uuid = ResourceFormatter::format_uuid(&page_id.resource_key);
278 let target_type = page_id.page_type.to_string();
279 let target_filename = format!("{0}.{1}", target_uuid, target_type);
280
281 let mut max_file_count = 0;
282 let mut max_file_path = None;
283
284 let mut files = read_dir(sub_path).await?;
285 while let Some(entry) = files.next_entry().await? {
286 let path = entry.path();
287 let file_stem = match path.file_stem() {
288 Some(s) => Self::format_os_string(s),
289 None => {
290 continue;
291 }
292 };
293 let file_ext = match path.extension() {
294 Some(s) => Self::format_os_string(s),
295 None => {
296 continue;
297 }
298 };
299 if !file_stem.eq(&target_filename) {
300 continue;
301 }
302 let file_count = match file_ext.parse::<usize>() {
303 Ok(s) => s,
304 Err(_) => {
305 continue;
306 }
307 };
308
309 if file_count >= max_file_count {
310 max_file_count = file_count;
311 max_file_path = Some(path);
312 }
313 }
314
315 match max_file_path {
316 Some(s) => Ok(Some((s, max_file_count))),
317 None => Ok(None),
318 }
319 }
320
321 fn format_os_string(input: &OsStr) -> String {
322 input.to_ascii_lowercase().to_string_lossy().into_owned()
323 }
324}
325
326#[derive(Debug, Error)]
327pub enum FileManager2Error {
328 #[error(transparent)]
329 FileManager2Error(#[from] Arc<FileManager2Error>),
330 #[error(transparent)]
331 FileOperationsError(#[from] FileOperationsError),
332 #[error(transparent)]
333 ArcFileOperationsError(#[from] Arc<FileOperationsError>),
334 #[error("Incorrect page size of {0} on file {1} found. System cannot function")]
335 IncorrectPageSize(u64, PathBuf),
336 #[error(transparent)]
337 IOError(#[from] std::io::Error),
338 #[error("Page {0} does not exist")]
339 PageDoesNotExist(PageOffset),
340 #[error("Need a directory to store the data. Got ({0}) may be stripped of non Unicode chars.")]
341 NeedDirectory(String),
342 #[error(transparent)]
343 TryFromIntError(#[from] TryFromIntError),
344}
345
#[cfg(test)]
mod tests {
    use bytes::{Bytes, BytesMut};
    use tempfile::TempDir;
    use uuid::Uuid;

    use crate::{constants::PAGE_SIZE, engine::io::page_formats::PageType};

    use super::*;

    /// Builds one full page in which every byte equals `fill`.
    fn get_test_page(fill: u8) -> Bytes {
        let page_len = PAGE_SIZE as usize;
        let mut page = BytesMut::with_capacity(page_len);
        page.resize(page_len, fill);
        page.freeze()
    }

    #[tokio::test]
    async fn test_roundtrips() -> Result<(), Box<dyn std::error::Error>> {
        let tmp = TempDir::new()?;
        let data_path = tmp.path().as_os_str().to_os_string();

        let manager = FileManager2::new(data_path.clone())?;
        let page_id = PageId {
            resource_key: Uuid::new_v4(),
            page_type: PageType::Data,
        };

        // The first page of a new resource lands at offset zero.
        let first_page = get_test_page(1);
        let (first_offset, add_guard) = manager.get_next_offset(&page_id).await?;
        manager.add_page(add_guard, first_page.clone()).await?;
        assert_eq!(first_offset, PageOffset(0));

        // It reads back unchanged under an exclusive lock.
        let (fetched, update_guard) = manager
            .get_page_for_update(&page_id, &first_offset)
            .await?;
        assert_eq!(first_page, fetched);

        // Overwriting it is visible to a subsequent shared read.
        let updated_page = get_test_page(2);
        manager
            .update_page(update_guard, updated_page.clone())
            .await?;
        let (fetched_after_update, _read_guard) =
            manager.get_page(&page_id, &first_offset).await?;
        assert_eq!(updated_page, fetched_after_update);

        // A fresh manager over the same directory must allocate past the existing
        // page and still see the updated contents.
        let reopened = FileManager2::new(data_path)?;
        let third_page = get_test_page(3);
        let (third_offset, third_guard) = reopened.get_next_offset(&page_id).await?;
        reopened.add_page(third_guard, third_page).await?;
        assert!(third_offset > first_offset);

        let (fetched_reopened, _reopened_guard) =
            reopened.get_page(&page_id, &first_offset).await?;
        assert_eq!(updated_page, fetched_reopened);

        Ok(())
    }
}