goods_treasury/
treasury.rs

1use {
2    crate::asset::Asset,
3    parking_lot::Mutex,
4    std::{io::Read, path::Path, sync::Arc, time::SystemTime},
5    uuid::Uuid,
6};
7
8#[cfg(feature = "import")]
9use crate::import::Importers;
10
11/// Storage for goods.
12pub struct Treasury {
13    registry: Arc<Mutex<Registry>>,
14}
15
/// Identifies an import by its source path and its source/native format pair.
///
/// `Eq` is derived together with `PartialEq` and `Hash` so that the type can
/// actually be used as a key in `HashMap`/`HashSet`.
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
struct Kind {
    /// Path of the source file.
    source_path: Arc<Path>,
    /// Name of the format the source file is stored in.
    source_format: Arc<str>,
    /// Name of the format the asset is converted to.
    native_format: Arc<str>,
}
22
23pub(crate) struct Registry {
24    /// All paths not suffixed with `_absolute` are relative to this.
25    root: Box<Path>,
26
27    // Data loaded from `root.join(".treasury/db")`.
28    data: Data,
29
30    /// Importers
31    #[cfg(feature = "import")]
32    importers: Importers,
33}
34
35#[derive(serde::Serialize, serde::Deserialize)]
36struct Data {
37    importers_dirs: Vec<Box<Path>>,
38    /// Array with all registered assets.
39    assets: Vec<Asset>,
40}
41
/// Content of an asset in native format together with its version.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct AssetData {
    /// Bytes read from the native file.
    pub bytes: Box<[u8]>,

    /// Version of the native file, derived from its modification time.
    /// Pass it to `Treasury::fetch_updated` to ask only for newer data.
    pub version: u64,
}
46
47#[derive(Debug, thiserror::Error)]
48pub enum NewError {
49    #[error("Goods path '{path}' is occupied")]
50    GoodsAlreadyExist { path: Box<Path> },
51
52    #[error("Goods path '{path}' is invalid")]
53    InvalidGoodsPath { path: Box<Path> },
54
55    #[error("Failed to create root directory '{path}'")]
56    RootDirCreateError {
57        path: Box<Path>,
58        source: std::io::Error,
59    },
60
61    #[error("Failed to create goods directory '{path}'")]
62    GoodsDirCreateError {
63        path: Box<Path>,
64        source: std::io::Error,
65    },
66
67    #[error("Root '{path}' is not a directory")]
68    RootIsNotDir { path: Box<Path> },
69
70    #[error(transparent)]
71    SaveError(#[from] SaveError),
72}
73
74#[derive(Debug, thiserror::Error)]
75pub enum SaveError {
76    #[error("Failed to open goods path '{path}'")]
77    GoodsOpenError {
78        path: Box<Path>,
79        source: std::io::Error,
80    },
81
82    #[error("Failed to deserialize goods file")]
83    JsonError {
84        path: Box<Path>,
85        source: serde_json::Error,
86    },
87}
88
89#[derive(Debug, thiserror::Error)]
90pub enum OpenError {
91    #[error("Goods path '{path}' is invalid")]
92    InvalidGoodsPath { path: Box<Path> },
93
94    #[error("Failed to create directory for goods path '{path}'")]
95    GoodsCreateError {
96        path: Box<Path>,
97        source: std::io::Error,
98    },
99
100    #[error("Failed to open goods path '{path}'")]
101    GoodsOpenError {
102        path: Box<Path>,
103        source: std::io::Error,
104    },
105
106    #[error("Failed to deserialize goods file")]
107    JsonError {
108        path: Box<Path>,
109        source: serde_json::Error,
110    },
111}
112
113#[derive(Debug, thiserror::Error)]
114pub enum FetchError {
115    #[error("Asset not found")]
116    NotFound,
117
118    #[error("Failed to access native file '{path}'")]
119    NativeIoError {
120        path: Box<Path>,
121        source: std::io::Error,
122    },
123}
124
125#[derive(Debug, thiserror::Error)]
126pub enum StoreError {
127    #[error("No importer from '{source_format}' to '{native_format}' found")]
128    ImporterNotFound {
129        source_format: String,
130        native_format: String,
131    },
132
133    #[error("Import failed")]
134    ImportError { source: eyre::Report },
135
136    #[error("Failed to access source file '{path}'")]
137    SourceIoError {
138        path: Box<Path>,
139        source: std::io::Error,
140    },
141
142    #[error("Failed to access native file '{path}'")]
143    NativeIoError {
144        path: Box<Path>,
145        source: std::io::Error,
146    },
147}
148
149impl Treasury {
150    /// Create new goods storage.
151    #[tracing::instrument(fields(root = %root.as_ref().display()))]
152    pub fn new(root: impl AsRef<Path>, overwrite: bool) -> Result<Self, NewError> {
153        let root = root.as_ref();
154
155        if !root.exists() {
156            std::fs::create_dir_all(&root).map_err(|source| NewError::RootDirCreateError {
157                source,
158                path: root.into(),
159            })?;
160        } else if !root.is_dir() {
161            return Err(NewError::RootIsNotDir { path: root.into() });
162        }
163
164        let treasury_path = root.join(".treasury");
165
166        if treasury_path.exists() {
167            if treasury_path.is_file() {
168                return Err(NewError::GoodsAlreadyExist {
169                    path: treasury_path.into(),
170                });
171            }
172
173            let manifest_path = treasury_path.join("manifest.json");
174            if !overwrite && manifest_path.exists() {
175                return Err(NewError::GoodsAlreadyExist {
176                    path: treasury_path.into(),
177                });
178            }
179        } else if let Err(err) = std::fs::create_dir(&treasury_path) {
180            return Err(NewError::GoodsDirCreateError {
181                source: err,
182                path: treasury_path.into(),
183            });
184        }
185
186        let goods = Treasury {
187            registry: Arc::new(Mutex::new(Registry {
188                #[cfg(feature = "import")]
189                importers: Importers::new(&root),
190                root: root.into(),
191                data: Data {
192                    assets: Vec::new(),
193                    importers_dirs: Vec::new(),
194                },
195            })),
196        };
197
198        Ok(goods)
199    }
200
201    /// Opens goods storage from metadata file.
202    #[tracing::instrument(skip(root), fields(root = %root.as_ref().display()))]
203    pub fn open(root: impl AsRef<Path>) -> Result<Self, OpenError> {
204        let root = root.as_ref();
205
206        let treasury_path = root.join(".treasury");
207        let manifest_path = treasury_path.join("manifest.json");
208
209        let file =
210            std::fs::File::open(&manifest_path).map_err(|source| OpenError::GoodsOpenError {
211                source,
212                path: manifest_path.clone().into(),
213            })?;
214
215        let data: Data = serde_json::from_reader(file).map_err(|source| OpenError::JsonError {
216            source,
217            path: manifest_path.clone().into(),
218        })?;
219
220        let registry = Arc::new(Mutex::new(Registry {
221            #[cfg(feature = "import")]
222            importers: Importers::new(&root),
223            data,
224            root: root.into(),
225        }));
226
227        #[cfg(feature = "import")]
228        {
229            let registry_clone = registry.clone();
230
231            let mut lock = registry.lock();
232            let me = &mut *lock;
233
234            for dir_path in &me.data.importers_dirs {
235                let root_dir_path = me.root.join(dir_path);
236                if let Err(err) = me
237                    .importers
238                    .load_importers_dir(&root_dir_path, &registry_clone)
239                {
240                    tracing::error!(
241                        "Failed to load importers from '{} ({})'. {:#}",
242                        dir_path.display(),
243                        root_dir_path.display(),
244                        err
245                    );
246                }
247            }
248            drop(lock);
249        }
250
251        Ok(Treasury { registry })
252    }
253
254    pub fn save(&self) -> Result<(), SaveError> {
255        Registry::save(&self.registry)
256    }
257
258    #[cfg(feature = "import")]
259    pub fn load_importers_dir(&mut self, dir_path: impl AsRef<Path>) -> std::io::Result<()> {
260        let dir_path = dir_path.as_ref();
261
262        if self
263            .registry
264            .lock()
265            .data
266            .importers_dirs
267            .iter()
268            .any(|d| **d == *dir_path)
269        {
270            Ok(())
271        } else {
272            let registry_clone = self.registry.clone();
273
274            let mut lock = self.registry.lock();
275
276            match lock.importers.load_importers_dir(dir_path, &registry_clone) {
277                Ok(()) => {
278                    let dir_path = relative_to(dir_path, &lock.root);
279
280                    lock.data
281                        .importers_dirs
282                        .push(dir_path.into_owned().into_boxed_path());
283                    Ok(())
284                }
285                Err(err) => {
286                    tracing::error!(
287                        "Failed to load importers from '{}'. {:#}",
288                        dir_path.display(),
289                        err
290                    );
291                    Err(err)
292                }
293            }
294        }
295    }
296
297    /// Import asset into goods instance
298    #[cfg(feature = "import")]
299    pub fn store(
300        &self,
301        source: impl AsRef<Path>,
302        source_format: &str,
303        native_format: &str,
304        tags: &[impl AsRef<str>],
305    ) -> Result<Uuid, StoreError> {
306        Registry::store(
307            &self.registry,
308            source.as_ref(),
309            source_format,
310            native_format,
311            tags,
312        )
313    }
314
315    /// Fetches asset in native format.
316    /// Performs conversion if native format is absent or out of date.
317    #[tracing::instrument(skip(self))]
318    pub fn fetch(&mut self, uuid: &Uuid) -> Result<AssetData, FetchError> {
319        match Registry::fetch(&self.registry, uuid, 0)? {
320            None => unreachable!(),
321            Some(mut info) => {
322                let mut bytes = Vec::new();
323                info.native_file.read_to_end(&mut bytes).map_err(|source| {
324                    FetchError::NativeIoError {
325                        source,
326                        path: info.native_path.to_path_buf().into(),
327                    }
328                })?;
329
330                Ok(AssetData {
331                    bytes: bytes.into_boxed_slice(),
332                    version: info.version,
333                })
334            }
335        }
336    }
337
338    /// Fetches asset in native format.
339    /// Returns `Ok(None)` if native file is up-to-date.
340    /// Performs conversion if native format is absent or out of date.
341    #[tracing::instrument(skip(self))]
342    pub fn fetch_updated(
343        &mut self,
344        uuid: &Uuid,
345        version: u64,
346    ) -> Result<Option<AssetData>, FetchError> {
347        match Registry::fetch(&self.registry, uuid, version + 1)? {
348            None => Ok(None),
349            Some(mut info) => {
350                let mut bytes = Vec::new();
351                info.native_file.read_to_end(&mut bytes).map_err(|source| {
352                    FetchError::NativeIoError {
353                        source,
354                        path: info.native_path.to_path_buf().into(),
355                    }
356                })?;
357
358                Ok(Some(AssetData {
359                    bytes: bytes.into_boxed_slice(),
360                    version: info.version,
361                }))
362            }
363        }
364    }
365
366    /// Returns assets information.
367    #[tracing::instrument(skip(self, tags))]
368    pub fn list(&self, tags: &[impl AsRef<str>], native_format: Option<&str>) -> Vec<Asset> {
369        let lock = self.registry.lock();
370
371        lock.data
372            .assets
373            .iter()
374            .filter(|a| {
375                if let Some(native_format) = native_format {
376                    if a.native_format() != native_format {
377                        return false;
378                    }
379                }
380
381                tags.iter().all(|tag| {
382                    let tag = tag.as_ref();
383                    a.tags().iter().any(|t| **t == *tag)
384                })
385            })
386            .cloned()
387            .collect()
388    }
389
390    /// Returns assets information.
391    #[tracing::instrument(skip(self))]
392    pub fn remove<'a>(&self, uuid: Uuid) {
393        let mut lock = self.registry.lock();
394
395        if let Some(index) = lock.data.assets.iter().position(|a| a.uuid() == uuid) {
396            let native = Path::new(".treasury").join(uuid.to_hyphenated().to_string());
397            let native_absolute = lock.root.join(native);
398            if let Err(err) = std::fs::remove_file(&native_absolute) {
399                tracing::error!(
400                    "Failed to remove native asset file '{}': {}",
401                    native_absolute.display(),
402                    err
403                );
404            }
405            lock.data.assets.remove(index);
406        }
407    }
408}
409
/// Result of a successful `Registry::fetch`.
pub(crate) struct FetchInfo {
    /// Absolute path of the native file.
    pub native_path: Box<Path>,

    /// Native file opened for reading.
    pub native_file: std::fs::File,

    /// Version computed from the native file modification time.
    pub version: u64,
}
415
416impl Registry {
417    fn save(me: &Mutex<Self>) -> Result<(), SaveError> {
418        let lock = me.lock();
419        let treasury_path = lock.root.join(".treasury").join("manifest.json");
420        let file =
421            std::fs::File::create(&treasury_path).map_err(|source| SaveError::GoodsOpenError {
422                source,
423                path: treasury_path.clone().into(),
424            })?;
425        serde_json::to_writer_pretty(file, &lock.data).map_err(|source| SaveError::JsonError {
426            source,
427            path: treasury_path.into(),
428        })
429    }
430
431    #[cfg(feature = "import")]
432    pub(crate) fn store(
433        me: &Mutex<Self>,
434        source: &Path,
435        source_format: &str,
436        native_format: &str,
437        tags: &[impl AsRef<str>],
438    ) -> Result<Uuid, StoreError> {
439        let mut lock = me.lock();
440
441        // Find the source
442
443        let source_absolute;
444        let source_from_root = if source.is_absolute() {
445            source_absolute = source.to_path_buf();
446            relative_to(source, &lock.root)
447        } else {
448            let cd = std::env::current_dir().map_err(|_| StoreError::SourceIoError {
449                path: source.into(),
450                source: std::io::ErrorKind::NotFound.into(),
451            })?;
452            source_absolute = cd.join(source);
453            relative_to(&source_absolute, &lock.root)
454        };
455
456        if let Some(asset) = lock.data.assets.iter().find(|a| {
457            *a.source() == *source_from_root
458                && a.source_format() == source_format
459                && a.native_format() == native_format
460        }) {
461            tracing::trace!("Already imported");
462
463            let uuid = asset.uuid();
464            drop(lock);
465
466            if let Err(err) = Self::fetch(me, &uuid, 0) {
467                tracing::error!(
468                    "Failed to reimport while storing already known asset. {:#}",
469                    err
470                );
471            }
472            return Ok(uuid);
473        }
474
475        tracing::debug!(
476            "Importing {} as {} @ {}",
477            source_format,
478            native_format,
479            source.display()
480        );
481
482        let uuid = loop {
483            let uuid = Uuid::new_v4();
484            if !lock.data.assets.iter().any(|a| a.uuid() == uuid) {
485                break uuid;
486            }
487        };
488
489        let native_path = Path::new(".treasury").join(uuid.to_hyphenated().to_string());
490        let native_path_absolute = lock.root.join(&native_path);
491
492        if source_format == native_format {
493            if let Err(err) = std::fs::copy(&source, &native_path_absolute) {
494                return Err(StoreError::SourceIoError {
495                    source: err,
496                    path: source.into(),
497                });
498            }
499        } else {
500            match lock.importers.get_importer(source_format, native_format) {
501                None => {
502                    return Err(StoreError::ImporterNotFound {
503                        source_format: source_format.to_owned(),
504                        native_format: native_format.to_owned(),
505                    })
506                }
507                Some(importer_entry) => {
508                    tracing::trace!("Importer found. {}", importer_entry.name());
509
510                    let native_tmp_path = native_path.with_extension("tmp");
511
512                    let result = importer_entry.import(&source_absolute, &native_tmp_path, lock);
513
514                    if let Err(err) = result {
515                        return Err(StoreError::ImportError { source: err });
516                    }
517
518                    tracing::trace!("Imported successfully");
519
520                    let native_tmp_path_absolute = native_path_absolute.with_extension("tmp");
521                    if let Err(err) =
522                        std::fs::rename(&native_tmp_path_absolute, &native_path_absolute)
523                    {
524                        tracing::error!(
525                            "Failed to rename '{}' to '{}'",
526                            native_tmp_path_absolute.display(),
527                            native_path_absolute.display(),
528                        );
529
530                        return Err(StoreError::NativeIoError {
531                            path: native_path_absolute.into(),
532                            source: err,
533                        });
534                    }
535
536                    lock = me.lock();
537                }
538            }
539        }
540
541        lock.data.assets.push(Asset::new(
542            uuid,
543            source_from_root.into_owned().into(),
544            source_format.into(),
545            native_format.into(),
546            tags.iter().map(|tag| tag.as_ref().into()).collect(),
547        ));
548
549        tracing::info!("Asset '{}' registered", uuid);
550        drop(lock);
551        let _ = Self::save(me);
552
553        Ok(uuid)
554    }
555
556    pub(crate) fn fetch(
557        me: &Mutex<Self>,
558        uuid: &Uuid,
559        next_version: u64,
560    ) -> Result<Option<FetchInfo>, FetchError> {
561        let lock = me.lock();
562
563        match lock.data.assets.iter().position(|a| a.uuid() == *uuid) {
564            None => Err(FetchError::NotFound),
565            #[cfg(not(feature = "import"))]
566            Some(_) => {
567                let native_path = Path::new(".treasury").join(uuid.to_hyphenated().to_string());
568                let native_path_absolute = lock.root.join(&native_path);
569                let native_file = std::fs::File::open(&native_path_absolute).map_err(|source| {
570                    FetchError::NativeIoError {
571                        source,
572                        path: native_path_absolute.clone().into(),
573                    }
574                })?;
575
576                let native_modified =
577                    native_file
578                        .metadata()
579                        .and_then(|m| m.modified())
580                        .map_err(|source| FetchError::NativeIoError {
581                            source,
582                            path: native_path_absolute.clone().into(),
583                        })?;
584
585                let version = version_from_systime(native_modified);
586                if next_version > version {
587                    tracing::trace!("Native asset is not updated");
588                    return Ok(None);
589                }
590
591                let native_path = native_path_absolute.into();
592
593                Ok(Some(FetchInfo {
594                    native_path,
595                    native_file,
596                    version,
597                }))
598            }
599            #[cfg(feature = "import")]
600            Some(index) => {
601                let native_path = Path::new(".treasury").join(uuid.to_hyphenated().to_string());
602                let native_path_absolute = lock.root.join(&native_path);
603                let mut native_file =
604                    std::fs::File::open(&native_path_absolute).map_err(|source| {
605                        FetchError::NativeIoError {
606                            source,
607                            path: native_path_absolute.clone().into(),
608                        }
609                    })?;
610
611                let native_modified =
612                    native_file
613                        .metadata()
614                        .and_then(|m| m.modified())
615                        .map_err(|source| FetchError::NativeIoError {
616                            source,
617                            path: native_path_absolute.clone().into(),
618                        })?;
619
620                let asset = &lock.data.assets[index];
621                let source_absolute = lock.root.join(asset.source());
622
623                if let Ok(source_modified) =
624                    std::fs::metadata(source_absolute).and_then(|m| m.modified())
625                {
626                    if native_modified < source_modified {
627                        tracing::trace!("Native asset file is out-of-date. Perform reimport");
628
629                        if asset.source_format() == asset.native_format() {
630                            let source_path = lock.root.join(asset.source());
631                            std::fs::copy(&source_path, &native_path_absolute).map_err(
632                                |source| FetchError::NativeIoError {
633                                    source,
634                                    path: native_path_absolute.clone().into(),
635                                },
636                            )?;
637                        } else {
638                            match lock
639                                .importers
640                                .get_importer(asset.source_format(), asset.native_format())
641                            {
642                                None => {
643                                    tracing::warn!(
644                                        "Importer from '{}' to '{}' not found, asset '{}@{}' cannot be updated",
645                                        asset.source_format(),
646                                        asset.native_format(),
647                                        asset.uuid(),
648                                        asset.source().display(),
649                                    );
650                                }
651                                Some(importer) => {
652                                    let native_tmp_path = native_path.with_extension("tmp");
653                                    let native_tmp_absolute_path =
654                                        native_path_absolute.with_extension("tmp");
655                                    let source_path = lock.root.join(asset.source());
656
657                                    let result =
658                                        importer.import(&source_path, &native_tmp_path, lock);
659
660                                    match result {
661                                        Ok(()) => {
662                                            drop(native_file);
663                                            match std::fs::rename(
664                                                &native_tmp_absolute_path,
665                                                &native_path_absolute,
666                                            ) {
667                                                Ok(()) => {
668                                                    tracing::trace!("Native file updated");
669                                                }
670                                                Err(err) => {
671                                                    tracing::warn!(
672                                                            "Failed to copy native file '{}' from '{}'. {:#}",
673                                                            native_path.display(),
674                                                            native_tmp_path.display(),
675                                                            err
676                                                        )
677                                                }
678                                            }
679                                            match std::fs::File::open(&native_path_absolute) {
680                                                Ok(file) => native_file = file,
681                                                Err(err) => {
682                                                    tracing::warn!(
683                                                        "Failed to reopen native file '{}'. {:#}",
684                                                        native_path_absolute.display(),
685                                                        err,
686                                                    );
687                                                    return Err(FetchError::NativeIoError {
688                                                        source: err,
689                                                        path: native_path_absolute
690                                                            .to_path_buf()
691                                                            .into(),
692                                                    });
693                                                }
694                                            }
695                                        }
696                                        Err(err) => {
697                                            tracing::warn!(
698                                                "Native file reimport failed '{:#}'. Fallback to old file",
699                                                err,
700                                            );
701                                        }
702                                    }
703                                }
704                            }
705                        }
706                    } else {
707                        tracing::trace!("Native asset file is up-to-date");
708                    }
709                } else {
710                    tracing::warn!("Failed to determine if native file is up-to-date");
711                }
712
713                let version = version_from_systime(native_modified);
714                if next_version > version {
715                    tracing::trace!("Native asset is not updated");
716                    return Ok(None);
717                }
718
719                Ok(Some(FetchInfo {
720                    native_path: native_path_absolute.into(),
721                    native_file,
722                    version,
723                }))
724            }
725        }
726    }
727}
728
/// Expresses `path` relative to `root`.
///
/// The shared leading components are dropped, one `..` is emitted for every
/// remaining component of `root`, and the rest of `path` is appended. When the
/// two paths share no leading component, `path` is returned unchanged.
///
/// Both paths are expected to be absolute; this is checked in debug builds.
#[cfg(feature = "import")]
fn relative_to<'a>(path: &'a Path, root: &Path) -> std::borrow::Cow<'a, Path> {
    use std::{
        borrow::Cow,
        path::{Component, PathBuf},
    };

    debug_assert!(
        path.is_absolute(),
        "Path '{}' is not absolute",
        path.display()
    );
    debug_assert!(
        root.is_absolute(),
        "Root path '{}' is not absolute",
        root.display()
    );

    let mut path_rest = path.components();
    let mut root_rest = root.components();
    let mut shared = 0usize;

    // Advance both iterators while their next components agree; the
    // look-ahead runs on clones so a mismatching component is not consumed.
    loop {
        let mut path_ahead = path_rest.clone();
        let mut root_ahead = root_rest.clone();

        match (path_ahead.next(), root_ahead.next()) {
            (Some(pc), Some(rc)) if pc == rc => {
                path_rest = path_ahead;
                root_rest = root_ahead;
                shared += 1;
            }
            _ => break,
        }
    }

    if shared == 0 {
        return Cow::Borrowed(path);
    }

    // Step up once for each component of `root` below the shared prefix.
    let mut relative: PathBuf = root_rest.map(|_| Component::ParentDir).collect();
    relative.push(path_rest.as_path());

    Cow::Owned(relative)
}
770
/// Converts a timestamp into a version number: whole milliseconds elapsed
/// since the Unix epoch.
///
/// Timestamps before the epoch map to `0` instead of panicking, and values
/// that do not fit into `u64` saturate at `u64::MAX` instead of being
/// truncated.
fn version_from_systime(systime: SystemTime) -> u64 {
    // Local import keeps the block self-contained on editions where
    // `TryFrom` is not in the prelude.
    use std::convert::TryFrom;

    systime
        .duration_since(SystemTime::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}