1use anyhow::Context;
2use futures::TryStreamExt;
3use near_chain_configs::ExternalStorageLocation;
4use object_store::{ObjectStore, PutPayload};
5use std::io::{Read, Write};
6use std::path::PathBuf;
7use std::sync::Arc;
8use std::time::Duration;
9
/// A handle to one of the supported external storage backends.
///
/// The S3 and GCS clients are held behind `Arc`, so clones of a connection
/// share the same underlying client objects.
#[derive(Clone)]
pub enum ExternalConnection {
    /// S3 storage, accessed through an `s3::Bucket` handle.
    S3 { bucket: Arc<s3::Bucket> },
    /// A directory on the local filesystem; object paths are joined onto `root_dir`.
    Filesystem { root_dir: PathBuf },
    /// Google Cloud Storage.
    GCS {
        /// `object_store` client, used by `put` and `list`.
        gcs_client: Arc<object_store::gcp::GoogleCloudStorage>,
        /// Plain HTTP client, used by `get` to download objects via the GCS JSON API URL.
        /// This module attaches no credentials to those requests.
        reqwest_client: Arc<reqwest::Client>,
        /// Bucket name, needed to build the download URL in `get`.
        bucket: String,
    },
}
29
30const GCS_ENCODE_SET: &percent_encoding::AsciiSet =
32 &percent_encoding::NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_');
33
/// Settings required by `ExternalConnection::new` when the location is S3.
pub struct S3AccessConfig {
    /// When `true`, the bucket is created with anonymous credentials;
    /// otherwise credentials are loaded for read-write access.
    pub is_readonly: bool,
    /// Request timeout set on the created S3 bucket handle.
    pub timeout: Duration,
}
39
40impl ExternalConnection {
41 pub fn name(&self) -> &'static str {
43 match self {
44 Self::S3 { .. } => "S3",
45 Self::Filesystem { .. } => "Filesystem",
46 Self::GCS { .. } => "GCS",
47 }
48 }
49
50 pub fn new(
54 location: &ExternalStorageLocation,
55 credentials_file: Option<PathBuf>,
56 s3_access_config: Option<S3AccessConfig>,
57 ) -> Self {
58 match location {
59 ExternalStorageLocation::S3 { bucket, region, .. } => {
60 let S3AccessConfig { is_readonly, timeout } = s3_access_config
61 .expect("S3 access config not provided with S3 external storage location");
62 let bucket = if is_readonly {
63 create_s3_bucket_readonly(&bucket, ®ion, timeout)
64 } else {
65 create_s3_bucket_read_write(&bucket, ®ion, timeout, credentials_file)
66 };
67 if let Err(err) = bucket {
68 if is_readonly {
69 panic!("Failed to create an S3 bucket: {err}");
70 } else {
71 panic!(
72 "Failed to authenticate connection to S3. Please either provide AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the environment, or create a credentials file and link it in config.json as 's3_credentials_file'. Error: {err}"
73 );
74 }
75 }
76 ExternalConnection::S3 { bucket: Arc::new(bucket.unwrap()) }
77 }
78 ExternalStorageLocation::Filesystem { root_dir } => {
79 ExternalConnection::Filesystem { root_dir: root_dir.clone() }
80 }
81 ExternalStorageLocation::GCS { bucket, .. } => {
82 if let Some(credentials_file) = credentials_file {
83 if let Ok(var) = std::env::var("SERVICE_ACCOUNT") {
84 tracing::warn!(target: "external", %var, ?credentials_file, "environment variable `SERVICE_ACCOUNT` is set, but `credentials_file` in config.json overrides it");
85 println!(
86 "Environment variable 'SERVICE_ACCOUNT' is set to {var}, but 'credentials_file' in config.json overrides it to '{credentials_file:?}'"
87 );
88 }
89 unsafe { std::env::set_var("SERVICE_ACCOUNT", &credentials_file) };
91 tracing::info!(target: "external", ?credentials_file, "set the environment variable `SERVICE_ACCOUNT`");
92 }
93 ExternalConnection::GCS {
94 gcs_client: Arc::new(
95 object_store::gcp::GoogleCloudStorageBuilder::from_env()
96 .with_bucket_name(bucket)
97 .build()
98 .unwrap(),
99 ),
100 reqwest_client: Arc::new(reqwest::Client::default()),
101 bucket: bucket.clone(),
102 }
103 }
104 }
105 }
106
107 pub async fn get(&self, path: &str) -> Result<Vec<u8>, anyhow::Error> {
109 match self {
110 ExternalConnection::S3 { bucket } => {
111 tracing::debug!(target: "external", path, "reading from S3");
112 let response = bucket.get_object(path).await?;
113 if response.status_code() == 200 {
114 Ok(response.bytes().to_vec())
115 } else {
116 Err(anyhow::anyhow!("Bad response status code: {}", response.status_code()))
117 }
118 }
119 ExternalConnection::Filesystem { root_dir } => {
120 let path = root_dir.join(path);
121 tracing::debug!(target: "external", ?path, "reading a file");
122 let data = std::fs::read(&path)?;
123 Ok(data)
124 }
125 ExternalConnection::GCS { reqwest_client, bucket, .. } => {
126 let url = format!(
129 "https://storage.googleapis.com/storage/v1/b/{}/o/{}?alt=media",
130 percent_encoding::percent_encode(bucket.as_bytes(), GCS_ENCODE_SET),
131 percent_encoding::percent_encode(path.as_bytes(), GCS_ENCODE_SET),
132 );
133 tracing::debug!(target: "external", url, "reading from GCS");
134 let response = reqwest_client.get(&url).send().await?.error_for_status();
135 match response {
136 Err(e) => Err(e.into()),
137 Ok(r) => {
138 let bytes = r.bytes().await?.to_vec();
139 Ok(bytes)
140 }
141 }
142 }
143 }
144 }
145
146 pub async fn put(&self, path: &str, value: &[u8]) -> Result<(), anyhow::Error> {
148 match self {
149 ExternalConnection::S3 { bucket } => {
150 tracing::debug!(target: "external", path, "writing to S3");
151 bucket.put_object(path, value).await?;
152 Ok(())
153 }
154 ExternalConnection::Filesystem { root_dir } => {
155 let path = root_dir.join(path);
156 tracing::debug!(target: "external", ?path, "writing to a file");
157 if let Some(parent_dir) = path.parent() {
158 std::fs::create_dir_all(parent_dir)?;
159 }
160 let mut file = std::fs::OpenOptions::new()
161 .write(true)
162 .create(true)
163 .truncate(true)
164 .open(&path)?;
165 file.write_all(value)?;
166 Ok(())
167 }
168 ExternalConnection::GCS { gcs_client, .. } => {
169 let path = object_store::path::Path::parse(path)
170 .with_context(|| format!("{path} isn't a valid path for GCP"))?;
171 tracing::debug!(target: "external", ?path, "writing to GCS");
172 gcs_client.put(&path, PutPayload::from_bytes(value.to_vec().into())).await?;
173 Ok(())
174 }
175 }
176 }
177
178 pub async fn list(&self, directory_path: &str) -> Result<Vec<String>, anyhow::Error> {
183 match self {
184 ExternalConnection::S3 { bucket } => {
185 let prefix = format!("{}/", directory_path);
186 let list_results = bucket.list(prefix.clone(), Some("/".to_string())).await?;
187 tracing::debug!(target: "external", directory_path, "list directory in S3");
188 let mut file_names = vec![];
189 for res in list_results {
190 for obj in res.contents {
191 file_names.push(extract_file_name_from_full_path(obj.key))
192 }
193 }
194 Ok(file_names)
195 }
196 ExternalConnection::Filesystem { root_dir } => {
197 let path = root_dir.join(directory_path);
198 tracing::debug!(target: "external", ?path, "list files in local directory");
199 std::fs::create_dir_all(&path)?;
200 let mut file_names = vec![];
201 let files = std::fs::read_dir(&path)?;
202 for file in files {
203 let file_name = extract_file_name_from_path_buf(file?.path());
204 file_names.push(file_name);
205 }
206 Ok(file_names)
207 }
208 ExternalConnection::GCS { gcs_client, .. } => {
209 let prefix = format!("{}/", directory_path);
210 tracing::debug!(target: "external", directory_path, "list directory in GCS");
211 Ok(gcs_client
212 .list(Some(
213 &object_store::path::Path::parse(&prefix)
214 .with_context(|| format!("can't parse {prefix} as path"))?,
215 ))
216 .try_collect::<Vec<_>>()
217 .await?
218 .into_iter()
219 .map(|object| object.location.filename().unwrap().into())
220 .collect())
221 }
222 }
223 }
224}
225
226fn extract_file_name_from_full_path(full_path: String) -> String {
228 return extract_file_name_from_path_buf(PathBuf::from(full_path));
229}
230
/// Returns the final component of `path_buf` as an owned `String`.
///
/// # Panics
///
/// Panics if the path has no final component (e.g. it is empty or ends in
/// `..`), or if that component is not valid UTF-8.
fn extract_file_name_from_path_buf(path_buf: PathBuf) -> String {
    path_buf
        .file_name()
        .expect("path has no file name component")
        .to_str()
        .expect("file name is not valid UTF-8")
        .to_string()
}
235
236pub fn create_s3_bucket_readonly(
238 bucket: &str,
239 region: &str,
240 timeout: Duration,
241) -> Result<s3::Bucket, anyhow::Error> {
242 let creds = s3::creds::Credentials::anonymous()?;
243 create_s3_bucket(bucket, region, timeout, creds)
244}
245
/// Contents of the JSON S3 credentials file read by `create_s3_bucket_read_write`.
///
/// NOTE(review): the derived `Debug` prints `secret_key` verbatim; avoid
/// logging this value, or consider a redacting `Debug` impl.
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct S3CredentialsConfig {
    /// AWS access key id.
    access_key: String,
    /// AWS secret access key.
    secret_key: String,
}
252
253pub fn create_s3_bucket_read_write(
255 bucket: &str,
256 region: &str,
257 timeout: Duration,
258 credentials_file: Option<PathBuf>,
259) -> Result<s3::Bucket, anyhow::Error> {
260 let creds = match credentials_file {
261 Some(credentials_file) => {
262 let mut file = std::fs::File::open(credentials_file)?;
263 let mut json_config_str = String::new();
264 file.read_to_string(&mut json_config_str)?;
265 let credentials_config: S3CredentialsConfig = serde_json::from_str(&json_config_str)?;
266 s3::creds::Credentials::new(
267 Some(&credentials_config.access_key),
268 Some(&credentials_config.secret_key),
269 None,
270 None,
271 None,
272 )
273 }
274 None => s3::creds::Credentials::default(),
275 }?;
276 create_s3_bucket(bucket, region, timeout, creds)
277}
278
279fn create_s3_bucket(
281 bucket: &str,
282 region: &str,
283 timeout: Duration,
284 creds: s3::creds::Credentials,
285) -> Result<s3::Bucket, anyhow::Error> {
286 let mut bucket = s3::Bucket::new(bucket, region.parse::<s3::Region>()?, creds)?;
287 bucket.set_request_timeout(Some(timeout));
289 Ok(bucket)
290}