use anyhow::Context;
use futures::TryStreamExt;
use near_chain_configs::ExternalStorageLocation;
use object_store::{ObjectStore, PutPayload};
use std::io::{Read, Write};
use std::path::PathBuf;
use std::sync::Arc;
use std::time::Duration;
#[derive(Clone)]
pub enum ExternalConnection {
S3 { bucket: Arc<s3::Bucket> },
Filesystem { root_dir: PathBuf },
GCS {
gcs_client: Arc<object_store::gcp::GoogleCloudStorage>,
reqwest_client: Arc<reqwest::Client>,
bucket: String,
},
}
const GCS_ENCODE_SET: &percent_encoding::AsciiSet =
&percent_encoding::NON_ALPHANUMERIC.remove(b'-').remove(b'.').remove(b'_');
pub struct S3AccessConfig {
pub is_readonly: bool,
pub timeout: Duration,
}
impl ExternalConnection {
pub fn name(&self) -> &'static str {
match self {
Self::S3 { .. } => "S3",
Self::Filesystem { .. } => "Filesystem",
Self::GCS { .. } => "GCS",
}
}
pub fn new(
location: &ExternalStorageLocation,
credentials_file: Option<PathBuf>,
s3_access_config: Option<S3AccessConfig>,
) -> Self {
match location {
ExternalStorageLocation::S3 { bucket, region, .. } => {
let S3AccessConfig { is_readonly, timeout } = s3_access_config
.expect("S3 access config not provided with S3 external storage location");
let bucket = if is_readonly {
create_s3_bucket_readonly(&bucket, ®ion, timeout)
} else {
create_s3_bucket_read_write(&bucket, ®ion, timeout, credentials_file)
};
if let Err(err) = bucket {
if is_readonly {
panic!("Failed to create an S3 bucket: {err}");
} else {
panic!(
"Failed to authenticate connection to S3. Please either provide AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY in the environment, or create a credentials file and link it in config.json as 's3_credentials_file'. Error: {err}"
);
}
}
ExternalConnection::S3 { bucket: Arc::new(bucket.unwrap()) }
}
ExternalStorageLocation::Filesystem { root_dir } => {
ExternalConnection::Filesystem { root_dir: root_dir.clone() }
}
ExternalStorageLocation::GCS { bucket, .. } => {
if let Some(credentials_file) = credentials_file {
if let Ok(var) = std::env::var("SERVICE_ACCOUNT") {
tracing::warn!(target: "external", %var, ?credentials_file, "environment variable `SERVICE_ACCOUNT` is set, but `credentials_file` in config.json overrides it");
println!(
"Environment variable 'SERVICE_ACCOUNT' is set to {var}, but 'credentials_file' in config.json overrides it to '{credentials_file:?}'"
);
}
unsafe { std::env::set_var("SERVICE_ACCOUNT", &credentials_file) };
tracing::info!(target: "external", ?credentials_file, "set the environment variable `SERVICE_ACCOUNT`");
}
ExternalConnection::GCS {
gcs_client: Arc::new(
object_store::gcp::GoogleCloudStorageBuilder::from_env()
.with_bucket_name(bucket)
.build()
.unwrap(),
),
reqwest_client: Arc::new(reqwest::Client::default()),
bucket: bucket.clone(),
}
}
}
}
pub async fn get(&self, path: &str) -> Result<Vec<u8>, anyhow::Error> {
match self {
ExternalConnection::S3 { bucket } => {
tracing::debug!(target: "external", path, "reading from S3");
let response = bucket.get_object(path).await?;
if response.status_code() == 200 {
Ok(response.bytes().to_vec())
} else {
Err(anyhow::anyhow!("Bad response status code: {}", response.status_code()))
}
}
ExternalConnection::Filesystem { root_dir } => {
let path = root_dir.join(path);
tracing::debug!(target: "external", ?path, "reading a file");
let data = std::fs::read(&path)?;
Ok(data)
}
ExternalConnection::GCS { reqwest_client, bucket, .. } => {
let url = format!(
"https://storage.googleapis.com/storage/v1/b/{}/o/{}?alt=media",
percent_encoding::percent_encode(bucket.as_bytes(), GCS_ENCODE_SET),
percent_encoding::percent_encode(path.as_bytes(), GCS_ENCODE_SET),
);
tracing::debug!(target: "external", url, "reading from GCS");
let response = reqwest_client.get(&url).send().await?.error_for_status();
match response {
Err(e) => Err(e.into()),
Ok(r) => {
let bytes = r.bytes().await?.to_vec();
Ok(bytes)
}
}
}
}
}
pub async fn put(&self, path: &str, value: &[u8]) -> Result<(), anyhow::Error> {
match self {
ExternalConnection::S3 { bucket } => {
tracing::debug!(target: "external", path, "writing to S3");
bucket.put_object(path, value).await?;
Ok(())
}
ExternalConnection::Filesystem { root_dir } => {
let path = root_dir.join(path);
tracing::debug!(target: "external", ?path, "writing to a file");
if let Some(parent_dir) = path.parent() {
std::fs::create_dir_all(parent_dir)?;
}
let mut file = std::fs::OpenOptions::new()
.write(true)
.create(true)
.truncate(true)
.open(&path)?;
file.write_all(value)?;
Ok(())
}
ExternalConnection::GCS { gcs_client, .. } => {
let path = object_store::path::Path::parse(path)
.with_context(|| format!("{path} isn't a valid path for GCP"))?;
tracing::debug!(target: "external", ?path, "writing to GCS");
gcs_client.put(&path, PutPayload::from_bytes(value.to_vec().into())).await?;
Ok(())
}
}
}
pub async fn list(&self, directory_path: &str) -> Result<Vec<String>, anyhow::Error> {
match self {
ExternalConnection::S3 { bucket } => {
let prefix = format!("{}/", directory_path);
let list_results = bucket.list(prefix.clone(), Some("/".to_string())).await?;
tracing::debug!(target: "external", directory_path, "list directory in S3");
let mut file_names = vec![];
for res in list_results {
for obj in res.contents {
file_names.push(extract_file_name_from_full_path(obj.key))
}
}
Ok(file_names)
}
ExternalConnection::Filesystem { root_dir } => {
let path = root_dir.join(directory_path);
tracing::debug!(target: "external", ?path, "list files in local directory");
std::fs::create_dir_all(&path)?;
let mut file_names = vec![];
let files = std::fs::read_dir(&path)?;
for file in files {
let file_name = extract_file_name_from_path_buf(file?.path());
file_names.push(file_name);
}
Ok(file_names)
}
ExternalConnection::GCS { gcs_client, .. } => {
let prefix = format!("{}/", directory_path);
tracing::debug!(target: "external", directory_path, "list directory in GCS");
Ok(gcs_client
.list(Some(
&object_store::path::Path::parse(&prefix)
.with_context(|| format!("can't parse {prefix} as path"))?,
))
.try_collect::<Vec<_>>()
.await?
.into_iter()
.map(|object| object.location.filename().unwrap().into())
.collect())
}
}
}
}
fn extract_file_name_from_full_path(full_path: String) -> String {
return extract_file_name_from_path_buf(PathBuf::from(full_path));
}
fn extract_file_name_from_path_buf(path_buf: PathBuf) -> String {
return path_buf.file_name().unwrap().to_str().unwrap().to_string();
}
pub fn create_s3_bucket_readonly(
bucket: &str,
region: &str,
timeout: Duration,
) -> Result<s3::Bucket, anyhow::Error> {
let creds = s3::creds::Credentials::anonymous()?;
create_s3_bucket(bucket, region, timeout, creds)
}
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
struct S3CredentialsConfig {
access_key: String,
secret_key: String,
}
pub fn create_s3_bucket_read_write(
bucket: &str,
region: &str,
timeout: Duration,
credentials_file: Option<PathBuf>,
) -> Result<s3::Bucket, anyhow::Error> {
let creds = match credentials_file {
Some(credentials_file) => {
let mut file = std::fs::File::open(credentials_file)?;
let mut json_config_str = String::new();
file.read_to_string(&mut json_config_str)?;
let credentials_config: S3CredentialsConfig = serde_json::from_str(&json_config_str)?;
s3::creds::Credentials::new(
Some(&credentials_config.access_key),
Some(&credentials_config.secret_key),
None,
None,
None,
)
}
None => s3::creds::Credentials::default(),
}?;
create_s3_bucket(bucket, region, timeout, creds)
}
fn create_s3_bucket(
bucket: &str,
region: &str,
timeout: Duration,
creds: s3::creds::Credentials,
) -> Result<s3::Bucket, anyhow::Error> {
let mut bucket = s3::Bucket::new(bucket, region.parse::<s3::Region>()?, creds)?;
bucket.set_request_timeout(Some(timeout));
Ok(bucket)
}