treasury_store/
lib.rs

1use std::{
2    collections::VecDeque,
3    path::{Path, PathBuf},
4    time::{Duration, SystemTime},
5};
6
7use eyre::WrapErr;
8use hashbrown::{HashMap, HashSet};
9use importer::Importers;
10use meta::{AssetMeta, SourceMeta};
11use parking_lot::RwLock;
12use sources::Sources;
13use temp::Temporaries;
14use treasury_id::{AssetId, AssetIdContext};
15use treasury_import::ImportError;
16use url::Url;
17
18mod importer;
19mod meta;
20mod scheme;
21mod sha256;
22mod sources;
23mod temp;
24
/// File name of the treasury configuration file searched for by `find_treasury_info`.
pub const TREASURY_META_NAME: &str = "Treasury.toml";

/// Default auxiliary directory, relative to the treasury base.
const DEFAULT_AUX: &str = "treasury";
/// Default artifacts directory name, placed under `DEFAULT_AUX`.
const DEFAULT_ARTIFACTS: &str = "artifacts";
/// Default external-metadata directory name, placed under `DEFAULT_AUX`.
const DEFAULT_EXTERNAL: &str = "external";
/// Upper bound on import attempts for a single item; breaks infinite loops when an
/// importer keeps requesting more sources or dependencies.
const MAX_ITEM_ATTEMPTS: u32 = 1024;
31
32#[derive(serde::Serialize, serde::Deserialize)]
33pub struct TreasuryInfo {
34    #[serde(skip_serializing_if = "Option::is_none", default)]
35    pub artifacts: Option<PathBuf>,
36    #[serde(skip_serializing_if = "Option::is_none", default)]
37    pub external: Option<PathBuf>,
38    #[serde(skip_serializing_if = "Option::is_none", default)]
39    pub temp: Option<PathBuf>,
40    #[serde(skip_serializing_if = "Vec::is_empty", default)]
41    pub importers: Vec<PathBuf>,
42}
43
44impl Default for TreasuryInfo {
45    fn default() -> Self {
46        TreasuryInfo::new(None, None, None, &[])
47    }
48}
49
50impl TreasuryInfo {
51    pub fn write(&self, path: &Path) -> eyre::Result<()> {
52        let meta = toml::to_string_pretty(self).wrap_err("Failed to serialize metadata")?;
53        std::fs::write(path, &meta)
54            .wrap_err_with(|| format!("Failed to write metadata file '{}'", path.display()))?;
55        Ok(())
56    }
57
58    pub fn read(path: &Path) -> eyre::Result<Self> {
59        let err_ctx = || format!("Failed to read metadata file '{}'", path.display());
60
61        let meta = std::fs::read(path).wrap_err_with(err_ctx)?;
62        let meta: TreasuryInfo = toml::from_slice(&meta).wrap_err_with(err_ctx)?;
63        Ok(meta)
64    }
65
66    pub fn new(
67        artifacts: Option<&Path>,
68        external: Option<&Path>,
69        temp: Option<&Path>,
70        importers: &[&Path],
71    ) -> Self {
72        let artifacts = artifacts.map(Path::to_owned);
73        let external = external.map(Path::to_owned);
74        let temp = temp.map(Path::to_owned);
75        let importers = importers.iter().copied().map(|p| p.to_owned()).collect();
76
77        TreasuryInfo {
78            artifacts,
79            external,
80            temp,
81            importers,
82        }
83    }
84}
85
86#[derive(Clone)]
87struct AssetItem {
88    source: Url,
89    format: Option<String>,
90    target: String,
91}
92
93pub struct Treasury {
94    id_ctx: AssetIdContext,
95    base: PathBuf,
96    base_url: Url,
97    artifacts_base: PathBuf,
98    external: PathBuf,
99    temp: PathBuf,
100    importers: Importers,
101
102    artifacts: RwLock<HashMap<AssetId, AssetItem>>,
103    scanned: RwLock<bool>,
104}
105
106impl Treasury {
107    /// Find and open treasury in ancestors of specified directory.
108    #[tracing::instrument]
109    pub fn find_from(path: &Path) -> eyre::Result<Self> {
110        let meta_path = find_treasury_info(path).ok_or_else(|| {
111            eyre::eyre!(
112                "Failed to find `Treasury.toml` in ancestors of {}",
113                path.display(),
114            )
115        })?;
116
117        Treasury::open(&meta_path)
118    }
119
120    /// Find and open treasury in ancestors of current directory.
121    #[tracing::instrument]
122    pub fn find() -> eyre::Result<Self> {
123        let cwd = std::env::current_dir().wrap_err("Failed to get current directory")?;
124        Treasury::find_from(&cwd)
125    }
126
127    /// Open treasury database at specified path.
128    #[tracing::instrument]
129    pub fn open(path: &Path) -> eyre::Result<Self> {
130        let meta = TreasuryInfo::read(path)?;
131        let base = path.parent().unwrap().to_owned();
132
133        Self::new(&base, meta)
134    }
135
136    pub fn new(base: &Path, meta: TreasuryInfo) -> eyre::Result<Self> {
137        let base = dunce::canonicalize(base).wrap_err_with(|| {
138            eyre::eyre!("Failed to canonicalize base path '{}'", base.display())
139        })?;
140        let base_url = Url::from_directory_path(&base)
141            .map_err(|()| eyre::eyre!("'{}' is invalid base path", base.display()))?;
142
143        let artifacts = base.join(
144            meta.artifacts
145                .unwrap_or_else(|| Path::new(DEFAULT_AUX).join(DEFAULT_ARTIFACTS)),
146        );
147
148        let external = base.join(
149            meta.external
150                .unwrap_or_else(|| Path::new(DEFAULT_AUX).join(DEFAULT_EXTERNAL)),
151        );
152
153        let temp = meta
154            .temp
155            .map_or_else(std::env::temp_dir, |path| base.join(path));
156
157        let id_ctx = AssetIdContext::new(treasury_epoch(), rand::random());
158
159        let mut importers = Importers::new();
160
161        for lib_path in &meta.importers {
162            let lib_path = base.join(lib_path);
163
164            unsafe {
165                // # Safety: Nope.
166                // There is no way to make this safe.
167                // But it is unlikely to cause problems by accident.
168                if let Err(err) = importers.load_dylib_importers(&lib_path) {
169                    tracing::error!(
170                        "Failed to load importers from '{}'. {:#}",
171                        lib_path.display(),
172                        err
173                    );
174                }
175            }
176        }
177
178        Ok(Treasury {
179            id_ctx,
180            base,
181            base_url,
182            artifacts_base: artifacts,
183            external,
184            temp,
185            importers,
186            artifacts: RwLock::new(HashMap::new()),
187            scanned: RwLock::new(false),
188        })
189    }
190
191    /// Loads importers from dylib.
192    /// There is no possible way to guarantee that dylib does not break safety contracts.
193    /// Some measures to ensure safety are taken.
194    /// Providing dylib from which importers will be successfully loaded and then cause an UB should possible only on purpose.
195    #[tracing::instrument(skip(self))]
196    pub unsafe fn register_importers_lib(&mut self, lib_path: &Path) -> eyre::Result<()> {
197        self.importers.load_dylib_importers(lib_path)
198    }
199
200    /// Import an asset.
201    #[tracing::instrument(skip(self))]
202    pub async fn store(
203        &self,
204        source: &str,
205        format: Option<&str>,
206        target: &str,
207    ) -> eyre::Result<(AssetId, PathBuf)> {
208        let source = self.base_url.join(source).wrap_err_with(|| {
209            format!(
210                "Failed to construct URL from base '{}' and source '{}'",
211                self.base_url, source
212            )
213        })?;
214
215        self.store_url(source, format, target).await
216    }
217
218    /// Import an asset.
219    #[tracing::instrument(skip(self))]
220    pub async fn store_url(
221        &self,
222        source: Url,
223        format: Option<&str>,
224        target: &str,
225    ) -> eyre::Result<(AssetId, PathBuf)> {
226        let mut temporaries = Temporaries::new(&self.temp);
227        let mut sources = Sources::new();
228
229        let base = &self.base;
230        let artifacts = &self.artifacts_base;
231        let external = &self.external;
232        let importers = &self.importers;
233
234        struct StackItem {
235            /// Source URL.
236            source: Url,
237
238            /// Source format name.
239            format: Option<String>,
240
241            /// Target format name.
242            target: String,
243
244            /// Attempt counter to break infinite loops.
245            attempt: u32,
246
247            /// Sources requested by importer.
248            /// Relative to `source`.
249            sources: HashMap<Url, SystemTime>,
250
251            /// Dependencies requested by importer.
252            dependencies: HashSet<AssetId>,
253        }
254
255        let mut stack = Vec::new();
256        stack.push(StackItem {
257            source,
258            format: format.map(str::to_owned),
259            target: target.to_owned(),
260            attempt: 0,
261            sources: HashMap::new(),
262            dependencies: HashSet::new(),
263        });
264
265        loop {
266            // tokio::time::sleep(Duration::from_secs(1)).await;
267
268            let item = stack.last_mut().unwrap();
269            item.attempt += 1;
270
271            let mut meta = SourceMeta::new(&item.source, &self.base, &self.external)
272                .wrap_err("Failed to fetch source meta")?;
273
274            if let Some(asset) = meta.get_asset(&item.target) {
275                if asset.needs_reimport(&self.base_url) {
276                    tracing::debug!(
277                        "'{}' '{:?}' '{}' reimporting",
278                        item.source,
279                        item.format,
280                        item.target
281                    );
282                } else {
283                    match &item.format {
284                        None => tracing::debug!("{} @ '{}'", item.target, item.source),
285                        Some(format) => {
286                            tracing::debug!("{} as {} @ '{}'", item.target, format, item.source)
287                        }
288                    }
289
290                    stack.pop().unwrap();
291                    if stack.is_empty() {
292                        return Ok((asset.id(), asset.artifact_path(&self.artifacts_base)));
293                    }
294                    continue;
295                }
296            }
297
298            let importer = match &item.format {
299                None => importers.guess(url_ext(&item.source), &item.target)?,
300                Some(format) => importers.get(format, &item.target),
301            };
302
303            let importer = importer.ok_or_else(|| {
304                eyre::eyre!(
305                    "Failed to find importer '{} -> {}' for asset '{}'",
306                    item.format.as_deref().unwrap_or("<undefined>"),
307                    item.target,
308                    item.source,
309                )
310            })?;
311
312            // Fetch source file.
313            let (source_path, modified) = sources.fetch(&mut temporaries, &item.source).await?;
314            let source_path = source_path.to_owned();
315
316            let output_path = temporaries.make_temporary();
317
318            let result = importer.import(
319                &source_path,
320                &output_path,
321                |src: &str| {
322                    let src = item.source.join(src).ok()?; // If parsing fails - source will be listed in `ImportResult::RequireSources`.
323                    let (path, modified) = sources.get(&src)?;
324                    if let Some(modified) = modified {
325                        item.sources.insert(src, modified);
326                    }
327                    Some(path)
328                },
329                |src: &str, target: &str| {
330                    let src = item.source.join(src).ok()?;
331
332                    match SourceMeta::new(&src, base, external) {
333                        Ok(meta) => {
334                            let asset = meta.get_asset(target)?;
335                            item.dependencies.insert(asset.id());
336                            Some(asset.id())
337                        }
338                        Err(err) => {
339                            tracing::error!("Fetching dependency failed. {:#}", err);
340                            None
341                        }
342                    }
343                },
344            );
345
346            match result {
347                Ok(()) => {}
348                Err(ImportError::Other { reason }) => {
349                    return Err(eyre::eyre!(
350                        "Failed to import {}:{}->{}. {}",
351                        item.source,
352                        importer.format(),
353                        item.target,
354                        reason,
355                    ))
356                }
357                Err(ImportError::RequireSources { sources: srcs }) => {
358                    if item.attempt >= MAX_ITEM_ATTEMPTS {
359                        return Err(eyre::eyre!(
360                            "Failed to import {}:{}->{}. Too many attempts",
361                            item.source,
362                            importer.format(),
363                            item.target,
364                        ));
365                    }
366
367                    let source = item.source.clone();
368                    for src in srcs {
369                        match source.join(&src) {
370                            Err(err) => {
371                                return Err(eyre::eyre!(
372                                    "Failed to join URL '{}' with '{}'. {:#}",
373                                    source,
374                                    src,
375                                    err,
376                                ))
377                            }
378                            Ok(url) => sources.fetch(&mut temporaries, &url).await?,
379                        };
380                    }
381                    continue;
382                }
383                Err(ImportError::RequireDependencies { dependencies }) => {
384                    if item.attempt >= MAX_ITEM_ATTEMPTS {
385                        return Err(eyre::eyre!(
386                            "Failed to import {}:{}->{}. Too many attempts",
387                            item.source,
388                            importer.format(),
389                            item.target,
390                        ));
391                    }
392
393                    let source = item.source.clone();
394                    for dep in dependencies.into_iter() {
395                        match source.join(&dep.source) {
396                            Err(err) => {
397                                return Err(eyre::eyre!(
398                                    "Failed to join URL '{}' with '{}'. {:#}",
399                                    source,
400                                    dep.source,
401                                    err,
402                                ))
403                            }
404                            Ok(url) => {
405                                stack.push(StackItem {
406                                    source: url,
407                                    format: None,
408                                    target: dep.target,
409                                    attempt: 0,
410                                    sources: HashMap::new(),
411                                    dependencies: HashSet::new(),
412                                });
413                            }
414                        };
415                    }
416                    continue;
417                }
418            }
419
420            if !artifacts.exists() {
421                std::fs::create_dir_all(artifacts).wrap_err_with(|| {
422                    format!(
423                        "Failed to create artifacts directory '{}'",
424                        artifacts.display()
425                    )
426                })?;
427
428                if let Err(err) = std::fs::write(artifacts.join(".gitignore"), "*") {
429                    tracing::error!(
430                        "Failed to place .gitignore into artifacts directory. {:#}",
431                        err
432                    );
433                }
434            }
435
436            let new_id = self.id_ctx.generate();
437
438            let item = stack.pop().unwrap();
439
440            let make_relative_source = |source| match self.base_url.make_relative(source) {
441                None => item.source.to_string(),
442                Some(source) => source,
443            };
444
445            let mut sources = Vec::new();
446            if let Some(modified) = modified {
447                sources.push((make_relative_source(&item.source), modified));
448            }
449            sources.extend(
450                item.sources
451                    .iter()
452                    .map(|(url, modified)| (make_relative_source(url), *modified)),
453            );
454
455            let asset = AssetMeta::new(
456                new_id,
457                item.format.clone(),
458                sources,
459                item.dependencies.into_iter().collect(),
460                &output_path,
461                artifacts,
462            )
463            .wrap_err("Failed to prepare new asset")?;
464
465            let artifact_path = asset.artifact_path(artifacts);
466
467            meta.add_asset(item.target.clone(), asset, base, external)?;
468
469            self.artifacts.write().insert(
470                new_id,
471                AssetItem {
472                    source: item.source,
473                    format: item.format,
474                    target: item.target,
475                },
476            );
477
478            if stack.is_empty() {
479                return Ok((new_id, artifact_path));
480            }
481        }
482    }
483
484    /// Fetch asset data path.
485    pub async fn fetch(&self, id: AssetId) -> Option<PathBuf> {
486        let scanned = *self.scanned.read();
487
488        if !scanned {
489            let existing_artifacts: HashSet<_> = self.artifacts.read().keys().copied().collect();
490
491            let mut new_artifacts = Vec::new();
492            let mut scanned = self.scanned.write();
493
494            if !*scanned {
495                scan_local(&self.base, &existing_artifacts, &mut new_artifacts);
496                scan_external(&self.external, &existing_artifacts, &mut new_artifacts);
497
498                let mut artifacts = self.artifacts.write();
499                for (id, item) in new_artifacts {
500                    artifacts.insert(id, item);
501                }
502
503                *scanned = true;
504
505                drop(artifacts);
506                drop(scanned);
507            }
508        }
509
510        let item = self.artifacts.read().get(&id).cloned()?;
511
512        let (_, path) = self
513            .store_url(item.source, item.format.as_deref(), &item.target)
514            .await
515            .ok()?;
516
517        Some(path)
518    }
519
520    /// Fetch asset data path.
521    pub async fn find_asset(
522        &self,
523        source: &str,
524        target: &str,
525    ) -> eyre::Result<Option<(AssetId, PathBuf)>> {
526        let source_url = self.base_url.join(source).wrap_err_with(|| {
527            format!(
528                "Failed to construct URL from base '{}' and source '{}'",
529                self.base_url, source
530            )
531        })?;
532
533        let meta = SourceMeta::new(&source_url, &self.base, &self.external)
534            .wrap_err("Failed to fetch source meta")?;
535
536        match meta.get_asset(target) {
537            None => {
538                drop(meta);
539                match self.store(source, None, target).await {
540                    Err(err) => {
541                        tracing::warn!(
542                            "Failed to store '{}' as '{}' on lookup. {:#}",
543                            source,
544                            target,
545                            err
546                        );
547                        Ok(None)
548                    }
549                    Ok(id) => Ok(Some(id)),
550                }
551            }
552            Some(asset) => Ok(Some((
553                asset.id(),
554                asset.artifact_path(&self.artifacts_base),
555            ))),
556        }
557    }
558}
559
560pub fn find_treasury_info(mut path: &Path) -> Option<PathBuf> {
561    loop {
562        let candidate = path.join(TREASURY_META_NAME);
563        if candidate.is_file() {
564            return Some(candidate);
565        }
566        path = path.parent()?;
567    }
568}
569
/// Epoch passed to `AssetIdContext::new` when creating a treasury.
///
/// Lies 1_609_448_400 seconds after the Unix epoch (2020-12-31T21:00:00Z).
fn treasury_epoch() -> SystemTime {
    const OFFSET_FROM_UNIX_SECS: u64 = 1_609_448_400;

    let offset = Duration::from_secs(OFFSET_FROM_UNIX_SECS);
    SystemTime::UNIX_EPOCH + offset
}
576
577fn url_ext(url: &Url) -> Option<&str> {
578    let path = url.path();
579    let dot = path.rfind('.')?;
580    let sep = path.rfind('/')?;
581    if dot == path.len() || dot <= sep + 1 {
582        None
583    } else {
584        Some(&path[dot + 1..])
585    }
586}
587
588fn scan_external(
589    external: &Path,
590    existing_artifacts: &HashSet<AssetId>,
591    artifacts: &mut Vec<(AssetId, AssetItem)>,
592) {
593    let dir = match std::fs::read_dir(&external) {
594        Err(err) if err.kind() == std::io::ErrorKind::NotFound => {
595            tracing::info!("External directory does not exists");
596            return;
597        }
598        Err(err) => {
599            tracing::error!(
600                "Failed to scan directory '{}'. {:#}",
601                external.display(),
602                err
603            );
604            return;
605        }
606        Ok(dir) => dir,
607    };
608    for e in dir {
609        let e = match e {
610            Err(err) => {
611                tracing::error!(
612                    "Failed to read entry in directory '{}'. {:#}",
613                    external.display(),
614                    err,
615                );
616                return;
617            }
618            Ok(e) => e,
619        };
620        let name = e.file_name();
621        let path = external.join(&name);
622        let ft = match e.file_type() {
623            Err(err) => {
624                tracing::error!("Failed to check '{}'. {:#}", path.display(), err);
625                continue;
626            }
627            Ok(ft) => ft,
628        };
629        if ft.is_file() && !SourceMeta::is_local_meta_path(&path) {
630            let meta = match SourceMeta::open_external(&path) {
631                Err(err) => {
632                    tracing::error!("Failed to scan meta file '{}'. {:#}", path.display(), err);
633                    continue;
634                }
635                Ok(meta) => meta,
636            };
637
638            let source = meta.url();
639
640            for (target, asset) in meta.assets() {
641                if !existing_artifacts.contains(&asset.id()) {
642                    artifacts.push((
643                        asset.id(),
644                        AssetItem {
645                            source: source.clone(),
646                            format: asset.format().map(ToOwned::to_owned),
647                            target: target.to_owned(),
648                        },
649                    ));
650                }
651            }
652        }
653    }
654}
655
656fn scan_local(
657    base: &Path,
658    existing_artifacts: &HashSet<AssetId>,
659    artifacts: &mut Vec<(AssetId, AssetItem)>,
660) {
661    debug_assert!(base.is_absolute());
662
663    if !base.exists() {
664        tracing::info!("Local artifacts directory does not exists");
665        return;
666    }
667
668    let mut queue = VecDeque::new();
669    queue.push_back(base.to_owned());
670
671    while let Some(dir_path) = queue.pop_front() {
672        let dir = match std::fs::read_dir(&dir_path) {
673            Err(err) => {
674                tracing::error!(
675                    "Failed to scan directory '{}'. {:#}",
676                    dir_path.display(),
677                    err
678                );
679                continue;
680            }
681            Ok(dir) => dir,
682        };
683        for e in dir {
684            let e = match e {
685                Err(err) => {
686                    tracing::error!(
687                        "Failed to read entry in directory '{}'. {:#}",
688                        dir_path.display(),
689                        err,
690                    );
691                    continue;
692                }
693                Ok(e) => e,
694            };
695            let name = e.file_name();
696            let path = dir_path.join(&name);
697            let ft = match e.file_type() {
698                Err(err) => {
699                    tracing::error!("Failed to check '{}'. {:#}", path.display(), err);
700                    continue;
701                }
702                Ok(ft) => ft,
703            };
704            if ft.is_dir() {
705                queue.push_back(path);
706            } else if ft.is_file() && SourceMeta::is_local_meta_path(&path) {
707                let meta = match SourceMeta::open_local(&path) {
708                    Err(err) => {
709                        tracing::error!("Failed to scan meta file '{}'. {:#}", path.display(), err);
710                        continue;
711                    }
712                    Ok(meta) => meta,
713                };
714
715                let source = meta.url();
716                for (target, asset) in meta.assets() {
717                    if !existing_artifacts.contains(&asset.id()) {
718                        artifacts.push((
719                            asset.id(),
720                            AssetItem {
721                                source: source.clone(),
722                                format: asset.format().map(ToOwned::to_owned),
723                                target: target.to_owned(),
724                            },
725                        ));
726                    }
727                }
728            }
729        }
730    }
731}