Skip to main content

pict_rs_file_store/
lib.rs

1use std::{
2    path::{Path, PathBuf},
3    sync::Arc,
4};
5
6use bytes::Bytes;
7use pict_rs_error::{ApplicationError, ApplicationResultExt, ErrorCode};
8use pict_rs_futures::{future::WithPollTimer, stream::Stream};
9use pict_rs_store::{Store, StoreError, generate_disk_path};
10
11#[derive(Debug)]
12pub struct FileError;
13
14impl std::fmt::Display for FileError {
15    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
16        f.write_str("Error in file store")
17    }
18}
19
20impl std::error::Error for FileError {}
21
22trait FileResultExt<T, E>
23where
24    E: std::error::Error + Send + Sync + 'static,
25{
26    fn not_found<F, G>(self, err: F) -> Result<T, ApplicationError<G>>
27    where
28        F: FnOnce() -> G,
29        G: std::error::Error + Send + Sync + 'static;
30
31    fn io_error<F, G>(self, err: F) -> Result<T, ApplicationError<G>>
32    where
33        F: FnOnce() -> G,
34        G: std::error::Error + Send + Sync + 'static;
35}
36
37impl<T> FileResultExt<T, std::io::Error> for std::io::Result<T> {
38    fn not_found<F, G>(self, err: F) -> Result<T, ApplicationError<G>>
39    where
40        F: FnOnce() -> G,
41        G: std::error::Error + Send + Sync + 'static,
42    {
43        match self {
44            Ok(t) => Ok(t),
45            Err(e) if matches!(e.kind(), std::io::ErrorKind::NotFound) => {
46                Err(ApplicationError::new(e)
47                    .error_code(ErrorCode::NOT_FOUND)
48                    .raise(err()))
49            }
50            Err(e) => Err(ApplicationError::new(e)
51                .error_code(ErrorCode::FILE_IO_ERROR)
52                .raise(err())),
53        }
54    }
55
56    fn io_error<F, G>(self, err: F) -> Result<T, ApplicationError<G>>
57    where
58        F: FnOnce() -> G,
59        G: std::error::Error + Send + Sync + 'static,
60    {
61        self.or_code(ErrorCode::FILE_IO_ERROR).or_raise(err)
62    }
63}
64
65#[derive(Clone)]
66pub struct FileStore {
67    root_dir: PathBuf,
68}
69
70impl Store for FileStore {
71    #[tracing::instrument(level = "debug", skip(self))]
72    async fn health_check(&self) -> Result<(), ApplicationError<StoreError>> {
73        tokio::fs::metadata(&self.root_dir)
74            .await
75            .not_found(|| StoreError("Failed to check health"))?;
76
77        Ok(())
78    }
79
80    async fn save_stream<S>(
81        &self,
82        stream: S,
83        _content_type: mime::Mime,
84        extension: Option<&str>,
85    ) -> Result<Arc<str>, ApplicationError<StoreError>>
86    where
87        S: Stream<Item = std::io::Result<Bytes>>,
88    {
89        let make_error = || StoreError("Failed to save stream into store");
90
91        let path = self.next_file(extension);
92
93        if let Err(e) = self.safe_save_stream(&path, stream).await {
94            let _ = self.safe_remove_file(&path).await;
95
96            return Err(e.raise((make_error)()));
97        }
98
99        self.file_id_from_path(path).or_raise(make_error)
100    }
101
102    fn public_url(&self, _identifier: &Arc<str>) -> Option<url::Url> {
103        None
104    }
105
106    #[tracing::instrument(skip(self))]
107    fn to_stream(
108        &self,
109        identifier: &Arc<str>,
110        from_start: Option<u64>,
111        len: Option<u64>,
112    ) -> impl Stream<Item = std::io::Result<Bytes>> + use<> + 'static {
113        let path = self.path_from_file_id(identifier);
114
115        pict_rs_futures::fs::read_to_stream(path, from_start, len)
116    }
117
118    #[tracing::instrument(skip(self))]
119    async fn len(&self, identifier: &Arc<str>) -> Result<u64, ApplicationError<StoreError>> {
120        let path = self.path_from_file_id(identifier);
121
122        let len = tokio::fs::metadata(path)
123            .await
124            .not_found(|| StoreError("Failed to get length of identifier"))?
125            .len();
126
127        Ok(len)
128    }
129
130    #[tracing::instrument(skip(self))]
131    async fn remove(&self, identifier: &Arc<str>) -> Result<(), ApplicationError<StoreError>> {
132        let path = self.path_from_file_id(identifier);
133
134        self.safe_remove_file(path)
135            .await
136            .or_raise(|| StoreError("Failed to remove identifier"))?;
137
138        Ok(())
139    }
140}
141
142impl FileStore {
143    pub async fn build(root_dir: PathBuf) -> Result<Self, ApplicationError<StoreError>> {
144        tokio::fs::create_dir_all(&root_dir)
145            .await
146            .io_error(|| StoreError("Failed to prepare file store"))?;
147
148        Ok(FileStore { root_dir })
149    }
150
151    fn file_id_from_path(&self, path: PathBuf) -> Result<Arc<str>, ApplicationError<FileError>> {
152        path.strip_prefix(&self.root_dir)
153            .or_code(ErrorCode::FORMAT_FILE_ID_ERROR)
154            .or_raise(|| FileError)?
155            .to_str()
156            .ok_or_else(|| {
157                ApplicationError::new(FileError).error_code(ErrorCode::FORMAT_FILE_ID_ERROR)
158            })
159            .map(Into::into)
160    }
161
162    fn path_from_file_id(&self, file_id: &Arc<str>) -> PathBuf {
163        self.root_dir.join(file_id.as_ref())
164    }
165
166    fn next_file(&self, extension: Option<&str>) -> PathBuf {
167        generate_disk_path(self.root_dir.clone(), extension)
168    }
169
170    #[tracing::instrument(level = "debug", skip(self, path), fields(path = ?path.as_ref()))]
171    async fn safe_remove_file<P: AsRef<Path>>(
172        &self,
173        path: P,
174    ) -> Result<(), ApplicationError<FileError>> {
175        tokio::fs::remove_file(&path)
176            .await
177            .not_found(|| FileError)?;
178        self.try_remove_parents(path.as_ref()).await?;
179        Ok(())
180    }
181
182    async fn try_remove_parents(&self, mut path: &Path) -> Result<(), ApplicationError<FileError>> {
183        while let Some(parent) = path.parent() {
184            tracing::trace!("try_remove_parents: looping");
185
186            if parent.ends_with(&self.root_dir) {
187                break;
188            }
189
190            match tokio::fs::remove_dir(parent).await {
191                Ok(()) => path = parent,
192                Err(e) if matches!(e.kind(), std::io::ErrorKind::DirectoryNotEmpty) => break,
193                Err(e) => {
194                    return Err(ApplicationError::new(e)
195                        .error_code(ErrorCode::FILE_IO_ERROR)
196                        .raise(FileError));
197                }
198            }
199        }
200
201        Ok(())
202    }
203
204    async fn safe_save_stream<P: AsRef<Path>>(
205        &self,
206        to: P,
207        input: impl Stream<Item = std::io::Result<Bytes>>,
208    ) -> Result<(), ApplicationError<FileError>> {
209        pict_rs_futures::fs::safe_create_parent(&to)
210            .await
211            .io_error(|| FileError)?;
212
213        if let Err(e) = tokio::fs::metadata(&to).await {
214            if e.kind() != std::io::ErrorKind::NotFound {
215                return Err(ApplicationError::new(e)
216                    .error_code(ErrorCode::FILE_IO_ERROR)
217                    .raise(FileError));
218            }
219        } else {
220            return Err(ApplicationError::new(FileError).error_code(ErrorCode::FILE_EXISTS));
221        }
222
223        pict_rs_futures::fs::write_from_stream(to, input)
224            .with_poll_timer("write-from-stream")
225            .await
226            .io_error(|| FileError)?;
227
228        Ok(())
229    }
230}
231
232impl std::fmt::Debug for FileStore {
233    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
234        f.debug_struct("FileStore")
235            .field("root_dir", &self.root_dir)
236            .finish()
237    }
238}