pict_rs_file_store/
lib.rs1use std::{
2 path::{Path, PathBuf},
3 sync::Arc,
4};
5
6use bytes::Bytes;
7use pict_rs_error::{ApplicationError, ApplicationResultExt, ErrorCode};
8use pict_rs_futures::{future::WithPollTimer, stream::Stream};
9use pict_rs_store::{Store, StoreError, generate_disk_path};
10
/// Marker error used as the outer context for failures raised inside the
/// file store's private helpers.
///
/// It carries no data of its own; the underlying cause and the error code are
/// attached by the `ApplicationError` that wraps it.
#[derive(Debug)]
pub struct FileError;

impl std::fmt::Display for FileError {
    /// Writes the fixed, human-readable description of this error.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(f, "Error in file store")
    }
}

// No `source()` override: `FileError` itself never wraps another error.
impl std::error::Error for FileError {}
21
22trait FileResultExt<T, E>
23where
24 E: std::error::Error + Send + Sync + 'static,
25{
26 fn not_found<F, G>(self, err: F) -> Result<T, ApplicationError<G>>
27 where
28 F: FnOnce() -> G,
29 G: std::error::Error + Send + Sync + 'static;
30
31 fn io_error<F, G>(self, err: F) -> Result<T, ApplicationError<G>>
32 where
33 F: FnOnce() -> G,
34 G: std::error::Error + Send + Sync + 'static;
35}
36
37impl<T> FileResultExt<T, std::io::Error> for std::io::Result<T> {
38 fn not_found<F, G>(self, err: F) -> Result<T, ApplicationError<G>>
39 where
40 F: FnOnce() -> G,
41 G: std::error::Error + Send + Sync + 'static,
42 {
43 match self {
44 Ok(t) => Ok(t),
45 Err(e) if matches!(e.kind(), std::io::ErrorKind::NotFound) => {
46 Err(ApplicationError::new(e)
47 .error_code(ErrorCode::NOT_FOUND)
48 .raise(err()))
49 }
50 Err(e) => Err(ApplicationError::new(e)
51 .error_code(ErrorCode::FILE_IO_ERROR)
52 .raise(err())),
53 }
54 }
55
56 fn io_error<F, G>(self, err: F) -> Result<T, ApplicationError<G>>
57 where
58 F: FnOnce() -> G,
59 G: std::error::Error + Send + Sync + 'static,
60 {
61 self.or_code(ErrorCode::FILE_IO_ERROR).or_raise(err)
62 }
63}
64
/// A store that keeps every object as a regular file underneath a single
/// root directory on the local filesystem.
///
/// Identifiers handed out by the store are file paths relative to that root.
#[derive(Clone)]
pub struct FileStore {
    /// Directory under which all stored files live.
    root_dir: PathBuf,
}
69
70impl Store for FileStore {
71 #[tracing::instrument(level = "debug", skip(self))]
72 async fn health_check(&self) -> Result<(), ApplicationError<StoreError>> {
73 tokio::fs::metadata(&self.root_dir)
74 .await
75 .not_found(|| StoreError("Failed to check health"))?;
76
77 Ok(())
78 }
79
80 async fn save_stream<S>(
81 &self,
82 stream: S,
83 _content_type: mime::Mime,
84 extension: Option<&str>,
85 ) -> Result<Arc<str>, ApplicationError<StoreError>>
86 where
87 S: Stream<Item = std::io::Result<Bytes>>,
88 {
89 let make_error = || StoreError("Failed to save stream into store");
90
91 let path = self.next_file(extension);
92
93 if let Err(e) = self.safe_save_stream(&path, stream).await {
94 let _ = self.safe_remove_file(&path).await;
95
96 return Err(e.raise((make_error)()));
97 }
98
99 self.file_id_from_path(path).or_raise(make_error)
100 }
101
102 fn public_url(&self, _identifier: &Arc<str>) -> Option<url::Url> {
103 None
104 }
105
106 #[tracing::instrument(skip(self))]
107 fn to_stream(
108 &self,
109 identifier: &Arc<str>,
110 from_start: Option<u64>,
111 len: Option<u64>,
112 ) -> impl Stream<Item = std::io::Result<Bytes>> + use<> + 'static {
113 let path = self.path_from_file_id(identifier);
114
115 pict_rs_futures::fs::read_to_stream(path, from_start, len)
116 }
117
118 #[tracing::instrument(skip(self))]
119 async fn len(&self, identifier: &Arc<str>) -> Result<u64, ApplicationError<StoreError>> {
120 let path = self.path_from_file_id(identifier);
121
122 let len = tokio::fs::metadata(path)
123 .await
124 .not_found(|| StoreError("Failed to get length of identifier"))?
125 .len();
126
127 Ok(len)
128 }
129
130 #[tracing::instrument(skip(self))]
131 async fn remove(&self, identifier: &Arc<str>) -> Result<(), ApplicationError<StoreError>> {
132 let path = self.path_from_file_id(identifier);
133
134 self.safe_remove_file(path)
135 .await
136 .or_raise(|| StoreError("Failed to remove identifier"))?;
137
138 Ok(())
139 }
140}
141
142impl FileStore {
143 pub async fn build(root_dir: PathBuf) -> Result<Self, ApplicationError<StoreError>> {
144 tokio::fs::create_dir_all(&root_dir)
145 .await
146 .io_error(|| StoreError("Failed to prepare file store"))?;
147
148 Ok(FileStore { root_dir })
149 }
150
151 fn file_id_from_path(&self, path: PathBuf) -> Result<Arc<str>, ApplicationError<FileError>> {
152 path.strip_prefix(&self.root_dir)
153 .or_code(ErrorCode::FORMAT_FILE_ID_ERROR)
154 .or_raise(|| FileError)?
155 .to_str()
156 .ok_or_else(|| {
157 ApplicationError::new(FileError).error_code(ErrorCode::FORMAT_FILE_ID_ERROR)
158 })
159 .map(Into::into)
160 }
161
162 fn path_from_file_id(&self, file_id: &Arc<str>) -> PathBuf {
163 self.root_dir.join(file_id.as_ref())
164 }
165
166 fn next_file(&self, extension: Option<&str>) -> PathBuf {
167 generate_disk_path(self.root_dir.clone(), extension)
168 }
169
170 #[tracing::instrument(level = "debug", skip(self, path), fields(path = ?path.as_ref()))]
171 async fn safe_remove_file<P: AsRef<Path>>(
172 &self,
173 path: P,
174 ) -> Result<(), ApplicationError<FileError>> {
175 tokio::fs::remove_file(&path)
176 .await
177 .not_found(|| FileError)?;
178 self.try_remove_parents(path.as_ref()).await?;
179 Ok(())
180 }
181
182 async fn try_remove_parents(&self, mut path: &Path) -> Result<(), ApplicationError<FileError>> {
183 while let Some(parent) = path.parent() {
184 tracing::trace!("try_remove_parents: looping");
185
186 if parent.ends_with(&self.root_dir) {
187 break;
188 }
189
190 match tokio::fs::remove_dir(parent).await {
191 Ok(()) => path = parent,
192 Err(e) if matches!(e.kind(), std::io::ErrorKind::DirectoryNotEmpty) => break,
193 Err(e) => {
194 return Err(ApplicationError::new(e)
195 .error_code(ErrorCode::FILE_IO_ERROR)
196 .raise(FileError));
197 }
198 }
199 }
200
201 Ok(())
202 }
203
204 async fn safe_save_stream<P: AsRef<Path>>(
205 &self,
206 to: P,
207 input: impl Stream<Item = std::io::Result<Bytes>>,
208 ) -> Result<(), ApplicationError<FileError>> {
209 pict_rs_futures::fs::safe_create_parent(&to)
210 .await
211 .io_error(|| FileError)?;
212
213 if let Err(e) = tokio::fs::metadata(&to).await {
214 if e.kind() != std::io::ErrorKind::NotFound {
215 return Err(ApplicationError::new(e)
216 .error_code(ErrorCode::FILE_IO_ERROR)
217 .raise(FileError));
218 }
219 } else {
220 return Err(ApplicationError::new(FileError).error_code(ErrorCode::FILE_EXISTS));
221 }
222
223 pict_rs_futures::fs::write_from_stream(to, input)
224 .with_poll_timer("write-from-stream")
225 .await
226 .io_error(|| FileError)?;
227
228 Ok(())
229 }
230}
231
232impl std::fmt::Debug for FileStore {
233 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234 f.debug_struct("FileStore")
235 .field("root_dir", &self.root_dir)
236 .finish()
237 }
238}