Skip to main content

near_external_storage/
lib.rs

1use anyhow::Context;
2use futures::TryStreamExt;
3use near_chain_configs::ExternalStorageLocation;
4use object_store::{ObjectStore, PutPayload};
5use std::io::{Read, Write};
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::Duration;
9
/// Live connection/handle to an external storage backend.
///
/// Each variant holds the client state that the `get`, `put` and `list` methods need
/// for that backend.
// TODO(cloud_archival) Structure it better, e.g. make it a trait. May simplify when we
// use object_store for each backend.
#[derive(Clone)]
pub enum ExternalConnection {
    /// S3 bucket handle, built either with anonymous credentials (read-only) or with
    /// explicit/default credentials (read/write).
    S3 { bucket: Arc<s3::Bucket> },
    /// Local filesystem root directory; object paths are joined onto it.
    Filesystem { root_dir: PathBuf },
    /// GCS client (upload/list via SDK, anonymous downloads via HTTP).
    GCS {
        // May be used for uploading and listing state parts. Requires valid credentials
        // to be specified through env variable.
        // Used by `put` and `list`.
        gcs_client: Arc<object_store::gcp::GoogleCloudStorage>,
        // May be used for anonymously downloading state parts.
        // Used by `get`, which issues a plain HTTP GET.
        reqwest_client: Arc<reqwest::Client>,
        // Bucket name, percent-encoded into the download URL by `get`.
        bucket: String,
    },
}
29
/// URL encoding rules for GCS object names.
///
/// Every byte is percent-encoded except ASCII letters, ASCII digits, `-`, `.` and `_`.
/// Applied to both the bucket name and the object path when building download URLs.
const GCS_ENCODE_SET: &percent_encoding::AsciiSet =
    &percent_encoding::NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_');
33
/// Behavior/configuration for S3 connections.
pub struct S3AccessConfig {
    /// When `true`, `ExternalConnection::new` builds the bucket handle with anonymous
    /// credentials; otherwise it uses the read/write credential path.
    pub is_readonly: bool,
    /// Request timeout set on the S3 bucket client.
    pub timeout: Duration,
}
39
40impl ExternalConnection {
41    /// Human-readable backend name for logging.
42    pub fn name(&self) -> &'static str {
43        match self {
44            Self::S3 { .. } => "S3",
45            Self::Filesystem { .. } => "Filesystem",
46            Self::GCS { .. } => "GCS",
47        }
48    }
49
50    /// Create a connection for the given storage location.
51    /// For S3, `s3_access_config` is required; `credentials_file` is used only for RW.
52    /// For GCS, `credentials_file` (if provided) overrides SERVICE_ACCOUNT.
53    pub fn new(
54        location: &ExternalStorageLocation,
55        credentials_file: Option<PathBuf>,
56        s3_access_config: Option<S3AccessConfig>,
57    ) -> Self {
58        match location {
59            ExternalStorageLocation::S3 { bucket, region, .. } => {
60                let S3AccessConfig { is_readonly, timeout } = s3_access_config
61                    .expect("S3 access config not provided with S3 external storage location");
62                let bucket = if is_readonly {
63                    create_s3_bucket_readonly(&bucket, &region, timeout)
64                } else {
65                    create_s3_bucket_read_write(&bucket, &region, timeout, credentials_file)
66                };
67                if let Err(err) = bucket {
68                    if is_readonly {
69                        panic!("Failed to create an S3 bucket: {err}");
70                    } else {
71                        panic!(
72                            "Failed to authenticate connection to S3. Please either provide AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the environment, or create a credentials file and link it in config.json as 's3_credentials_file'. Error: {err}"
73                        );
74                    }
75                }
76                ExternalConnection::S3 { bucket: Arc::new(bucket.unwrap()) }
77            }
78            ExternalStorageLocation::Filesystem { root_dir } => {
79                ExternalConnection::Filesystem { root_dir: root_dir.clone() }
80            }
81            ExternalStorageLocation::GCS { bucket, .. } => {
82                if let Some(credentials_file) = credentials_file {
83                    if let Ok(var) = std::env::var("SERVICE_ACCOUNT") {
84                        tracing::warn!(target: "external", %var, ?credentials_file, "environment variable `SERVICE_ACCOUNT` is set, but `credentials_file` in config.json overrides it");
85                        println!(
86                            "Environment variable 'SERVICE_ACCOUNT' is set to {var}, but 'credentials_file' in config.json overrides it to '{credentials_file:?}'"
87                        );
88                    }
89                    // SAFE: no threads *yet*.
90                    unsafe { std::env::set_var("SERVICE_ACCOUNT", &credentials_file) };
91                    tracing::info!(target: "external", ?credentials_file, "set the environment variable `SERVICE_ACCOUNT`");
92                }
93                ExternalConnection::GCS {
94                    gcs_client: Arc::new(
95                        object_store::gcp::GoogleCloudStorageBuilder::from_env()
96                            .with_bucket_name(bucket)
97                            .build()
98                            .unwrap(),
99                    ),
100                    reqwest_client: Arc::new(reqwest::Client::default()),
101                    bucket: bucket.clone(),
102                }
103            }
104        }
105    }
106
107    /// Download an object at `path` as bytes.
108    pub async fn get(&self, path: &str) -> Result<Vec<u8>, anyhow::Error> {
109        match self {
110            ExternalConnection::S3 { bucket } => {
111                tracing::debug!(target: "external", path, "reading from S3");
112                let response = bucket.get_object(path).await?;
113                if response.status_code() == 200 {
114                    Ok(response.bytes().to_vec())
115                } else {
116                    Err(anyhow::anyhow!("Bad response status code: {}", response.status_code()))
117                }
118            }
119            ExternalConnection::Filesystem { root_dir } => {
120                let path = root_dir.join(path);
121                tracing::debug!(target: "external", ?path, "reading a file");
122                let data = std::fs::read(&path)?;
123                Ok(data)
124            }
125            ExternalConnection::GCS { reqwest_client, bucket, .. } => {
126                // Download should be handled anonymously, therefore we are not using cloud-storage crate.
127                // TODO(cloud_archival) Consider the case of cloud archival
128                let url = format!(
129                    "https://storage.googleapis.com/storage/v1/b/{}/o/{}?alt=media",
130                    percent_encoding::percent_encode(bucket.as_bytes(), GCS_ENCODE_SET),
131                    percent_encoding::percent_encode(path.as_bytes(), GCS_ENCODE_SET),
132                );
133                tracing::debug!(target: "external", url, "reading from GCS");
134                let response = reqwest_client.get(&url).send().await?.error_for_status();
135                match response {
136                    Err(e) => Err(e.into()),
137                    Ok(r) => {
138                        let bytes = r.bytes().await?.to_vec();
139                        Ok(bytes)
140                    }
141                }
142            }
143        }
144    }
145
146    /// Upload/overwrite an object at `path` with `value`.
147    pub async fn put(&self, path: &str, value: &[u8]) -> Result<(), anyhow::Error> {
148        match self {
149            ExternalConnection::S3 { bucket } => {
150                tracing::debug!(target: "external", path, "writing to S3");
151                bucket.put_object(path, value).await?;
152                Ok(())
153            }
154            ExternalConnection::Filesystem { root_dir } => {
155                let path = root_dir.join(path);
156                tracing::debug!(target: "external", ?path, "writing to a file");
157                if let Some(parent_dir) = path.parent() {
158                    std::fs::create_dir_all(parent_dir)?;
159                }
160                let mut file = std::fs::OpenOptions::new()
161                    .write(true)
162                    .create(true)
163                    .truncate(true)
164                    .open(&path)?;
165                file.write_all(value)?;
166                Ok(())
167            }
168            ExternalConnection::GCS { gcs_client, .. } => {
169                let path = object_store::path::Path::parse(path)
170                    .with_context(|| format!("{path} isn't a valid path for GCP"))?;
171                tracing::debug!(target: "external", ?path, "writing to GCS");
172                gcs_client.put(&path, PutPayload::from_bytes(value.to_vec().into())).await?;
173                Ok(())
174            }
175        }
176    }
177
178    /// List object names under the given directory.
179    ///
180    /// Non-recursive for Filesystem and S3.
181    /// Recursive for GCS (lists all objects within the given directory).
182    pub async fn list(&self, directory_path: &str) -> Result<Vec<String>, anyhow::Error> {
183        match self {
184            ExternalConnection::S3 { bucket } => {
185                let prefix = format!("{}/", directory_path);
186                let list_results = bucket.list(prefix.clone(), Some("/".to_string())).await?;
187                tracing::debug!(target: "external", directory_path, "list directory in S3");
188                let mut file_names = vec![];
189                for res in list_results {
190                    for obj in res.contents {
191                        file_names.push(extract_file_name_from_full_path(obj.key))
192                    }
193                }
194                Ok(file_names)
195            }
196            ExternalConnection::Filesystem { root_dir } => {
197                let path = root_dir.join(directory_path);
198                tracing::debug!(target: "external", ?path, "list files in local directory");
199                std::fs::create_dir_all(&path)?;
200                let mut file_names = vec![];
201                let files = std::fs::read_dir(&path)?;
202                for file in files {
203                    let file_name = extract_file_name_from_path_buf(file?.path());
204                    file_names.push(file_name);
205                }
206                Ok(file_names)
207            }
208            ExternalConnection::GCS { gcs_client, .. } => {
209                let prefix = format!("{}/", directory_path);
210                tracing::debug!(target: "external", directory_path, "list directory in GCS");
211                Ok(gcs_client
212                    .list(Some(
213                        &object_store::path::Path::parse(&prefix)
214                            .with_context(|| format!("can't parse {prefix} as path"))?,
215                    ))
216                    .try_collect::<Vec<_>>()
217                    .await?
218                    .into_iter()
219                    .map(|object| object.location.filename().unwrap().into())
220                    .collect())
221            }
222        }
223    }
224}
225
226/// Extract file name from a full (string) path.
227fn extract_file_name_from_full_path(full_path: String) -> String {
228    return extract_file_name_from_path_buf(PathBuf::from(full_path));
229}
230
/// Returns the final component of `path_buf` as an owned `String`.
///
/// # Panics
///
/// Panics if `Path::file_name` returns `None` (the path has no final component) or if
/// that component is not valid UTF-8.
fn extract_file_name_from_path_buf(path_buf: PathBuf) -> String {
    let last_component = path_buf.file_name().unwrap();
    let as_utf8 = last_component.to_str().unwrap();
    as_utf8.to_owned()
}
235
236/// Create an anonymous, read-only S3 bucket handle.
237pub fn create_s3_bucket_readonly(
238    bucket: &str,
239    region: &str,
240    timeout: Duration,
241) -> Result<s3::Bucket, anyhow::Error> {
242    let creds = s3::creds::Credentials::anonymous()?;
243    create_s3_bucket(bucket, region, timeout, creds)
244}
245
/// Credentials for S3 read/write access (from JSON file).
///
/// Deserialized from a JSON object with the string fields `access_key` and `secret_key`.
// NOTE(review): the derived `Debug` prints `secret_key` verbatim; avoid logging values
// of this type.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct S3CredentialsConfig {
    /// Passed as the access key when building `s3::creds::Credentials`.
    access_key: String,
    /// Passed as the secret key when building `s3::creds::Credentials`.
    secret_key: String,
}
252
253/// Create a read/write S3 bucket handle, optionally using a JSON credentials file.
254pub fn create_s3_bucket_read_write(
255    bucket: &str,
256    region: &str,
257    timeout: Duration,
258    credentials_file: Option<PathBuf>,
259) -> Result<s3::Bucket, anyhow::Error> {
260    let creds = match credentials_file {
261        Some(credentials_file) => {
262            let mut file = std::fs::File::open(credentials_file)?;
263            let mut json_config_str = String::new();
264            file.read_to_string(&mut json_config_str)?;
265            let credentials_config: S3CredentialsConfig = serde_json::from_str(&json_config_str)?;
266            s3::creds::Credentials::new(
267                Some(&credentials_config.access_key),
268                Some(&credentials_config.secret_key),
269                None,
270                None,
271                None,
272            )
273        }
274        None => s3::creds::Credentials::default(),
275    }?;
276    create_s3_bucket(bucket, region, timeout, creds)
277}
278
279/// Build an S3 bucket client and set request timeout.
280fn create_s3_bucket(
281    bucket: &str,
282    region: &str,
283    timeout: Duration,
284    creds: s3::creds::Credentials,
285) -> Result<s3::Bucket, anyhow::Error> {
286    let mut bucket = s3::Bucket::new(bucket, region.parse::<s3::Region>()?, creds)?;
287    // Ensure requests finish in finite amount of time.
288    bucket.set_request_timeout(Some(timeout));
289    Ok(bucket)
290}