use crate::domain::error::{Result, ServiceError, StygianError};
use crate::ports::storage::{StoragePort, StorageRecord};
use crate::ports::{ScrapingService, ServiceInput, ServiceOutput};
use async_trait::async_trait;
use s3::creds::Credentials;
use s3::{Bucket, Region};
use serde_json::json;
/// Connection settings used to build an [`S3Storage`].
///
/// `Debug` is implemented by hand (instead of derived) so that the secret
/// access key never ends up in logs or panic messages.
#[derive(Clone)]
pub struct S3StorageConfig {
    /// Name of the bucket to operate on.
    pub bucket: String,
    /// Region name; parsed as a region when no `endpoint` is set, otherwise
    /// used as the region label of the custom endpoint.
    pub region: String,
    /// Optional custom endpoint URL; when set, a custom region is used.
    pub endpoint: Option<String>,
    /// Leading key segment placed in front of record and index keys.
    pub prefix: String,
    /// When `true`, path-style addressing is enabled on the bucket.
    pub path_style: bool,
    /// Explicit access key; only used when `secret_key` is also set.
    pub access_key: Option<String>,
    /// Explicit secret key; only used when `access_key` is also set.
    pub secret_key: Option<String>,
}

impl std::fmt::Debug for S3StorageConfig {
    /// Formats every field like the derived implementation would, except that
    /// a present `secret_key` is rendered as `Some("<redacted>")`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Keep the Some/None distinction visible without exposing the value.
        let secret_key = self.secret_key.as_ref().map(|_| "<redacted>");
        f.debug_struct("S3StorageConfig")
            .field("bucket", &self.bucket)
            .field("region", &self.region)
            .field("endpoint", &self.endpoint)
            .field("prefix", &self.prefix)
            .field("path_style", &self.path_style)
            .field("access_key", &self.access_key)
            .field("secret_key", &secret_key)
            .finish()
    }
}
impl Default for S3StorageConfig {
    /// Returns a configuration with an empty bucket name, region
    /// `"us-east-1"`, key prefix `"stygian"`, no custom endpoint, path-style
    /// addressing disabled, and no explicit credentials.
    fn default() -> Self {
        let region = String::from("us-east-1");
        let prefix = String::from("stygian");
        Self {
            bucket: String::new(),
            region,
            endpoint: None,
            prefix,
            path_style: false,
            access_key: None,
            secret_key: None,
        }
    }
}
/// S3-backed storage adapter.
///
/// Holds the bucket handle and the key prefix used when building record,
/// index, and pipeline-listing keys.
pub struct S3Storage {
    /// Bucket handle created in `S3Storage::new`.
    bucket: Box<Bucket>,
    /// Leading segment of the record, index, and pipeline-prefix keys.
    prefix: String,
}
/// Body size in bytes (5 MiB). `put_object` uses the streaming upload call
/// for bodies strictly larger than this and the single-request upload
/// otherwise.
const MULTIPART_THRESHOLD: usize = 5 * 1024 * 1024;
impl S3Storage {
    /// Builds an adapter from `config`.
    ///
    /// Credentials are taken from `access_key`/`secret_key` when both are
    /// set; in every other case they are read from the environment. When an
    /// `endpoint` is configured a custom region is used, otherwise `region`
    /// is parsed.
    ///
    /// # Errors
    ///
    /// - `ServiceError::AuthenticationFailed` when credentials cannot be built.
    /// - `ServiceError::Unavailable` when the region string does not parse or
    ///   the bucket handle cannot be created.
    pub fn new(config: S3StorageConfig) -> Result<Self> {
        let credentials =
            if let (Some(ak), Some(sk)) = (&config.access_key, &config.secret_key) {
                Credentials::new(Some(ak), Some(sk), None, None, None).map_err(|e| {
                    StygianError::Service(ServiceError::AuthenticationFailed(format!(
                        "S3 credentials error: {e}"
                    )))
                })?
            } else {
                Credentials::from_env().map_err(|e| {
                    StygianError::Service(ServiceError::AuthenticationFailed(format!(
                        "S3 credentials from env: {e}"
                    )))
                })?
            };

        let region = if let Some(endpoint) = &config.endpoint {
            Region::Custom {
                region: config.region.clone(),
                endpoint: endpoint.clone(),
            }
        } else {
            config.region.parse::<Region>().map_err(|e| {
                StygianError::Service(ServiceError::Unavailable(format!(
                    "invalid S3 region '{}': {e}",
                    config.region
                )))
            })?
        };

        let mut bucket = Bucket::new(&config.bucket, region, credentials).map_err(|e| {
            StygianError::Service(ServiceError::Unavailable(format!(
                "S3 bucket init failed: {e}"
            )))
        })?;
        if config.path_style {
            bucket.set_path_style();
        }

        let prefix = config.prefix;
        Ok(Self { bucket, prefix })
    }

    /// Key of a record object: `{prefix}/{pipeline}/{node}/{id}.json`.
    ///
    /// The pipeline id and node name are sanitised; the record id is used
    /// as-is.
    fn record_key(&self, record: &StorageRecord) -> String {
        let pipeline = sanitise(&record.pipeline_id);
        let node = sanitise(&record.node_name);
        format!("{}/{pipeline}/{node}/{}.json", self.prefix, record.id)
    }

    /// Key of the index entry for `id`: `{prefix}/_index/{sanitised id}`.
    fn index_key(&self, id: &str) -> String {
        let id = sanitise(id);
        format!("{}/_index/{id}", self.prefix)
    }

    /// Key prefix shared by all records of a pipeline:
    /// `{prefix}/{sanitised pipeline id}/`.
    fn pipeline_prefix(&self, pipeline_id: &str) -> String {
        let pipeline = sanitise(pipeline_id);
        format!("{}/{pipeline}/", self.prefix)
    }

    /// Uploads `body` to `key` with the given content type.
    ///
    /// Bodies of at most `MULTIPART_THRESHOLD` bytes go through the plain
    /// upload call; larger bodies are handed to the streaming upload call.
    ///
    /// # Errors
    ///
    /// Returns `ServiceError::Unavailable` when either upload call fails.
    async fn put_object(&self, key: &str, body: &[u8], content_type: &str) -> Result<()> {
        if body.len() <= MULTIPART_THRESHOLD {
            self.bucket
                .put_object_with_content_type(key, body, content_type)
                .await
                .map_err(|e| {
                    StygianError::Service(ServiceError::Unavailable(format!(
                        "S3 PUT '{key}' failed: {e}"
                    )))
                })?;
            return Ok(());
        }

        // Large payload: stream it from an in-memory cursor.
        let mut reader = std::io::Cursor::new(body);
        self.bucket
            .put_object_stream_with_content_type(&mut reader, key, content_type)
            .await
            .map_err(|e| {
                StygianError::Service(ServiceError::Unavailable(format!(
                    "S3 multipart PUT '{key}' failed: {e}"
                )))
            })?;
        Ok(())
    }
}
/// Returns a copy of `s` in which every `/`, `\`, `.`, `:` and space is
/// replaced by an underscore; all other characters are kept unchanged.
fn sanitise(s: &str) -> String {
    s.chars()
        .map(|ch| match ch {
            '/' | '\\' | '.' | ':' | ' ' => '_',
            other => other,
        })
        .collect()
}
#[async_trait]
impl StoragePort for S3Storage {
    /// Serialises `record` to JSON, uploads it under its record key, then
    /// writes an index entry whose body is that record key.
    ///
    /// # Errors
    ///
    /// - `ServiceError::InvalidResponse` when the record cannot be serialised.
    /// - `ServiceError::Unavailable` when either upload fails.
    async fn store(&self, record: StorageRecord) -> Result<()> {
        let key = self.record_key(&record);
        let body = serde_json::to_vec(&record).map_err(|e| {
            StygianError::Service(ServiceError::InvalidResponse(format!(
                "S3 serialise record failed: {e}"
            )))
        })?;
        self.put_object(&key, &body, "application/json").await?;

        // The index entry maps the id to the full record key so lookups by id
        // do not need to know the pipeline or node.
        let idx_key = self.index_key(&record.id);
        self.put_object(idx_key.as_str(), key.as_bytes(), "text/plain")
            .await?;
        Ok(())
    }

    /// Looks up the record with the given `id` through its index entry.
    ///
    /// Returns `Ok(None)` when the index lookup fails or does not answer with
    /// status 200, or when the record object does not answer with status 200.
    ///
    /// # Errors
    ///
    /// - `ServiceError::Unavailable` when fetching the record object fails.
    /// - `ServiceError::InvalidResponse` when the record body is not valid
    ///   record JSON.
    async fn retrieve(&self, id: &str) -> Result<Option<StorageRecord>> {
        let idx_key = self.index_key(id);
        let idx_resp = self.bucket.get_object(&idx_key).await;
        let full_key = match idx_resp {
            Ok(resp) if resp.status_code() == 200 => {
                String::from_utf8_lossy(resp.as_slice()).to_string()
            }
            // Any other outcome of the index lookup is treated as "not found".
            Ok(_) | Err(_) => return Ok(None),
        };

        let resp = self.bucket.get_object(&full_key).await.map_err(|e| {
            StygianError::Service(ServiceError::Unavailable(format!(
                "S3 GET '{full_key}' failed: {e}"
            )))
        })?;
        if resp.status_code() != 200 {
            return Ok(None);
        }

        let record: StorageRecord = serde_json::from_slice(resp.as_slice()).map_err(|e| {
            StygianError::Service(ServiceError::InvalidResponse(format!(
                "S3 deserialise record failed: {e}"
            )))
        })?;
        Ok(Some(record))
    }

    /// Fetches every record stored under the prefix of `pipeline_id`.
    ///
    /// Keys containing `/_index/` are skipped. Objects that do not answer
    /// with status 200 or whose body does not deserialise into a record are
    /// silently left out of the result.
    ///
    /// # Errors
    ///
    /// Returns `ServiceError::Unavailable` when the listing call or any
    /// object fetch fails.
    async fn list(&self, pipeline_id: &str) -> Result<Vec<StorageRecord>> {
        let prefix = self.pipeline_prefix(pipeline_id);
        let results = self.bucket.list(prefix.clone(), None).await.map_err(|e| {
            StygianError::Service(ServiceError::Unavailable(format!(
                "S3 LIST prefix '{prefix}' failed: {e}"
            )))
        })?;

        let mut records = Vec::new();
        for list_result in &results {
            for obj in &list_result.contents {
                if obj.key.contains("/_index/") {
                    continue;
                }
                let resp = self.bucket.get_object(&obj.key).await.map_err(|e| {
                    StygianError::Service(ServiceError::Unavailable(format!(
                        "S3 GET '{}' failed: {e}",
                        obj.key
                    )))
                })?;
                if resp.status_code() != 200 {
                    continue;
                }
                if let Ok(record) = serde_json::from_slice::<StorageRecord>(resp.as_slice()) {
                    records.push(record);
                }
            }
        }
        Ok(records)
    }

    /// Deletes the record with the given `id` together with its index entry.
    ///
    /// When the index lookup fails or does not answer with status 200 there
    /// is nothing that can be located, and the call succeeds without doing
    /// anything.
    ///
    /// # Errors
    ///
    /// Returns `ServiceError::Unavailable` when deleting the record object or
    /// the index entry fails. The record is deleted first; if that fails the
    /// index entry is left in place so the delete can be retried by id.
    async fn delete(&self, id: &str) -> Result<()> {
        let idx_key = self.index_key(id);
        let Ok(resp) = self.bucket.get_object(&idx_key).await else {
            return Ok(());
        };
        if resp.status_code() != 200 {
            return Ok(());
        }

        let full_key = String::from_utf8_lossy(resp.as_slice()).to_string();
        self.bucket.delete_object(&full_key).await.map_err(|e| {
            StygianError::Service(ServiceError::Unavailable(format!(
                "S3 DELETE '{full_key}' failed: {e}"
            )))
        })?;
        // Only drop the index entry once the record itself is gone; otherwise
        // the record would become unreachable by id.
        self.bucket.delete_object(&idx_key).await.map_err(|e| {
            StygianError::Service(ServiceError::Unavailable(format!(
                "S3 DELETE '{idx_key}' failed: {e}"
            )))
        })?;
        Ok(())
    }
}
#[async_trait]
impl ScrapingService for S3Storage {
    /// Runs a read-only S3 operation selected by the `"action"` parameter.
    ///
    /// - `"list"`: treats `input.url` as a key prefix and returns the matching
    ///   keys as a JSON array string.
    /// - anything else (including a missing or non-string `"action"`): treats
    ///   `input.url` as an object key and returns its body, lossily decoded
    ///   as UTF-8.
    ///
    /// # Errors
    ///
    /// - `ServiceError::Unavailable` when the list or get call fails.
    /// - `ServiceError::InvalidResponse` when a get does not answer with
    ///   status 200.
    async fn execute(&self, input: ServiceInput) -> Result<ServiceOutput> {
        let action = input
            .params
            .get("action")
            .and_then(|v| v.as_str())
            .unwrap_or("get");

        match action {
            "list" => {
                let prefix = input.url;
                let listing = self.bucket.list(prefix.clone(), None).await.map_err(|e| {
                    StygianError::Service(ServiceError::Unavailable(format!(
                        "S3 LIST '{prefix}' failed: {e}"
                    )))
                })?;

                let mut keys: Vec<&str> = Vec::new();
                for page in &listing {
                    keys.extend(page.contents.iter().map(|o| o.key.as_str()));
                }

                let data = serde_json::to_string(&keys).unwrap_or_default();
                let metadata = json!({
                    "source": "s3",
                    "action": "list",
                    "prefix": prefix,
                    "count": keys.len(),
                });
                Ok(ServiceOutput { data, metadata })
            }
            _ => {
                let key = &input.url;
                let resp = self.bucket.get_object(key).await.map_err(|e| {
                    StygianError::Service(ServiceError::Unavailable(format!(
                        "S3 GET '{key}' failed: {e}"
                    )))
                })?;

                let status = resp.status_code();
                if status != 200 {
                    return Err(StygianError::Service(ServiceError::InvalidResponse(
                        format!("S3 GET '{key}' returned status {}", status),
                    )));
                }

                let bytes = resp.as_slice();
                let data = String::from_utf8_lossy(bytes).into_owned();
                let metadata = json!({
                    "source": "s3",
                    "action": "get",
                    "key": key,
                    "size": bytes.len(),
                });
                Ok(ServiceOutput { data, metadata })
            }
        }
    }

    /// Static identifier of this service.
    fn name(&self) -> &'static str {
        "s3-storage"
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    // The key-building methods need a live bucket handle, so these tests
    // rebuild the same format strings from their parts.

    /// Mirrors the `{prefix}/{pipeline}/{node}/{id}.json` record key layout.
    fn expected_record_key(prefix: &str, pipeline_id: &str, node_name: &str, id: &str) -> String {
        format!(
            "{}/{}/{}/{}.json",
            prefix,
            sanitise(pipeline_id),
            sanitise(node_name),
            id
        )
    }

    /// Mirrors the `{prefix}/_index/{id}` index key layout.
    fn expected_index_key(prefix: &str, id: &str) -> String {
        format!("{}/_index/{}", prefix, sanitise(id))
    }

    /// Mirrors the `{prefix}/{pipeline}/` listing prefix layout.
    fn expected_pipeline_prefix(prefix: &str, pipeline_id: &str) -> String {
        format!("{}/{}/", prefix, sanitise(pipeline_id))
    }

    #[test]
    fn test_sanitise() {
        let cases = [("pipe/1", "pipe_1"), ("a.b:c\\d e", "a_b_c_d_e")];
        for (input, expected) in cases {
            assert_eq!(sanitise(input), expected);
        }
    }

    #[test]
    fn test_record_key_structure() {
        let key = expected_record_key("stygian", "my-pipeline", "fetch", "abc-123");
        assert_eq!(key, "stygian/my-pipeline/fetch/abc-123.json");
    }

    #[test]
    fn test_index_key_structure() {
        let key = expected_index_key("stygian", "abc-123");
        assert_eq!(key, "stygian/_index/abc-123");
    }

    #[test]
    fn test_pipeline_prefix_structure() {
        let pfx = expected_pipeline_prefix("stygian", "pipe/1");
        assert_eq!(pfx, "stygian/pipe_1/");
    }

    #[test]
    fn test_default_config() {
        let S3StorageConfig {
            region,
            endpoint,
            prefix,
            path_style,
            access_key,
            secret_key,
            ..
        } = S3StorageConfig::default();
        assert_eq!(region, "us-east-1");
        assert_eq!(prefix, "stygian");
        assert!(!path_style);
        assert!(endpoint.is_none());
        assert!(access_key.is_none());
        assert!(secret_key.is_none());
    }

    #[test]
    fn test_sanitise_preserves_safe_chars() {
        let untouched = "hello-world_123";
        assert_eq!(sanitise(untouched), untouched);
    }

    #[test]
    fn test_key_with_special_pipeline_id() {
        let key = expected_record_key("data", "org/team:project.v2", "extract", "uuid-1");
        assert_eq!(key, "data/org_team_project_v2/extract/uuid-1.json");
    }
}