1use crate::{default_resolver, ContentTypeResolver, StorageConfig};
23use remi::{async_trait, Blob, Bytes, Directory, File, ListBlobsRequest, StorageService as _, UploadRequest};
24use std::{
25 borrow::Cow,
26 io,
27 path::{Path, PathBuf},
28 sync::Arc,
29 time::SystemTime,
30};
31use tokio::{fs, io::*};
32
33#[cfg(feature = "tracing")]
34use tracing::instrument;
35
36#[derive(Clone)]
39pub struct StorageService {
40 resolver: Arc<dyn ContentTypeResolver>,
41 config: StorageConfig,
42}
43
44impl StorageService {
45 pub fn new<P: AsRef<Path>>(path: P) -> StorageService {
47 Self::with_config(StorageConfig::new(path))
48 }
49
50 pub fn with_config(config: StorageConfig) -> StorageService {
52 StorageService {
53 resolver: Arc::new(default_resolver),
54 config,
55 }
56 }
57
58 pub fn with_resolver<R: ContentTypeResolver + 'static>(mut self, resolver: R) -> StorageService {
60 self.resolver = Arc::new(resolver);
61 self
62 }
63
64 #[cfg_attr(
72 feature = "tracing",
73 instrument(
74 name = "remi.filesystem.normalize",
75 skip_all,
76 fields(remi.service = "fs", path = %path.as_ref().display())
77 )
78 )]
79 pub fn normalize<P: AsRef<Path>>(&self, path: P) -> io::Result<Option<PathBuf>> {
80 let path = path.as_ref();
81
82 #[cfg(feature = "tracing")]
83 tracing::trace!("resolving path");
84
85 #[cfg(feature = "log")]
86 log::trace!("resolving path: {}", path.display());
87
88 if path == self.config.directory {
89 return std::fs::canonicalize(&self.config.directory).map(|x| Ok(Some(x)))?;
90 }
91
92 if path.starts_with("./") {
93 let Some(directory) = self.normalize(&self.config.directory)? else {
94 #[cfg(feature = "tracing")]
95 tracing::warn!(
96 directory = %self.config.directory.display(),
97 "unable to resolve directory from config"
98 );
99
100 #[cfg(feature = "log")]
101 log::warn!("unable to resolve given directory from config");
102
103 return Ok(None);
104 };
105
106 let normalized = format!("{}/{}", directory.display(), path.strip_prefix("./").unwrap().display());
107
108 #[cfg(feature = "tracing")]
109 tracing::trace!(%normalized, "resolved path to");
110
111 #[cfg(feature = "log")]
112 log::trace!("resolved path {} ~> {normalized}", path.display());
113
114 return Ok(Some(Path::new(&normalized).to_path_buf()));
115 }
116
117 if path.starts_with("~/") {
118 let homedir = etcetera::home_dir()
119 .inspect_err(|e| {
120 #[cfg(feature = "tracing")]
121 tracing::error!(error = %e, "failed to get home directory");
122
123 #[cfg(feature = "log")]
124 log::error!("failed to get home directory: {e}");
125
126 let _ = e;
127 })
128 .map_err(|_| <std::io::ErrorKind as Into<std::io::Error>>::into(io::ErrorKind::InvalidData))?;
129
130 let normalized = format!("{}/{}", homedir.display(), path.strip_prefix("~/").unwrap().display());
131
132 #[cfg(feature = "tracing")]
133 tracing::trace!(%normalized, "resolved path to");
134
135 #[cfg(feature = "log")]
136 log::trace!("resolved path {} ~> {normalized}", path.display());
137
138 return Ok(Some(Path::new(&normalized).to_path_buf()));
139 }
140
141 Ok(Some(path.to_path_buf()))
142 }
143
144 async fn create_file(&self, path: &Path) -> io::Result<File> {
145 let metadata = path.metadata();
146 let is_symlink = metadata.as_ref().map(|m| m.is_symlink()).unwrap_or(false);
147 let size = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
148 let last_modified_at = match metadata {
149 Ok(ref m) => Some(
150 m.modified()?
151 .duration_since(SystemTime::UNIX_EPOCH)
152 .map_err(|_| io::Error::new(io::ErrorKind::Other, "clock went backwards?!"))?
153 .as_millis(),
154 ),
155
156 Err(_) => None,
157 };
158
159 let created_at = match metadata {
160 Ok(ref m) => Some(
161 m.created()?
162 .duration_since(SystemTime::UNIX_EPOCH)
163 .map_err(|_| io::Error::new(io::ErrorKind::Other, "clock went backwards?!"))?
164 .as_millis(),
165 ),
166
167 Err(_) => None,
168 };
169
170 let bytes = self.open(path).await?.map_or(Bytes::new(), |x| x);
171 let content_type = self.resolver.resolve(bytes.as_ref());
172
173 Ok(File {
174 last_modified_at,
175 content_type: Some(content_type.to_string()),
176 metadata: Default::default(),
177 created_at,
178 is_symlink,
179 data: bytes,
180 name: path.file_name().unwrap().to_string_lossy().into_owned(),
181 path: format!("fs://{}", path.display()),
182 size: size as usize,
183 })
184 }
185
186 async fn create_file_from_entry(&self, path: &Path, entry: fs::DirEntry) -> io::Result<File> {
187 let metadata = entry.metadata().await;
188 let is_symlink = metadata.as_ref().map(|m| m.is_symlink()).unwrap_or(false);
189 let size = metadata.as_ref().map(|m| m.len()).unwrap_or(0);
190 let last_modified_at = match metadata {
191 Ok(ref m) => Some(
192 m.modified()?
193 .duration_since(SystemTime::UNIX_EPOCH)
194 .map_err(|_| io::Error::new(io::ErrorKind::Other, "clock went backwards?!"))?
195 .as_millis(),
196 ),
197
198 Err(_) => None,
199 };
200
201 let created_at = match metadata {
202 Ok(ref m) => Some(
203 m.created()?
204 .duration_since(SystemTime::UNIX_EPOCH)
205 .map_err(|_| io::Error::new(io::ErrorKind::Other, "clock went backwards?!"))?
206 .as_millis(),
207 ),
208
209 Err(_) => None,
210 };
211
212 let bytes = self.open(path).await?.map_or(Bytes::new(), |x| x);
213 let content_type = self.resolver.resolve(bytes.as_ref());
214
215 Ok(File {
216 last_modified_at,
217 content_type: Some(content_type.to_string()),
218 metadata: Default::default(),
219 created_at,
220 is_symlink,
221 data: bytes,
222 name: entry.file_name().to_string_lossy().into_owned(),
223 path: format!("fs://{}", path.display()),
224 size: size as usize,
225 })
226 }
227}
228
229#[async_trait]
230impl remi::StorageService for StorageService {
231 type Error = io::Error;
232
233 fn name(&self) -> Cow<'static, str> {
234 Cow::Borrowed("remi:gridfs")
235 }
236
237 #[cfg_attr(
238 feature = "tracing",
239 instrument(
240 name = "remi.filesystem.open",
241 skip_all,
242 fields(
243 remi.service = "fs",
244 directory = %self.config.directory.display()
245 )
246 )
247 )]
248 async fn init(&self) -> io::Result<()> {
249 if !self.config.directory.try_exists()? {
250 #[cfg(feature = "tracing")]
251 tracing::info!("creating directory since it doesn't exist");
252
253 #[cfg(feature = "log")]
254 log::info!(
255 "creating directory [{}] since it doesn't exist",
256 self.config.directory.display(),
257 );
258
259 fs::create_dir_all(&self.config.directory).await?;
260 }
261
262 if !self.config.directory.is_dir() {
263 #[cfg(not(no_io_errorkind))]
264 return Err(Error::new(
265 io::ErrorKind::NotADirectory,
266 format!("path [{}] is a file, not a directory", self.config.directory.display()),
267 ));
268
269 #[cfg(no_io_errorkind)]
270 return Err(Error::new(
271 io::ErrorKind::InvalidData,
272 format!("path [{}] is a file, not a directory", self.config.directory.display()),
273 ));
274 }
275
276 Ok(())
277 }
278
279 #[cfg_attr(
280 feature = "tracing",
281 instrument(
282 name = "remi.filesystem.open",
283 skip_all,
284 fields(
285 remi.service = "fs",
286 path = %path.as_ref().display()
287 )
288 )
289 )]
290 async fn open<P: AsRef<Path> + Send>(&self, path: P) -> io::Result<Option<Bytes>> {
291 let path = path.as_ref();
292 let Some(path) = self.normalize(path)? else {
293 #[cfg(feature = "tracing")]
294 tracing::warn!("path given couldn't be normalized");
295
296 #[cfg(feature = "log")]
297 log::warn!("path given [{}] was a file, not a directory", path.display());
298
299 return Ok(None);
300 };
301
302 if !path.try_exists()? {
303 #[cfg(feature = "tracing")]
304 tracing::warn!("path doesn't exist");
305
306 #[cfg(feature = "log")]
307 log::warn!("path [{}] doesn't exist", path.display());
308
309 return Ok(None);
310 }
311
312 if path.is_dir() {
313 #[cfg(not(no_io_errorkind))]
314 return Err(Error::new(
315 io::ErrorKind::NotADirectory,
316 format!("path [{}] is a file, not a directory", self.config.directory.display()),
317 ));
318
319 #[cfg(no_io_errorkind)]
320 return Err(Error::new(
321 io::ErrorKind::InvalidData,
322 format!("path [{}] is a file, not a directory", self.config.directory.display()),
323 ));
324 }
325
326 #[cfg(feature = "tracing")]
327 tracing::trace!("attempting to open file");
328
329 #[cfg(feature = "log")]
330 log::trace!("attempting to open file [{}]", path.display());
331
332 let mut file = fs::OpenOptions::new()
333 .create(false)
334 .write(false)
335 .read(true)
336 .open(&path)
337 .await?;
338
339 let metadata = file.metadata().await?;
340 let size = metadata.len();
341 let mut buffer = vec![0; size as usize];
342
343 buffer.resize(size as usize, 0);
344 file.read_exact(&mut buffer).await?;
345
346 Ok(Some(Bytes::from(buffer)))
347 }
348
349 #[cfg_attr(
350 feature = "tracing",
351 instrument(
352 name = "remi.filesystem.blob",
353 skip_all,
354 fields(
355 remi.service = "fs",
356 path = %path.as_ref().display()
357 )
358 )
359 )]
360 async fn blob<P: AsRef<Path> + Send>(&self, path: P) -> io::Result<Option<Blob>> {
361 let path = path.as_ref();
362 let Some(path) = self.normalize(path)? else {
363 #[cfg(feature = "tracing")]
364 tracing::warn!("path given couldn't be normalized");
365
366 #[cfg(feature = "log")]
367 log::warn!("path given [{}] couldn't be normalized", path.display());
368
369 return Ok(None);
370 };
371
372 if path.is_dir() {
373 let metadata = path.metadata()?;
374 let created_at = match metadata.created() {
375 Ok(sys) => Some(
376 sys.duration_since(SystemTime::UNIX_EPOCH)
377 .map_err(|_| io::Error::new(io::ErrorKind::InvalidData, "clock went backwards?!"))?
378 .as_millis(),
379 ),
380
381 Err(_) => None,
382 };
383
384 let name = path
385 .file_name()
386 .map(|s| s.to_string_lossy())
387 .unwrap_or(Cow::Borrowed("<root or relative path>"))
388 .to_string();
389
390 return Ok(Some(Blob::Directory(Directory {
391 created_at,
392 name,
393 path: format!("fs://{}", path.display()),
394 })));
395 }
396
397 Ok(Some(Blob::File(self.create_file(&path).await?)))
398 }
399
400 #[cfg_attr(
401 feature = "tracing",
402 instrument(
403 name = "remi.filesystem.blobs",
404 skip_all,
405 fields(
406 remi.service = "fs",
407 path = ?path.as_ref().map(|path| path.as_ref().display())
408 )
409 )
410 )]
411 async fn blobs<P: AsRef<Path> + Send>(
412 &self,
413 path: Option<P>,
414 options: Option<ListBlobsRequest>,
415 ) -> io::Result<Vec<Blob>> {
416 let options = options.unwrap_or_default();
417 let prefix = options.prefix.clone().unwrap_or_default();
418 let path = match path {
419 Some(ref p) => p.as_ref(),
420 None => &self.config.directory,
421 };
422
423 let Some(path) = self.normalize(path)? else {
424 #[cfg(feature = "tracing")]
425 tracing::warn!("path given couldn't be normalized");
426
427 #[cfg(feature = "log")]
428 log::warn!("path given [{}] was a file, not a directory", path.display());
429
430 return Ok(vec![]);
431 };
432
433 if path.is_file() {
434 #[cfg(feature = "tracing")]
435 tracing::warn!("path given was a file, not a directory");
436
437 #[cfg(feature = "log")]
438 log::warn!("path given [{}] was a file, not a directory", path.display());
439
440 return Ok(vec![]);
441 }
442
443 let search = format!("{}{prefix}", path.display());
444 #[cfg(feature = "tracing")]
445 tracing::trace!(%search, "attempting to search all blobs in given path");
446
447 #[cfg(feature = "log")]
448 log::trace!(
449 "attempting to search in [{search}] for all blobs in given path [{}]",
450 path.display()
451 );
452
453 let mut files = fs::read_dir(search).await?;
454 let mut blobs = vec![];
455
456 while let Some(entry) = files.next_entry().await? {
457 if entry.path().is_dir() && options.include_dirs {
458 blobs.push(Blob::Directory(Directory {
459 created_at: match entry.metadata().await {
460 Ok(sys) => Some(
461 sys.created()?
462 .duration_since(SystemTime::UNIX_EPOCH)
463 .map_err(|_| io::Error::new(io::ErrorKind::Other, "clock went backwards?!"))?
464 .as_millis(),
465 ),
466
467 Err(_) => None,
468 },
469
470 name: path
471 .file_name()
472 .map(|s| s.to_string_lossy())
473 .unwrap_or(Cow::Borrowed("<root or relative path>"))
474 .to_string(),
475
476 path: format!("fs://{}", entry.path().display()),
477 }));
478
479 continue;
480 }
481
482 let path = entry.path();
483 let ext_allowed = match path.extension() {
484 Some(s) => options.is_ext_allowed(s.to_str().expect("valid utf-8 in path extension")),
485 None => true,
486 };
487
488 if !ext_allowed {
489 continue;
490 }
491
492 blobs.push(Blob::File(self.create_file_from_entry(&path, entry).await?));
493 }
494
495 Ok(blobs)
496 }
497
498 #[cfg_attr(
499 feature = "tracing",
500 instrument(
501 name = "remi.filesystem.delete",
502 skip_all,
503 fields(
504 remi.service = "fs",
505 path = %path.as_ref().display()
506 )
507 )
508 )]
509 async fn delete<P: AsRef<Path> + Send>(&self, path: P) -> io::Result<()> {
510 let path = path.as_ref();
511 let Some(path) = self.normalize(path)? else {
512 return Err(io::Error::new(
513 io::ErrorKind::InvalidInput,
514 "unable to normalize given path",
515 ));
516 };
517
518 if path.is_dir() {
519 #[cfg(feature = "tracing")]
520 tracing::trace!("deleting directory");
521
522 #[cfg(feature = "log")]
523 log::trace!("deleting directory [{}]", path.display());
524
525 fs::remove_dir(path).await?;
526 return Ok(());
527 }
528
529 #[cfg(feature = "tracing")]
530 tracing::trace!("deleting file");
531
532 #[cfg(feature = "log")]
533 log::trace!("deleting file [{}]...", path.display());
534
535 fs::remove_file(path).await
536 }
537
538 #[cfg_attr(
539 feature = "tracing",
540 instrument(
541 name = "remi.filesystem.exists",
542 skip_all,
543 fields(
544 remi.service = "fs",
545 path = %path.as_ref().display()
546 )
547 )
548 )]
549 async fn exists<P: AsRef<Path> + Send>(&self, path: P) -> io::Result<bool> {
550 let path = path.as_ref();
551 let Some(path) = self.normalize(path)? else {
552 return Err(io::Error::new(
553 io::ErrorKind::InvalidInput,
554 "unable to normalize given path",
555 ));
556 };
557
558 path.try_exists()
559 }
560
561 #[cfg_attr(
562 feature = "tracing",
563 instrument(
564 name = "remi.filesystem.upload",
565 skip_all,
566 fields(
567 remi.service = "fs",
568 path = %path.as_ref().display()
569 )
570 )
571 )]
572 async fn upload<P: AsRef<Path> + Send>(&self, path: P, options: UploadRequest) -> io::Result<()> {
573 let path = path.as_ref();
574 let Some(path) = self.normalize(path)? else {
575 return Err(io::Error::new(
576 io::ErrorKind::InvalidInput,
577 "unable to normalize given path",
578 ));
579 };
580
581 if path.try_exists()? {
582 #[cfg(feature = "tracing")]
583 tracing::warn!("contents in given path will be overwritten");
584
585 #[cfg(feature = "log")]
586 log::trace!("contents in given path [{}] will be overwritten", path.display());
587 }
588
589 #[cfg(feature = "tracing")]
590 tracing::warn!("uploading file");
591
592 #[cfg(feature = "log")]
593 log::trace!("uploading file [{}]", path.display());
594
595 if let Some(parent) = path.parent() {
598 fs::create_dir_all(parent).await?;
599 }
600
601 let mut file = fs::OpenOptions::new();
602 file.write(true);
603
604 if !path.try_exists()? {
605 file.create_new(true);
607 }
608
609 let mut file = file.open(path).await?;
610 file.write_all(options.data.as_ref()).await?;
611 file.flush().await?;
612
613 Ok(())
614 }
615
616 #[cfg(feature = "unstable")]
617 #[cfg_attr(any(noeldoc, docsrs), doc(cfg(feature = "unstable")))]
618 async fn healthcheck(&self) -> io::Result<()> {
619 Ok(())
620 }
621}
622
623#[cfg(test)]
624mod tests {
625 use super::*;
626
627 macro_rules! build_testcases {
629 ($(
630 $(#[$meta:meta])*
631 $name:ident($storage:ident) $code:block
632 )*) => {
633 $(
634 $(#[$meta])*
635 #[::tokio::test]
636 async fn $name() -> ::std::io::Result<()> {
637 let tempdir = ::tempfile::tempdir().expect("failed to create tempdir");
638 assert!(fs::try_exists(&tempdir).await.expect("tempdir to actually exist"));
639
640 let $storage = $crate::StorageService::new(&tempdir);
641 ($storage).init().await.expect("initialization part to be successful");
642
643 assert!(fs::try_exists(tempdir).await.expect("should actually exist?!"));
644
645 let __ret: ::std::io::Result<()> = $code;
646 __ret
647 }
648 )*
649 };
650 }
651
652 build_testcases! {
653 init(_storage) {
654 Ok(())
655 }
656
657 }
680}