use async_trait::async_trait;
use anyhow::{Result, anyhow};
use serde_json::{json, Value};
use std::collections::HashMap;
use indexmap::IndexMap;
use tracing::{debug, warn};
use crate::types::Document;
use crate::services::aws_estate_service::AwsServiceParser;
/// Stateless parser that converts S3 estate JSON into searchable documents.
///
/// Derives `Default` so it pairs with the zero-argument `new()` constructor
/// (clippy `new_without_default`), and `Debug`/`Clone`/`Copy` because a unit
/// struct carries no data.
#[derive(Debug, Default, Clone, Copy)]
pub struct S3Parser;
impl S3Parser {
pub fn new() -> Self {
Self
}
fn generate_arn(
service: &str,
region: &str,
account_id: &str,
resource_type: &str,
resource_id: &str,
) -> String {
if service == "s3" && region.is_empty() {
format!("arn:aws:s3:::{}", resource_id)
} else {
format!("arn:aws:{}:{}:{}:{}:{}", service, region, account_id, resource_type, resource_id)
}
}
fn create_base_metadata(
account_id: &str,
service: &str,
resource_type: &str,
region: Option<&str>,
) -> IndexMap<String, Value> {
let mut metadata = IndexMap::new();
metadata.insert("account_id".to_string(), json!(account_id));
metadata.insert("service".to_string(), json!(service));
metadata.insert("resource_type".to_string(), json!(resource_type));
metadata.insert("cloud_provider".to_string(), json!("aws"));
metadata.insert("document_type".to_string(), json!("aws_estate"));
metadata.insert("last_synced".to_string(), json!(chrono::Utc::now().timestamp()));
if let Some(region_val) = region {
metadata.insert("region".to_string(), json!(region_val));
}
metadata
}
fn parse_s3_buckets(&self, account_id: &str, buckets: &[Value]) -> Result<Vec<Document>> {
let mut documents = Vec::new();
for bucket in buckets {
let bucket_name = bucket["name"]
.as_str()
.or_else(|| bucket["bucket_name"].as_str())
.unwrap_or("unknown");
let region = bucket["region"]
.as_str()
.unwrap_or("us-east-1");
let creation_date = bucket["creation_date"]
.as_str()
.unwrap_or("unknown");
let versioning = bucket["versioning"]
.as_str()
.unwrap_or("Disabled");
let encryption = bucket["encryption"]
.as_str()
.unwrap_or("Disabled");
let doc_id = Self::generate_arn(
"s3",
"",
account_id,
"bucket",
bucket_name,
);
let content = format!(
"S3 Bucket {} in region {} - Created: {} - Versioning: {} - Encryption: {}",
bucket_name, region, creation_date, versioning, encryption
);
let mut metadata = Self::create_base_metadata(
account_id,
"s3",
"s3-bucket",
Some(region),
);
metadata.insert("bucket_name".to_string(), json!(bucket_name));
metadata.insert("creation_date".to_string(), json!(creation_date));
metadata.insert("versioning".to_string(), json!(versioning));
metadata.insert("encryption".to_string(), json!(encryption));
if let Some(public_access) = bucket.get("public_access_blocked").and_then(|v| v.as_bool()) {
metadata.insert("public_access_blocked".to_string(), json!(public_access));
}
if let Some(object_count) = bucket.get("object_count").and_then(|v| v.as_u64()) {
metadata.insert("object_count".to_string(), json!(object_count));
}
if let Some(size_bytes) = bucket.get("size_bytes").and_then(|v| v.as_u64()) {
metadata.insert("size_bytes".to_string(), json!(size_bytes));
}
if let Some(permissions) = bucket.get("permissions") {
metadata.insert("iam_permissions".to_string(), permissions.clone());
}
metadata.insert("tags".to_string(), json!({
"BucketName": bucket_name,
"Region": region
}));
let mut doc = Document::new(doc_id, content);
doc.metadata = metadata;
documents.push(doc);
}
Ok(documents)
}
fn parse_bucket_policies_summary(&self, account_id: &str, service_data: &Value) -> Result<Option<Document>> {
if let Some(policy_count) = service_data.get("bucket_policies_count").and_then(|v| v.as_u64()) {
if policy_count > 0 {
let doc_id = format!("arn:aws:s3:global:{}:bucket-policies-summary", account_id);
let public_policies = service_data.get("public_bucket_policies").and_then(|v| v.as_u64()).unwrap_or(0);
let content = format!(
"S3 Bucket Policies Summary: {} total bucket policies ({} with public access) | Cloud Provider: aws",
policy_count, public_policies
);
let mut metadata = Self::create_base_metadata(
account_id,
"s3",
"s3-policies-summary",
Some("global"),
);
metadata.insert("bucket_policies_count".to_string(), json!(policy_count));
metadata.insert("public_bucket_policies".to_string(), json!(public_policies));
let mut doc = Document::new(doc_id, content);
doc.metadata = metadata;
return Ok(Some(doc));
}
}
Ok(None)
}
}
#[async_trait]
impl AwsServiceParser for S3Parser {
fn service_name(&self) -> &str {
"s3"
}
fn can_parse(&self, service_data: &Value) -> bool {
service_data.is_object() && (
service_data.get("buckets").is_some() ||
service_data.get("bucket_policies_count").is_some()
)
}
async fn parse(&self, account_id: &str, service_data: &Value) -> Result<Vec<Document>> {
debug!("🔍 S3 parser processing data for account: {}", account_id);
let mut documents = Vec::new();
if let Some(buckets) = service_data.get("buckets").and_then(|v| v.as_array()) {
if !buckets.is_empty() {
let mut bucket_docs = self.parse_s3_buckets(account_id, buckets)?;
documents.append(&mut bucket_docs);
debug!("✅ Parsed {} S3 buckets", buckets.len());
}
}
if let Some(policies_doc) = self.parse_bucket_policies_summary(account_id, service_data)? {
documents.push(policies_doc);
debug!("✅ Parsed S3 bucket policies summary");
}
if documents.is_empty() {
warn!("🟡 S3 parser found no parseable data");
} else {
debug!("🎉 S3 parser generated {} documents", documents.len());
}
Ok(documents)
}
fn get_data_schema(&self) -> Option<Value> {
Some(json!({
"type": "object",
"properties": {
"buckets": {
"type": "array",
"items": {
"type": "object",
"required": ["bucket_name", "region", "creation_date"],
"properties": {
"bucket_name": {"type": "string"},
"region": {"type": "string"},
"creation_date": {"type": "string"},
"versioning": {"type": "string"},
"encryption": {"type": "string"},
"public_access_blocked": {"type": "boolean"},
"object_count": {"type": "number"},
"size_bytes": {"type": "number"},
"permissions": {"type": "array"}
}
}
},
"bucket_policies_count": {"type": "number"},
"public_bucket_policies": {"type": "number"}
}
}))
}
}