feophantlib/engine/io/block_layer/
file_operations.rs1use bytes::{Bytes, BytesMut};
4use std::convert::TryFrom;
5use std::num::TryFromIntError;
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::{io::SeekFrom, path::Path};
9use thiserror::Error;
10use tokio::fs;
11use tokio::io::{AsyncReadExt, AsyncWriteExt};
12use tokio::{
13 fs::{File, OpenOptions},
14 io::AsyncSeekExt,
15};
16
17use crate::constants::PAGE_SIZE;
18use crate::engine::io::block_layer::ResourceFormatter;
19use crate::engine::io::page_formats::{PageId, PageOffset};
20pub struct FileOperations {}
21
22impl FileOperations {
23 pub async fn open_path(
24 data_dir: &Path,
25 page_id: &PageId,
26 file_number: usize,
27 ) -> Result<File, FileOperationsError> {
28 let mut path = Self::make_sub_path(data_dir, page_id).await?;
29 let file_stem = ResourceFormatter::format_uuid(&page_id.resource_key);
30 let file_type = page_id.page_type.to_string();
31 let filename = format!("{0}.{1}.{2}", file_stem, file_type, file_number);
32
33 path.push(filename);
34
35 Ok(OpenOptions::new()
36 .read(true)
37 .write(true)
38 .create(true)
39 .open(path)
40 .await?)
41 }
42
43 pub async fn add_chunk(
46 file: &mut File,
47 page_offset: &PageOffset,
48 buffer: Bytes,
49 ) -> Result<(), FileOperationsError> {
50 let metadata = file.metadata().await?;
51 let chunk_size_u64 = u64::try_from(page_offset.get_file_chunk_size())?;
52
53 if metadata.len() < chunk_size_u64 {
54 file.set_len(chunk_size_u64).await?;
55 }
56
57 Self::update_chunk(file, page_offset, buffer).await?;
58 Ok(())
59 }
60
61 pub async fn make_sub_path(
63 data_dir: &Path,
64 page_id: &PageId,
65 ) -> Result<PathBuf, FileOperationsError> {
66 let subfolder = ResourceFormatter::get_uuid_prefix(&page_id.resource_key);
67
68 let mut path = PathBuf::new();
69 path.push(data_dir);
70 path.push(subfolder);
71
72 fs::create_dir_all(path.as_path()).await?;
73 Ok(path)
74 }
75
76 pub async fn read_chunk(
77 file: &mut File,
78 page_offset: &PageOffset,
79 ) -> Result<Bytes, FileOperationsError> {
80 let mut buffer = BytesMut::with_capacity(PAGE_SIZE as usize);
81
82 let file_meta = file.metadata().await?;
83
84 let file_len = file_meta.len();
85 if u64::try_from(page_offset.get_file_chunk_size())? > file_len {
86 return Err(FileOperationsError::FileTooSmall(
87 page_offset.get_file_chunk_size(),
88 file_len,
89 ));
90 }
91
92 file.seek(SeekFrom::Start(u64::try_from(page_offset.get_file_seek())?))
93 .await?;
94
95 while buffer.len() != PAGE_SIZE as usize {
96 let readamt = file.read_buf(&mut buffer).await?;
97 if readamt == 0 {
98 return Err(FileOperationsError::IncompleteRead(readamt, buffer.len()));
99 }
100 }
101
102 Ok(buffer.freeze())
103 }
104
105 pub async fn update_chunk(
106 file: &mut File,
107 page_offset: &PageOffset,
108 mut buffer: Bytes,
109 ) -> Result<(), FileOperationsError> {
110 file.seek(SeekFrom::Start(u64::try_from(page_offset.get_file_seek())?))
111 .await?;
112
113 file.write_all_buf(&mut buffer).await?;
114
115 Ok(())
118 }
119}
120
121#[derive(Debug, Error)]
122pub enum FileOperationsError {
123 #[error(transparent)]
124 FileOperationsError(#[from] Arc<FileOperationsError>),
125 #[error("Read {0} bytes instead of a page, the buffer has {1}")]
126 IncompleteRead(usize, usize),
127 #[error(transparent)]
128 IOError(#[from] std::io::Error),
129 #[error(transparent)]
130 TryFromIntError(#[from] TryFromIntError),
131 #[error("File too small for requested read {0}, size is {1}")]
132 FileTooSmall(usize, u64),
133}
134
135#[cfg(test)]
136mod tests {
137 use tempfile::TempDir;
138 use uuid::Uuid;
139
140 use crate::engine::io::page_formats::PageType;
141
142 use super::*;
143
144 #[tokio::test]
145 async fn test_make_sub_path() -> Result<(), Box<dyn std::error::Error>> {
146 let tmp = TempDir::new()?;
147
148 let page_id = PageId {
149 resource_key: Uuid::new_v4(),
150 page_type: PageType::Data,
151 };
152
153 FileOperations::make_sub_path(tmp.path(), &page_id).await?;
155 FileOperations::make_sub_path(tmp.path(), &page_id).await?;
156
157 Ok(())
158 }
159}