use async_trait::async_trait;
use anyhow::{Result, anyhow};
use serde_json::{json, Value};
use std::collections::HashMap;
use indexmap::IndexMap;
use tracing::{debug, warn};
use crate::types::Document;
use crate::services::aws_estate_service::AwsServiceParser;
/// Parses the RDS section of an AWS estate payload into [`Document`]s.
///
/// The parser holds no state. It emits one document per database instance and,
/// when non-zero counters are present, account-wide summaries for clusters and
/// snapshots.
#[derive(Debug, Default, Clone, Copy)]
pub struct RdsParser;

impl RdsParser {
    /// Creates a new parser (equivalent to [`RdsParser::default`]).
    pub fn new() -> Self {
        Self
    }

    /// Formats an identifier as
    /// `arn:aws:{service}:{region}:{account_id}:{resource_type}:{resource_id}`.
    fn generate_arn(
        service: &str,
        region: &str,
        account_id: &str,
        resource_type: &str,
        resource_id: &str,
    ) -> String {
        format!(
            "arn:aws:{}:{}:{}:{}:{}",
            service, region, account_id, resource_type, resource_id
        )
    }

    /// Builds the metadata fields shared by every document this parser emits.
    ///
    /// `last_synced` is the current UTC time as Unix seconds. `region` is only
    /// inserted when it is `Some`.
    fn create_base_metadata(
        account_id: &str,
        service: &str,
        resource_type: &str,
        region: Option<&str>,
    ) -> IndexMap<String, Value> {
        let mut metadata = IndexMap::new();
        metadata.insert("account_id".to_string(), json!(account_id));
        metadata.insert("service".to_string(), json!(service));
        metadata.insert("resource_type".to_string(), json!(resource_type));
        metadata.insert("cloud_provider".to_string(), json!("aws"));
        metadata.insert("document_type".to_string(), json!("aws_estate"));
        metadata.insert("last_synced".to_string(), json!(chrono::Utc::now().timestamp()));
        if let Some(region_val) = region {
            metadata.insert("region".to_string(), json!(region_val));
        }
        metadata
    }

    /// Reads `value[key]` as a string.
    ///
    /// Falls back to `default` when the key is absent, `value` is not an
    /// object, or the field is not a JSON string.
    fn str_field<'a>(value: &'a Value, key: &str, default: &'a str) -> &'a str {
        value.get(key).and_then(Value::as_str).unwrap_or(default)
    }

    /// Converts every entry of `instances` into exactly one document.
    ///
    /// # Errors
    /// Currently never fails. The `Result` is kept so callers can use `?`
    /// uniformly.
    fn parse_rds_instances(&self, account_id: &str, instances: &[Value]) -> Result<Vec<Document>> {
        Ok(instances
            .iter()
            .map(|instance| Self::instance_document(account_id, instance))
            .collect())
    }

    /// Builds the document for a single RDS instance entry.
    ///
    /// The document id is the instance ARN. Missing string fields fall back to
    /// placeholders: `"unknown"` for most fields, `"unnamed"` for `name`, and
    /// `"us-east-1"` for `region`.
    fn instance_document(account_id: &str, instance: &Value) -> Document {
        let instance_id = Self::str_field(instance, "instance_id", "unknown");
        let db_identifier = Self::str_field(instance, "db_instance_identifier", "unknown");
        let region = Self::str_field(instance, "region", "us-east-1");
        let db_class = Self::str_field(instance, "db_instance_class", "unknown");
        let engine = Self::str_field(instance, "engine", "unknown");
        let status = Self::str_field(instance, "db_instance_status", "unknown");
        let name = Self::str_field(instance, "name", "unnamed");

        let doc_id = Self::generate_arn("rds", region, account_id, "db", db_identifier);
        let content = format!(
            "RDS Database {} ({}) in region {} - Engine: {} - Class: {} - Status: {}",
            name, db_identifier, region, engine, db_class, status
        );

        let mut metadata =
            Self::create_base_metadata(account_id, "rds", "rds-instance", Some(region));
        metadata.insert("instance_id".to_string(), json!(instance_id));
        metadata.insert("db_instance_identifier".to_string(), json!(db_identifier));
        metadata.insert("db_instance_class".to_string(), json!(db_class));
        metadata.insert("engine".to_string(), json!(engine));
        metadata.insert("db_instance_status".to_string(), json!(status));
        metadata.insert(
            "tags".to_string(),
            json!({
                "Name": name,
                "Engine": engine,
                "Class": db_class
            }),
        );

        if let Some(multi_az) = instance.get("multi_az").and_then(Value::as_bool) {
            metadata.insert("multi_az".to_string(), json!(multi_az));
        }
        // The advertised data schema types `allocated_storage` as a JSON
        // `number`. Accept any number so non-integer values are kept rather
        // than silently dropped.
        if let Some(allocated_storage) = instance
            .get("allocated_storage")
            .filter(|v| v.is_number())
        {
            metadata.insert("allocated_storage".to_string(), allocated_storage.clone());
        }
        if let Some(storage_type) = instance.get("storage_type").and_then(Value::as_str) {
            metadata.insert("storage_type".to_string(), json!(storage_type));
        }
        // Copied verbatim. Its shape is whatever the upstream collector produced.
        if let Some(permissions) = instance.get("permissions") {
            metadata.insert("iam_permissions".to_string(), permissions.clone());
        }

        let mut doc = Document::new(doc_id, content);
        doc.metadata = metadata;
        doc
    }

    /// Builds an account-wide summary document from the counter
    /// `service_data[count_key]`.
    ///
    /// Returns `None` when the counter is missing, is not a non-negative
    /// integer, or is zero. The document id is
    /// `arn:aws:rds:global:{account_id}:{id_suffix}`. The counter is stored in
    /// metadata under `count_key`.
    fn count_summary(
        account_id: &str,
        service_data: &Value,
        count_key: &str,
        id_suffix: &str,
        resource_type: &str,
        describe: impl FnOnce(u64) -> String,
    ) -> Option<Document> {
        let count = service_data
            .get(count_key)
            .and_then(Value::as_u64)
            .filter(|&count| count > 0)?;

        let doc_id = format!("arn:aws:rds:global:{}:{}", account_id, id_suffix);
        let mut metadata =
            Self::create_base_metadata(account_id, "rds", resource_type, Some("global"));
        metadata.insert(count_key.to_string(), json!(count));

        let mut doc = Document::new(doc_id, describe(count));
        doc.metadata = metadata;
        Some(doc)
    }

    /// Emits the clusters summary when `clusters_count` is a positive integer.
    ///
    /// # Errors
    /// Currently never fails.
    fn parse_clusters_summary(&self, account_id: &str, service_data: &Value) -> Result<Option<Document>> {
        Ok(Self::count_summary(
            account_id,
            service_data,
            "clusters_count",
            "clusters-summary",
            "rds-cluster-summary",
            |count| format!("RDS Clusters Summary: {} total clusters | Cloud Provider: aws", count),
        ))
    }

    /// Emits the snapshots summary when `snapshots_count` is a positive integer.
    ///
    /// # Errors
    /// Currently never fails.
    fn parse_snapshots_summary(&self, account_id: &str, service_data: &Value) -> Result<Option<Document>> {
        Ok(Self::count_summary(
            account_id,
            service_data,
            "snapshots_count",
            "snapshots-summary",
            "rds-snapshot-summary",
            |count| {
                format!(
                    "RDS Snapshots Summary: {} total snapshots available for restore | Cloud Provider: aws",
                    count
                )
            },
        ))
    }
}
#[async_trait]
impl AwsServiceParser for RdsParser {
fn service_name(&self) -> &str {
"rds"
}
fn can_parse(&self, service_data: &Value) -> bool {
service_data.is_object() && (
service_data.get("instances").is_some() ||
service_data.get("clusters_count").is_some() ||
service_data.get("snapshots_count").is_some()
)
}
async fn parse(&self, account_id: &str, service_data: &Value) -> Result<Vec<Document>> {
debug!("🔍 RDS parser processing data for account: {}", account_id);
let mut documents = Vec::new();
if let Some(instances) = service_data.get("instances").and_then(|v| v.as_array()) {
if !instances.is_empty() {
let mut instance_docs = self.parse_rds_instances(account_id, instances)?;
documents.append(&mut instance_docs);
debug!("✅ Parsed {} RDS instances", instances.len());
}
}
if let Some(clusters_doc) = self.parse_clusters_summary(account_id, service_data)? {
documents.push(clusters_doc);
debug!("✅ Parsed RDS clusters summary");
}
if let Some(snapshots_doc) = self.parse_snapshots_summary(account_id, service_data)? {
documents.push(snapshots_doc);
debug!("✅ Parsed RDS snapshots summary");
}
if documents.is_empty() {
warn!("🟡 RDS parser found no parseable data");
} else {
debug!("🎉 RDS parser generated {} documents", documents.len());
}
Ok(documents)
}
fn get_data_schema(&self) -> Option<Value> {
Some(json!({
"type": "object",
"properties": {
"instances": {
"type": "array",
"items": {
"type": "object",
"required": ["instance_id", "db_instance_identifier", "region", "engine"],
"properties": {
"instance_id": {"type": "string"},
"db_instance_identifier": {"type": "string"},
"db_instance_class": {"type": "string"},
"engine": {"type": "string"},
"db_instance_status": {"type": "string"},
"region": {"type": "string"},
"name": {"type": "string"},
"multi_az": {"type": "boolean"},
"allocated_storage": {"type": "number"},
"storage_type": {"type": "string"}
}
}
},
"clusters_count": {"type": "number"},
"snapshots_count": {"type": "number"}
}
}))
}
}