Skip to main content

sk_core/
external_storage.rs

//! We use the [object_store](https://docs.rs/object_store/latest/object_store/index.html) crate to
//! enable reading from and writing to the three major cloud providers (AWS, Azure, GCP), as well as
//! a local filesystem or an in-memory store.  HTTP (WebDAV) is nominally supported as well, but it
//! is completely untested.
//!
//! Credentials for communicating with the cloud provider are loaded from the environment as
//! follows (other auth mechanisms _may_ work as well but are currently untested):
//!
//! ### AWS
//!
//! Set the `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables, and pass in a
//! URL like `s3://bucket/path/to/resource`.
//!
//! ### Azure
//!
//! Set the `AZURE_STORAGE_ACCOUNT_NAME` and `AZURE_STORAGE_ACCOUNT_KEY` environment variables, and
//! pass in a URL like `azure://container/path/to/resources` (do not include the storage account
//! name in the URL).
//!
//! ### GCP
//!
//! Set the `GOOGLE_SERVICE_ACCOUNT` environment variable to the path of your service account JSON
//! file (if you're running inside a container, that file must be injected into the container as
//! well).  Pass in a URL like `gs://bucket/path/to/resource`.
//!
//! ### Local filesystem
//!
//! Pass in either a `file://` URL or a plain path.  Plain paths are made absolute relative to the
//! current working directory (the path does not need to exist locally), and `..` components are
//! collapsed before the path is used.
25use std::path::{
26    PathBuf,
27    absolute,
28};
29
30use anyhow::anyhow;
31use async_trait::async_trait;
32use bytes::Bytes;
33#[cfg(feature = "mock")]
34use mockall::automock;
35use object_store::path::Path;
36use object_store::{
37    DynObjectStore,
38    ObjectStoreScheme,
39    PutPayload,
40};
41use reqwest::Url;
42
43use crate::errors::*;
44
/// Read/write access to a single object in some object store.
///
/// None of the methods take a location, so each implementor refers to one fixed object (for
/// [`SkObjectStore`], the path given to [`SkObjectStore::new`]).  With the `mock` feature
/// enabled, `mockall` generates a `MockObjectStoreWrapper` for tests.
#[cfg_attr(feature = "mock", automock)]
#[async_trait]
pub trait ObjectStoreWrapper {
    /// Returns which storage backend (local, memory, S3, Azure, GCS, HTTP, ...) this wraps.
    fn scheme(&self) -> ObjectStoreScheme;
    /// Stores `data` as the contents of the wrapped object.
    async fn put(&self, data: Bytes) -> EmptyResult;
    /// Fetches the full contents of the wrapped object into memory.
    async fn get(&self) -> anyhow::Result<Bytes>;
}
52
/// [`ObjectStoreWrapper`] implementation backed by the `object_store` crate.
///
/// Construct with [`SkObjectStore::new`], which picks the backend from the given URL or path.
#[derive(Debug)]
pub struct SkObjectStore {
    // Backend kind detected from the URL/path passed to `new`; returned by `scheme()`.
    scheme: ObjectStoreScheme,
    // Configured client for that backend.
    store: Box<DynObjectStore>,
    // Location of the wrapped object within `store`; used by both `put` and `get`.
    path: Path,
}
59
60impl SkObjectStore {
61    pub fn new(path_str: &str) -> anyhow::Result<SkObjectStore> {
62        let (scheme, path) = parse_path(path_str)?;
63        let store: Box<DynObjectStore> = match scheme {
64            ObjectStoreScheme::Local => Box::new(object_store::local::LocalFileSystem::new()),
65            ObjectStoreScheme::Memory => Box::new(object_store::memory::InMemory::new()),
66            ObjectStoreScheme::AmazonS3 => {
67                Box::new(object_store::aws::AmazonS3Builder::from_env().with_url(path_str).build()?)
68            },
69            ObjectStoreScheme::MicrosoftAzure => Box::new(
70                object_store::azure::MicrosoftAzureBuilder::from_env()
71                    .with_url(path_str)
72                    .build()?,
73            ),
74            ObjectStoreScheme::GoogleCloudStorage => Box::new(
75                object_store::gcp::GoogleCloudStorageBuilder::from_env()
76                    .with_url(path_str)
77                    .build()?,
78            ),
79            ObjectStoreScheme::Http => Box::new(object_store::http::HttpBuilder::new().with_url(path_str).build()?),
80            _ => unimplemented!(),
81        };
82
83        Ok(SkObjectStore { scheme, store, path })
84    }
85}
86
87#[async_trait]
88impl ObjectStoreWrapper for SkObjectStore {
89    fn scheme(&self) -> ObjectStoreScheme {
90        self.scheme.clone()
91    }
92
93    async fn put(&self, data: Bytes) -> EmptyResult {
94        let payload = PutPayload::from_bytes(data);
95        self.store.put(&self.path, payload).await?;
96        Ok(())
97    }
98
99    async fn get(&self) -> anyhow::Result<Bytes> {
100        Ok(self.store.get(&self.path).await?.bytes().await?)
101    }
102}
103
104fn parse_path(path_str: &str) -> anyhow::Result<(ObjectStoreScheme, Path)> {
105    let url = match Url::parse(path_str) {
106        Err(url::ParseError::RelativeUrlWithoutBase) => {
107            let path = absolute_strip_dots(path_str)?;
108            Url::from_file_path(path).map_err(|e| anyhow!("could not create URL from file path: {e:?}"))?
109        },
110        res => res?,
111    };
112
113    Ok(ObjectStoreScheme::parse(&url)?)
114}
115
116fn absolute_strip_dots(path_str: &str) -> anyhow::Result<PathBuf> {
117    // We have to use `absolute` here in the event that the path doesn't exist locally,
118    // e.g., we're specifying a path inside the driver container.  `Path::canonicalize`
119    // requires the path to exist locally.  Unfortunately, `absolute` does not strip `..`,
120    // and ObjectStoreScheme::parse will not parse `..`, so we have to do that ourselves.
121    let orig_path = absolute(PathBuf::from(path_str))?;
122    let mut new_path = PathBuf::new();
123
124    for component in orig_path.iter() {
125        if component == ".." {
126            if !new_path.pop() {
127                bail!("malformed relative path: {path_str}");
128            }
129        } else {
130            new_path.push(component);
131        }
132    }
133
134    Ok(new_path)
135}
136
// Unit tests for `SkObjectStore::new` and the local-path handling in `parse_path` /
// `absolute_strip_dots`.
#[cfg(test)]
#[cfg_attr(coverage, coverage(off))]
mod test {
    use assertables::*;
    use sk_testutils::*;

    use super::*;

    // An unrecognized URL scheme must come back as an `Err`.
    #[rstest]
    fn test_new_sk_object_store_invalid() {
        let _ = SkObjectStore::new("oracle3://foo/bar").unwrap_err();
    }

    // An `s3://` URL selects the AmazonS3 backend.
    #[rstest]
    fn test_new_sk_object_store() {
        let store = SkObjectStore::new("s3://foo/bar").unwrap();
        assert_eq!(store.scheme(), ObjectStoreScheme::AmazonS3);
    }

    // `file://` URLs and plain paths (absolute or relative, with or without `..`) all select the
    // local-filesystem backend; `absolute_strip_dots` does not require these paths to exist.
    #[rstest]
    #[case::with_base("file:///tmp/foo")]
    #[case::without_base("/tmp/foo")]
    #[case::absolute_with_dots("/tmp/../foo/bar/../../baz")]
    #[case::relative("foo")]
    #[case::relative_path_with_dots("../foo")]
    fn test_new_sk_object_store_local_path(#[case] path: &str) {
        let store = SkObjectStore::new(path).unwrap();
        assert_eq!(store.scheme(), ObjectStoreScheme::Local);
    }

    // A `..` that would climb above the filesystem root is rejected.
    #[rstest]
    fn test_new_sk_object_store_invalid_path() {
        let res = SkObjectStore::new("/foo/../..");
        assert_err!(res);
    }
}