use async_trait::async_trait;
use anyhow::{Result, anyhow};
use serde_json::{json, Value};
use std::collections::HashMap;
use indexmap::IndexMap;
use tracing::{debug, warn};
use crate::types::Document;
use crate::services::aws_estate_service::AwsServiceParser;
/// Stateless parser that turns collected EBS inventory JSON into searchable documents.
///
/// `Default` is derived so the type satisfies clippy's `new_without_default` alongside
/// the inherent `EbsParser::new` constructor.
#[derive(Debug, Default, Clone, Copy)]
pub struct EbsParser;
impl EbsParser {
pub fn new() -> Self {
Self
}
fn generate_arn(
service: &str,
region: &str,
account_id: &str,
resource_type: &str,
resource_id: &str,
) -> String {
format!("arn:aws:{}:{}:{}:{}:{}", service, region, account_id, resource_type, resource_id)
}
fn create_base_metadata(
account_id: &str,
service: &str,
resource_type: &str,
region: Option<&str>,
) -> IndexMap<String, Value> {
let mut metadata = IndexMap::new();
metadata.insert("account_id".to_string(), json!(account_id));
metadata.insert("service".to_string(), json!(service));
metadata.insert("resource_type".to_string(), json!(resource_type));
metadata.insert("cloud_provider".to_string(), json!("aws"));
metadata.insert("document_type".to_string(), json!("aws_estate"));
metadata.insert("last_synced".to_string(), json!(chrono::Utc::now().timestamp()));
if let Some(region_val) = region {
metadata.insert("region".to_string(), json!(region_val));
}
metadata
}
fn parse_ebs_volumes(&self, account_id: &str, volumes: &[Value]) -> Result<Vec<Document>> {
let mut documents = Vec::new();
for volume in volumes {
let volume_id = volume["volume_id"]
.as_str()
.unwrap_or("unknown");
let region = volume["region"]
.as_str()
.unwrap_or("us-east-1");
let size_gb = volume["size"]
.as_u64()
.unwrap_or(0);
let volume_type = volume["volume_type"]
.as_str()
.unwrap_or("unknown");
let state = volume["state"]
.as_str()
.unwrap_or("unknown");
let availability_zone = volume["availability_zone"]
.as_str()
.unwrap_or("unknown");
let encrypted = volume["encrypted"]
.as_bool()
.unwrap_or(false);
let doc_id = Self::generate_arn(
"ec2",
region,
account_id,
"volume",
volume_id,
);
let content = format!(
"EBS Volume {} in {} ({}) - Type: {} - Size: {}GB - State: {} - Encrypted: {}",
volume_id, availability_zone, region, volume_type, size_gb, state, encrypted
);
let mut metadata = Self::create_base_metadata(
account_id,
"ebs",
"ebs-volume",
Some(region),
);
metadata.insert("volume_id".to_string(), json!(volume_id));
metadata.insert("size".to_string(), json!(size_gb));
metadata.insert("volume_type".to_string(), json!(volume_type));
metadata.insert("state".to_string(), json!(state));
metadata.insert("availability_zone".to_string(), json!(availability_zone));
metadata.insert("encrypted".to_string(), json!(encrypted));
if let Some(create_time) = volume.get("create_time").and_then(|v| v.as_str()) {
metadata.insert("create_time".to_string(), json!(create_time));
}
if let Some(attachment_state) = volume.get("attachment_state").and_then(|v| v.as_str()) {
metadata.insert("attachment_state".to_string(), json!(attachment_state));
}
if let Some(attached_instance_id) = volume.get("attached_instance_id").and_then(|v| v.as_str()) {
metadata.insert("attached_instance_id".to_string(), json!(attached_instance_id));
}
if let Some(iops) = volume.get("iops").and_then(|v| v.as_u64()) {
metadata.insert("iops".to_string(), json!(iops));
}
if let Some(throughput) = volume.get("throughput").and_then(|v| v.as_u64()) {
metadata.insert("throughput".to_string(), json!(throughput));
}
if let Some(permissions) = volume.get("permissions") {
metadata.insert("iam_permissions".to_string(), permissions.clone());
}
metadata.insert("tags".to_string(), json!({
"VolumeId": volume_id,
"VolumeType": volume_type,
"Size": format!("{}GB", size_gb)
}));
let mut doc = Document::new(doc_id, content);
doc.metadata = metadata;
documents.push(doc);
}
Ok(documents)
}
fn parse_snapshots_summary(&self, account_id: &str, service_data: &Value) -> Result<Option<Document>> {
if let Some(snapshots_count) = service_data.get("snapshots_count").and_then(|v| v.as_u64()) {
if snapshots_count > 0 {
let doc_id = format!("arn:aws:ec2:global:{}:snapshots-summary", account_id);
let total_size_gb = service_data.get("snapshots_total_size_gb").and_then(|v| v.as_u64()).unwrap_or(0);
let encrypted_snapshots = service_data.get("encrypted_snapshots").and_then(|v| v.as_u64()).unwrap_or(0);
let content = format!(
"EBS Snapshots Summary: {} total snapshots ({} GB total, {} encrypted) | Cloud Provider: aws",
snapshots_count, total_size_gb, encrypted_snapshots
);
let mut metadata = Self::create_base_metadata(
account_id,
"ebs",
"ebs-snapshots-summary",
Some("global"),
);
metadata.insert("snapshots_count".to_string(), json!(snapshots_count));
metadata.insert("snapshots_total_size_gb".to_string(), json!(total_size_gb));
metadata.insert("encrypted_snapshots".to_string(), json!(encrypted_snapshots));
let mut doc = Document::new(doc_id, content);
doc.metadata = metadata;
return Ok(Some(doc));
}
}
Ok(None)
}
fn parse_encryption_summary(&self, account_id: &str, service_data: &Value) -> Result<Option<Document>> {
if let (
Some(total_volumes),
Some(encrypted_volumes)
) = (
service_data.get("total_volumes").and_then(|v| v.as_u64()),
service_data.get("encrypted_volumes").and_then(|v| v.as_u64()),
) {
let doc_id = format!("arn:aws:ebs:global:{}:encryption-summary", account_id);
let encryption_percentage = if total_volumes > 0 {
(encrypted_volumes * 100) / total_volumes
} else {
0
};
let content = format!(
"EBS Encryption Summary: {} of {} volumes encrypted ({}%) | Cloud Provider: aws",
encrypted_volumes, total_volumes, encryption_percentage
);
let mut metadata = Self::create_base_metadata(
account_id,
"ebs",
"ebs-encryption-summary",
Some("global"),
);
metadata.insert("total_volumes".to_string(), json!(total_volumes));
metadata.insert("encrypted_volumes".to_string(), json!(encrypted_volumes));
metadata.insert("encryption_percentage".to_string(), json!(encryption_percentage));
let mut doc = Document::new(doc_id, content);
doc.metadata = metadata;
return Ok(Some(doc));
}
Ok(None)
}
}
#[async_trait]
impl AwsServiceParser for EbsParser {
    /// Identifier of the AWS service handled by this parser.
    fn service_name(&self) -> &str {
        "ebs"
    }

    /// Returns `true` when `service_data` is a JSON object containing at least one of the
    /// keys this parser understands: `volumes`, `snapshots_count` or `total_volumes`.
    fn can_parse(&self, service_data: &Value) -> bool {
        service_data.is_object() && (
            service_data.get("volumes").is_some() ||
            service_data.get("snapshots_count").is_some() ||
            service_data.get("total_volumes").is_some()
        )
    }

    /// Produces documents for this account from `service_data`, in this order: one per
    /// entry of a non-empty `volumes` array, then an optional snapshot summary, then an
    /// optional encryption summary. Logs a warning when nothing could be parsed.
    async fn parse(&self, account_id: &str, service_data: &Value) -> Result<Vec<Document>> {
        debug!("🔍 EBS parser processing data for account: {}", account_id);
        let mut documents = Vec::new();

        if let Some(volumes) = service_data.get("volumes").and_then(|v| v.as_array()) {
            if !volumes.is_empty() {
                documents.extend(self.parse_ebs_volumes(account_id, volumes)?);
                debug!("✅ Parsed {} EBS volumes", volumes.len());
            }
        }
        if let Some(snapshots_doc) = self.parse_snapshots_summary(account_id, service_data)? {
            documents.push(snapshots_doc);
            debug!("✅ Parsed EBS snapshots summary");
        }
        if let Some(encryption_doc) = self.parse_encryption_summary(account_id, service_data)? {
            documents.push(encryption_doc);
            debug!("✅ Parsed EBS encryption summary");
        }

        if documents.is_empty() {
            warn!("🟡 EBS parser found no parseable data");
        } else {
            debug!("🎉 EBS parser generated {} documents", documents.len());
        }
        Ok(documents)
    }

    /// JSON Schema describing the `service_data` shape accepted by `parse`.
    ///
    /// Numeric fields are declared as non-negative integers because the parser reads them
    /// with `as_u64`. Fractional or negative values would otherwise pass validation and
    /// then be silently replaced by 0 or dropped.
    fn get_data_schema(&self) -> Option<Value> {
        Some(json!({
            "type": "object",
            "properties": {
                "volumes": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "required": ["volume_id", "region", "size", "volume_type", "state"],
                        "properties": {
                            "volume_id": {"type": "string"},
                            "region": {"type": "string"},
                            "size": {"type": "integer", "minimum": 0},
                            "volume_type": {"type": "string"},
                            "state": {"type": "string"},
                            "availability_zone": {"type": "string"},
                            "encrypted": {"type": "boolean"},
                            "create_time": {"type": "string"},
                            "attachment_state": {"type": "string"},
                            "attached_instance_id": {"type": "string"},
                            "iops": {"type": "integer", "minimum": 0},
                            "throughput": {"type": "integer", "minimum": 0},
                            "permissions": {"type": "array"}
                        }
                    }
                },
                "snapshots_count": {"type": "integer", "minimum": 0},
                "snapshots_total_size_gb": {"type": "integer", "minimum": 0},
                "encrypted_snapshots": {"type": "integer", "minimum": 0},
                "total_volumes": {"type": "integer", "minimum": 0},
                "encrypted_volumes": {"type": "integer", "minimum": 0}
            }
        }))
    }
}