use async_trait::async_trait;
use anyhow::{Result, anyhow};
use serde::{Serialize, Deserialize};
use serde_json::{json, Value};
use std::collections::HashMap;
use indexmap::IndexMap;
use std::sync::Arc;
use tracing::{info, warn, debug};
use crate::types::Document;
use crate::CreateResult;
use crate::services::DocumentService;
/// Converts the JSON describing one AWS service into [`Document`]s.
///
/// `AwsEstateService` keeps implementations keyed by
/// [`service_name`](Self::service_name). For a matching service it calls
/// [`can_parse`](Self::can_parse) first and [`parse`](Self::parse) only when that
/// returns `true`.
#[async_trait]
pub trait AwsServiceParser: Send + Sync {
    /// Service key handled by this parser, matched against the keys of an
    /// account's `services` object.
    fn service_name(&self) -> &str;

    /// Returns `false` to decline `service_data`; a declined service yields no documents.
    fn can_parse(&self, service_data: &Value) -> bool;

    /// Builds the documents for `service_data`, which belongs to `account_id`.
    ///
    /// # Errors
    /// The caller logs a returned error, counts it as one failed resource and carries on
    /// with the next service.
    async fn parse(&self, account_id: &str, service_data: &Value) -> Result<Vec<Document>>;

    /// Optional JSON schema describing the expected service data; `None` unless overridden.
    fn get_data_schema(&self) -> Option<Value> {
        Option::None
    }
}
/// Summary returned by `AwsEstateService::ingest_estate_data`.
///
/// Field order is significant: it is the order in which serde serializes the summary.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsEstateIngestResult {
    /// Number of accounts extracted from the estate payload.
    pub total_accounts: usize,
    /// Distinct service names seen across all accounts, including ones later skipped
    /// by the enabled/empty filters.
    pub total_services: usize,
    /// Sum of the per-service resource counts for services that passed the filters.
    pub total_resources: usize,
    /// Documents produced by parsers plus one per generic (parser-less) service document.
    pub parsed_resources: usize,
    /// Parser failures plus accounts whose processing returned an error.
    pub failed_resources: usize,
    /// Names of the registered parsers.
    pub supported_services: Vec<String>,
    /// Service names that were seen but have no registered parser.
    pub unsupported_services: Vec<String>,
    /// Outcome of the batch document creation; zero/empty when nothing was created.
    pub create_result: CreateResult,
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct AwsEstateConfig {
pub enabled_services: Option<Vec<String>>, pub skip_empty_services: bool,
pub include_permissions: bool,
pub generate_synthetic_embeddings: bool, }
impl Default for AwsEstateConfig {
    /// Processes every service and skips empty ones, with both `include_permissions` and
    /// `generate_synthetic_embeddings` switched on.
    fn default() -> Self {
        let no_allow_list = None;
        Self {
            generate_synthetic_embeddings: true,
            include_permissions: true,
            skip_empty_services: true,
            enabled_services: no_allow_list,
        }
    }
}
/// Ingests AWS estate scans.
///
/// Each service's data is routed to a registered [`AwsServiceParser`], and the resulting
/// documents are written through a [`DocumentService`].
pub struct AwsEstateService {
    /// Registered parsers, keyed by each parser's `service_name()`.
    parsers: HashMap<String, Box<dyn AwsServiceParser>>,
    /// Destination for the documents produced during ingestion.
    document_service: Arc<DocumentService>,
    /// Filtering options applied while processing each account's services.
    config: AwsEstateConfig,
}
impl AwsEstateService {
    /// Service-data keys whose array length counts as resources.
    ///
    /// This list is shared by `is_service_empty` and `count_resources_in_service`, so the
    /// emptiness check and the resource counter agree on what a resource is. It covers
    /// every array key produced by the transforms below (including IAM
    /// `users`/`roles`/`policies`/`groups`).
    const RESOURCE_ARRAY_KEYS: &'static [&'static str] = &[
        "instances",
        "buckets",
        "functions",
        "clusters",
        "snapshots",
        "volumes",
        "vpcs",
        "subnets",
        "users",
        "roles",
        "policies",
        "groups",
    ];

    /// Numeric summary fields that mark a service as non-empty when greater than zero.
    const RESOURCE_COUNT_KEYS: &'static [&'static str] = &[
        "total_count",
        "count",
        "total_instances",
        "total_buckets",
        "total_vpcs",
        "total_users",
        "total_roles",
        "total_policies",
        "total_groups",
    ];

    /// Creates a service with no parsers and the default [`AwsEstateConfig`].
    pub fn new(document_service: Arc<DocumentService>) -> Self {
        Self::with_config(document_service, AwsEstateConfig::default())
    }

    /// Creates a service with no parsers and the given configuration.
    pub fn with_config(document_service: Arc<DocumentService>, config: AwsEstateConfig) -> Self {
        Self {
            parsers: HashMap::new(),
            document_service,
            config,
        }
    }

    /// Registers `parser` under its `service_name()`.
    ///
    /// Any parser previously registered under the same name is replaced. The replacement
    /// is logged so it is not silent.
    pub fn register_parser(&mut self, parser: Box<dyn AwsServiceParser>) {
        let service_name = parser.service_name().to_string();
        info!("Registering AWS service parser for: {}", service_name);
        if self.parsers.insert(service_name.clone(), parser).is_some() {
            warn!("Replaced previously registered AWS service parser for: {}", service_name);
        }
    }

    /// Returns the names of all registered parsers.
    ///
    /// The names are sorted so the output (and the serialized ingest result) is deterministic.
    pub fn get_supported_services(&self) -> Vec<String> {
        let mut names: Vec<String> = self.parsers.keys().cloned().collect();
        names.sort_unstable();
        names
    }

    /// Returns the current configuration.
    pub fn get_config(&self) -> &AwsEstateConfig {
        &self.config
    }

    /// Replaces the current configuration.
    pub fn update_config(&mut self, config: AwsEstateConfig) {
        self.config = config;
    }

    /// Normalizes the supported estate payload shapes into a flat list of account objects.
    ///
    /// Accepted shapes:
    /// - an array whose first element has `Data.Profiles[*].Accounts[*]` (array-wrapped);
    /// - any other array, returned as-is (legacy account list);
    /// - an object with `Data.Profiles[*].Accounts[*]`.
    ///
    /// Accounts from the Profiles formats are converted by `transform_new_account_format`.
    ///
    /// # Errors
    /// Fails when the payload matches none of the shapes above, or when an account in a
    /// Profiles format lacks `AccountId`.
    fn extract_accounts_from_estate_data(&self, estate_data: &Value) -> Result<Vec<Value>> {
        if let Some(array) = estate_data.as_array() {
            // NOTE(review): only the first element of a wrapper array is inspected; any further
            // envelopes are ignored. Confirm the wrapper never holds more than one.
            if let Some(profiles) = array.first().and_then(Self::data_profiles) {
                info!("🆕 Using array-wrapped Data.Profiles format");
                return self.flatten_profile_accounts(profiles);
            }
            info!("🔄 Using legacy estate data format");
            return Ok(array.clone());
        }
        if let Some(profiles) = Self::data_profiles(estate_data) {
            info!("🆕 Using Data.Profiles hierarchical format");
            return self.flatten_profile_accounts(profiles);
        }
        Err(anyhow!("Estate data must be either an array of accounts (legacy format), wrapped array format, or contain Data.Profiles structure"))
    }

    /// Returns `envelope.Data.Profiles` when it exists and is an array.
    fn data_profiles(envelope: &Value) -> Option<&Vec<Value>> {
        envelope.get("Data")?.get("Profiles")?.as_array()
    }

    /// Converts every `Accounts[*]` entry of every profile, stopping at the first error.
    fn flatten_profile_accounts(&self, profiles: &[Value]) -> Result<Vec<Value>> {
        profiles
            .iter()
            .filter_map(|profile| profile.get("Accounts").and_then(Value::as_array))
            .flatten()
            .map(|account| self.transform_new_account_format(account))
            .collect()
    }

    /// Converts one Profiles-format account into the legacy snake_case account object.
    ///
    /// `account_name` is synthesized from the id, and `scan_timestamp` is the current UTC time.
    ///
    /// # Errors
    /// Fails when `AccountId` is missing or not a string, or when `Services` is not an object.
    fn transform_new_account_format(&self, new_account: &Value) -> Result<Value> {
        let account_id = new_account
            .get("AccountId")
            .and_then(|v| v.as_str())
            .ok_or_else(|| anyhow!("Account missing AccountId"))?;
        let access_method = new_account
            .get("AccessMethod")
            .and_then(|v| v.as_str())
            .unwrap_or("Unknown");
        let role_arn = new_account.get("RoleArn");
        let transformed_services = match new_account.get("Services") {
            Some(services) => self.transform_services_format(services)?,
            None => json!({}),
        };
        Ok(json!({
            "account_id": account_id,
            "role_arn": role_arn,
            "account_name": format!("Account {}", account_id),
            "access_method": access_method,
            "scan_timestamp": chrono::Utc::now().to_rfc3339(),
            "services": transformed_services
        }))
    }

    /// Lower-cases each service key and converts its data with `transform_individual_service`.
    ///
    /// # Errors
    /// Fails when `new_services` is not a JSON object.
    fn transform_services_format(&self, new_services: &Value) -> Result<Value> {
        let services_obj = new_services
            .as_object()
            .ok_or_else(|| anyhow!("Services must be an object"))?;
        let mut transformed_services = serde_json::Map::new();
        for (service_name, service_data) in services_obj {
            let service_key = service_name.to_lowercase();
            let transformed_service = self.transform_individual_service(&service_key, service_data)?;
            transformed_services.insert(service_key, transformed_service);
        }
        Ok(Value::Object(transformed_services))
    }

    /// Dispatches to the per-service transform. Unknown services pass through unchanged.
    fn transform_individual_service(&self, service_name: &str, service_data: &Value) -> Result<Value> {
        match service_name {
            "ec2" => self.transform_ec2_service(service_data),
            "rds" => self.transform_rds_service(service_data),
            "s3" => self.transform_s3_service(service_data),
            "vpc" => self.transform_vpc_service(service_data),
            "lambda" => self.transform_lambda_service(service_data),
            "iam" => self.transform_iam_service(service_data),
            _ => Ok(service_data.clone()),
        }
    }

    /// Flattens `service_data[group_key][*][items_key][*]`.
    ///
    /// Each item is passed to `convert` together with the group object it came from (a
    /// region or profile entry). Returns `None` when `group_key` is missing or not an
    /// array. Groups lacking an `items_key` array are skipped.
    fn collect_nested(
        service_data: &Value,
        group_key: &str,
        items_key: &str,
        mut convert: impl FnMut(&Value, &Value) -> Value,
    ) -> Option<Vec<Value>> {
        let groups = service_data.get(group_key)?.as_array()?;
        let mut collected = Vec::new();
        for group in groups {
            if let Some(items) = group.get(items_key).and_then(Value::as_array) {
                for item in items {
                    collected.push(convert(item, group));
                }
            }
        }
        Some(collected)
    }

    /// Copies `source[from]` into `target[to]` for each `(from, to)` pair.
    ///
    /// Pairs are applied in order, which fixes the key insertion order. Keys absent
    /// from `source` are skipped.
    fn copy_renamed(
        target: &mut serde_json::Map<String, Value>,
        source: &Value,
        mapping: &[(&str, &str)],
    ) {
        for &(from, to) in mapping {
            if let Some(value) = source.get(from) {
                target.insert(to.to_string(), value.clone());
            }
        }
    }

    /// Copies every field of `source` (if it is an object) except those named in `excluded`.
    fn copy_remaining(target: &mut serde_json::Map<String, Value>, source: &Value, excluded: &[&str]) {
        if let Some(fields) = source.as_object() {
            for (key, value) in fields {
                if !excluded.contains(&key.as_str()) {
                    target.insert(key.clone(), value.clone());
                }
            }
        }
    }

    /// Returns the `Value` of the first `Tags` entry whose `Key` is `"Name"` and that has a `Value`.
    fn name_tag(resource: &Value) -> Option<&Value> {
        resource
            .get("Tags")?
            .as_array()?
            .iter()
            .find_map(|tag| match (tag.get("Key"), tag.get("Value")) {
                (Some(key), Some(value)) if key.as_str() == Some("Name") => Some(value),
                _ => None,
            })
    }

    /// Flattens `ByRegion[*].Instances[*]` into legacy `instances`.
    ///
    /// Each instance gets snake_case copies of its core fields, plus `region`, `name` (from
    /// the Name tag) and the service-level `permissions`. All other original fields are
    /// kept as-is.
    fn transform_ec2_service(&self, service_data: &Value) -> Result<Value> {
        let mut transformed = serde_json::Map::new();
        let instances = Self::collect_nested(service_data, "ByRegion", "Instances", |instance, region| {
            let mut legacy = serde_json::Map::new();
            Self::copy_renamed(
                &mut legacy,
                instance,
                &[
                    ("InstanceId", "instance_id"),
                    ("InstanceType", "instance_type"),
                    ("State", "state"),
                    ("LaunchTime", "launch_time"),
                ],
            );
            Self::copy_renamed(&mut legacy, region, &[("Region", "region")]);
            if let Some(name) = Self::name_tag(instance) {
                legacy.insert("name".to_string(), name.clone());
            }
            Self::copy_renamed(&mut legacy, service_data, &[("Permissions", "permissions")]);
            Self::copy_remaining(
                &mut legacy,
                instance,
                &["InstanceId", "InstanceType", "State", "LaunchTime", "Tags"],
            );
            Value::Object(legacy)
        });
        if let Some(instances) = instances {
            transformed.insert("instances".to_string(), Value::Array(instances));
        }
        Self::copy_renamed(
            &mut transformed,
            service_data,
            &[("Permissions", "permissions"), ("TotalInstances", "total_instances")],
        );
        Ok(Value::Object(transformed))
    }

    /// Flattens `ByRegion[*].Databases[*]` into legacy `instances`.
    ///
    /// Each database gets snake_case identity/class/engine/status fields, `name` (the
    /// identifier), `region` and `permissions`, followed by all of its original fields.
    fn transform_rds_service(&self, service_data: &Value) -> Result<Value> {
        let mut transformed = serde_json::Map::new();
        let instances = Self::collect_nested(service_data, "ByRegion", "Databases", |db, region| {
            let mut legacy = serde_json::Map::new();
            Self::copy_renamed(
                &mut legacy,
                db,
                &[
                    ("DBInstanceIdentifier", "db_instance_identifier"),
                    ("DBInstanceIdentifier", "name"),
                    ("DBInstanceClass", "db_instance_class"),
                    ("Engine", "engine"),
                    ("DBInstanceStatus", "db_instance_status"),
                ],
            );
            Self::copy_renamed(&mut legacy, region, &[("Region", "region")]);
            Self::copy_renamed(&mut legacy, service_data, &[("Permissions", "permissions")]);
            Self::copy_remaining(&mut legacy, db, &[]);
            Value::Object(legacy)
        });
        if let Some(instances) = instances {
            transformed.insert("instances".to_string(), Value::Array(instances));
        }
        Self::copy_renamed(&mut transformed, service_data, &[("Permissions", "permissions")]);
        Ok(Value::Object(transformed))
    }

    /// Flattens `ByProfile[*].Buckets[*]` into legacy `buckets`.
    ///
    /// The same list is duplicated as `latest_buckets_info`. A `bucket_names` list of
    /// `"name (region)"` strings is also built from buckets that have both a name and a region.
    fn transform_s3_service(&self, service_data: &Value) -> Result<Value> {
        let mut transformed = serde_json::Map::new();
        let mut bucket_names = Vec::new();
        let buckets = Self::collect_nested(service_data, "ByProfile", "Buckets", |bucket, _profile| {
            if let (Some(name), Some(region)) = (bucket.get("Name"), bucket.get("Region")) {
                bucket_names.push(json!(format!(
                    "{} ({})",
                    name.as_str().unwrap_or("unknown"),
                    region.as_str().unwrap_or("unknown")
                )));
            }
            let mut legacy = serde_json::Map::new();
            Self::copy_renamed(
                &mut legacy,
                bucket,
                &[
                    ("Name", "name"),
                    ("Name", "s3_identifier"),
                    ("CreationDate", "creation_date"),
                    ("Region", "region"),
                ],
            );
            Self::copy_renamed(&mut legacy, service_data, &[("Permissions", "permissions")]);
            // NOTE(review): hard-coded; the input data is never inspected for public access.
            legacy.insert("is_public".to_string(), json!(false));
            Value::Object(legacy)
        });
        if let Some(buckets) = buckets {
            transformed.insert("buckets".to_string(), Value::Array(buckets.clone()));
            transformed.insert("latest_buckets_info".to_string(), Value::Array(buckets));
            transformed.insert("bucket_names".to_string(), Value::Array(bucket_names));
        }
        Self::copy_renamed(
            &mut transformed,
            service_data,
            &[("TotalBuckets", "total_buckets"), ("Permissions", "permissions")],
        );
        Ok(Value::Object(transformed))
    }

    /// Flattens `ByRegion[*].VPCs[*]` into legacy `vpcs` (id doubles as `name`).
    fn transform_vpc_service(&self, service_data: &Value) -> Result<Value> {
        debug!("🔄 Transforming VPC service data: {}", serde_json::to_string_pretty(service_data).unwrap_or_else(|_| "invalid json".to_string()));
        let mut transformed = serde_json::Map::new();
        let vpcs = Self::collect_nested(service_data, "ByRegion", "VPCs", |vpc, region| {
            let mut legacy = serde_json::Map::new();
            Self::copy_renamed(
                &mut legacy,
                vpc,
                &[
                    ("VpcId", "vpc_id"),
                    ("VpcId", "name"),
                    ("CidrBlock", "cidr_block"),
                    ("State", "state"),
                    ("IsDefault", "is_default"),
                ],
            );
            Self::copy_renamed(&mut legacy, region, &[("Region", "region")]);
            Self::copy_renamed(&mut legacy, service_data, &[("Permissions", "permissions")]);
            Value::Object(legacy)
        });
        if let Some(vpcs) = vpcs {
            transformed.insert("vpcs".to_string(), Value::Array(vpcs));
        }
        Self::copy_renamed(
            &mut transformed,
            service_data,
            &[("TotalVPCs", "total_vpcs"), ("Permissions", "permissions")],
        );
        Ok(Value::Object(transformed))
    }

    /// Flattens `ByRegion[*].Functions[*]` into legacy `functions`.
    ///
    /// Snake_case fields come first, followed by all original fields.
    fn transform_lambda_service(&self, service_data: &Value) -> Result<Value> {
        let mut transformed = serde_json::Map::new();
        let functions = Self::collect_nested(service_data, "ByRegion", "Functions", |func, region| {
            let mut legacy = serde_json::Map::new();
            Self::copy_renamed(
                &mut legacy,
                func,
                &[
                    ("FunctionName", "name"),
                    ("FunctionName", "lambda_identifier"),
                    ("Runtime", "runtime"),
                    ("MemorySize", "memory_size"),
                    ("Timeout", "timeout"),
                ],
            );
            Self::copy_renamed(&mut legacy, region, &[("Region", "region")]);
            Self::copy_renamed(&mut legacy, func, &[("LastModified", "last_modified")]);
            Self::copy_renamed(&mut legacy, service_data, &[("Permissions", "permissions")]);
            Self::copy_remaining(&mut legacy, func, &[]);
            Value::Object(legacy)
        });
        if let Some(functions) = functions {
            transformed.insert("functions".to_string(), Value::Array(functions));
        }
        Self::copy_renamed(&mut transformed, service_data, &[("Permissions", "permissions")]);
        Ok(Value::Object(transformed))
    }

    /// Renames IAM collections and totals to snake_case.
    ///
    /// Each collection is duplicated under an `iam_` prefixed key.
    fn transform_iam_service(&self, service_data: &Value) -> Result<Value> {
        debug!("🔄 Transforming IAM service data: {}", serde_json::to_string_pretty(service_data).unwrap_or_else(|_| "invalid json".to_string()));
        let mut transformed = serde_json::Map::new();
        Self::copy_renamed(
            &mut transformed,
            service_data,
            &[
                ("Users", "users"),
                ("Users", "iam_users"),
                ("Roles", "roles"),
                ("Roles", "iam_roles"),
                ("Policies", "policies"),
                ("Policies", "iam_policies"),
                ("Groups", "groups"),
                ("Groups", "iam_groups"),
                ("AccountSummary", "account_summary"),
                ("TotalUsers", "total_users"),
                ("TotalRoles", "total_roles"),
                ("TotalPolicies", "total_policies"),
                ("TotalGroups", "total_groups"),
                ("Permissions", "permissions"),
            ],
        );
        Ok(Value::Object(transformed))
    }

    /// Ingests a whole estate payload and writes the resulting documents in one batch.
    ///
    /// Per-account failures are logged and counted in `failed_resources` rather than
    /// aborting the run.
    ///
    /// # Errors
    /// Fails when the payload shape is unrecognized (see
    /// `extract_accounts_from_estate_data`) or when the batch document creation fails.
    pub async fn ingest_estate_data(&self, estate_data: Value) -> Result<AwsEstateIngestResult> {
        info!("🏗️ Starting AWS estate data ingestion");
        let mut result = AwsEstateIngestResult {
            total_accounts: 0,
            total_services: 0,
            total_resources: 0,
            parsed_resources: 0,
            failed_resources: 0,
            supported_services: self.get_supported_services(),
            unsupported_services: Vec::new(),
            create_result: CreateResult {
                created: 0,
                failed: Vec::new(),
            },
        };
        let accounts = self.extract_accounts_from_estate_data(&estate_data)?;
        result.total_accounts = accounts.len();
        info!("📊 Processing {} AWS accounts", result.total_accounts);

        let mut all_documents = Vec::new();
        let mut processed_services = std::collections::HashSet::new();
        for account_data in &accounts {
            if let Err(e) = self
                .process_account(account_data, &mut result, &mut all_documents, &mut processed_services)
                .await
            {
                warn!("Failed to process account: {}", e);
                result.failed_resources += 1;
            }
        }

        result.total_services = processed_services.len();
        // Sorted so the serialized result does not depend on HashSet iteration order.
        let mut unsupported: Vec<String> = processed_services
            .into_iter()
            .filter(|service| !self.parsers.contains_key(service))
            .collect();
        unsupported.sort_unstable();
        result.unsupported_services = unsupported;

        if !all_documents.is_empty() {
            info!("📝 Creating {} documents in batch", all_documents.len());
            let batch_result = self.document_service.create_documents("aws_estate", all_documents).await?;
            result.create_result = CreateResult {
                created: batch_result.success_count,
                failed: batch_result.failed.into_iter().map(|e| e.error).collect(),
            };
            info!("✅ Batch creation completed: {} created, {} failed",
                result.create_result.created, result.create_result.failed.len());
        }

        info!("🎉 AWS estate ingestion completed:");
        info!(" Accounts: {}", result.total_accounts);
        info!(" Services: {}", result.total_services);
        info!(" Resources: {} total, {} parsed, {} failed",
            result.total_resources, result.parsed_resources, result.failed_resources);
        info!(" Documents: {} created", result.create_result.created);
        Ok(result)
    }

    /// Routes every service of one account to its parser, accumulating documents and
    /// counters into `result`.
    ///
    /// When no parser is registered for a service, a generic document is built instead.
    /// Accounts with a missing or non-object `services` field are skipped without error.
    async fn process_account(
        &self,
        account_data: &Value,
        result: &mut AwsEstateIngestResult,
        all_documents: &mut Vec<Document>,
        processed_services: &mut std::collections::HashSet<String>,
    ) -> Result<()> {
        let account_id = account_data
            .get("account_id")
            .and_then(|v| v.as_str())
            .unwrap_or("unknown");
        let account_name = account_data
            .get("account_name")
            .and_then(|v| v.as_str())
            .unwrap_or("Unknown Account");
        debug!("Processing account: {} ({})", account_name, account_id);

        let services = match account_data.get("services") {
            Some(services) => match services.as_object() {
                Some(services_map) => services_map,
                None => {
                    warn!("Services data is not an object for account {}", account_id);
                    return Ok(());
                }
            },
            None => {
                debug!("No services found for account {}", account_id);
                return Ok(());
            }
        };

        for (service_name, service_data) in services {
            // Recorded before filtering, so the totals and the unsupported list cover every
            // service seen.
            processed_services.insert(service_name.clone());
            if let Some(enabled_services) = &self.config.enabled_services {
                if !enabled_services.contains(service_name) {
                    debug!("Skipping disabled service: {}", service_name);
                    continue;
                }
            }
            if self.config.skip_empty_services && self.is_service_empty(service_data) {
                debug!("Skipping empty service: {}", service_name);
                continue;
            }
            let resource_count = self.count_resources_in_service(service_data);
            result.total_resources += resource_count;
            debug!("Processing service '{}' with {} resources", service_name, resource_count);

            match self.parsers.get(service_name) {
                Some(parser) if parser.can_parse(service_data) => {
                    match parser.parse(account_id, service_data).await {
                        Ok(mut documents) => {
                            debug!("✅ Parser '{}' generated {} documents", service_name, documents.len());
                            result.parsed_resources += documents.len();
                            all_documents.append(&mut documents);
                        }
                        Err(e) => {
                            warn!("❌ Parser '{}' failed: {}", service_name, e);
                            result.failed_resources += 1;
                        }
                    }
                }
                Some(_) => {
                    debug!("Parser '{}' declined to parse service data", service_name);
                }
                None => {
                    debug!("No parser registered for service '{}'", service_name);
                    match self.create_generic_service_document(
                        account_id,
                        account_name,
                        service_name,
                        service_data,
                    ) {
                        Ok(generic_doc) => {
                            all_documents.push(generic_doc);
                            result.parsed_resources += 1;
                        }
                        Err(e) => {
                            warn!("Failed to build generic document for service '{}': {}", service_name, e);
                        }
                    }
                }
            }
        }
        Ok(())
    }

    /// Returns `true` when `service_data` holds no resources.
    ///
    /// That is the case when it is not an object, or when none of `RESOURCE_ARRAY_KEYS`
    /// is a non-empty array and none of `RESOURCE_COUNT_KEYS` is a positive integer.
    fn is_service_empty(&self, service_data: &Value) -> bool {
        match service_data.as_object() {
            Some(obj) => {
                let has_items = Self::RESOURCE_ARRAY_KEYS.iter().any(|key| {
                    matches!(obj.get(*key).and_then(Value::as_array), Some(items) if !items.is_empty())
                });
                let has_count = Self::RESOURCE_COUNT_KEYS.iter().any(|key| {
                    matches!(obj.get(*key).and_then(Value::as_u64), Some(n) if n > 0)
                });
                !(has_items || has_count)
            }
            None => true,
        }
    }

    /// Estimates how many resources `service_data` describes.
    ///
    /// The estimate is the summed length of its `RESOURCE_ARRAY_KEYS` arrays. A non-empty
    /// object with no such arrays counts as 1, and a non-object counts as 0.
    fn count_resources_in_service(&self, service_data: &Value) -> usize {
        match service_data.as_object() {
            Some(obj) => {
                let counted: usize = Self::RESOURCE_ARRAY_KEYS
                    .iter()
                    .filter_map(|key| obj.get(*key).and_then(Value::as_array))
                    .map(Vec::len)
                    .sum();
                if counted == 0 && !obj.is_empty() {
                    1
                } else {
                    counted
                }
            }
            None => 0,
        }
    }

    /// Builds a summary document for a service that has no registered parser.
    ///
    /// The content lists the service's numeric `*count*`/`*total*` fields. Top-level scalar
    /// fields whose names do not start with `_` are copied into metadata as `service_<key>`.
    fn create_generic_service_document(
        &self,
        account_id: &str,
        account_name: &str,
        service_name: &str,
        service_data: &Value,
    ) -> Result<Document> {
        // NOTE(review): the millisecond timestamp makes ids unique only across time. The same
        // account/service seen twice within one millisecond (e.g. an account listed under two
        // profiles) would collide. Confirm how the document store treats duplicate ids.
        let doc_id = format!("aws-{}-{}-{}", account_id, service_name, chrono::Utc::now().timestamp_millis());

        let mut content_parts = vec![format!("AWS {} service in account {} ({})", service_name, account_name, account_id)];
        if let Some(obj) = service_data.as_object() {
            for (key, value) in obj {
                if key.contains("count") || key.contains("total") {
                    if let Some(num) = value.as_u64() {
                        content_parts.push(format!("{}: {}", key, num));
                    }
                }
            }
        }
        let content = content_parts.join(" | ");

        let mut metadata = IndexMap::new();
        metadata.insert("account_id".to_string(), json!(account_id));
        metadata.insert("account_name".to_string(), json!(account_name));
        metadata.insert("service".to_string(), json!(service_name));
        metadata.insert("resource_type".to_string(), json!(format!("{}-service", service_name)));
        metadata.insert("cloud_provider".to_string(), json!("aws"));
        metadata.insert("document_type".to_string(), json!("aws_estate"));
        metadata.insert("last_synced".to_string(), json!(chrono::Utc::now().timestamp()));
        if let Some(obj) = service_data.as_object() {
            for (key, value) in obj {
                if !key.starts_with('_') && !matches!(value, Value::Object(_) | Value::Array(_)) {
                    metadata.insert(format!("service_{}", key), value.clone());
                }
            }
        }

        let mut doc = Document::new(doc_id, content);
        doc.metadata = metadata;
        Ok(doc)
    }

    /// Logs the registered parsers; performs no other work.
    pub async fn initialize(&self) -> Result<()> {
        info!("🚀 AWS Estate Service initialized with {} parsers", self.parsers.len());
        for service_name in self.parsers.keys() {
            info!(" 📦 Registered parser: {}", service_name);
        }
        Ok(())
    }

    /// Logs shutdown; holds no resources to release.
    pub async fn shutdown(&self) -> Result<()> {
        info!("🔄 AWS Estate Service shutdown");
        Ok(())
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// The default configuration processes every service and skips empty ones.
    #[test]
    fn default_config_enables_everything_and_skips_empty_services() {
        let config = AwsEstateConfig::default();
        assert!(config.enabled_services.is_none());
        assert!(config.skip_empty_services);
        assert!(config.include_permissions);
        assert!(config.generate_synthetic_embeddings);
    }

    /// A fully specified configuration survives a JSON round-trip unchanged.
    #[test]
    fn config_round_trips_through_json() {
        let config = AwsEstateConfig {
            enabled_services: Some(vec!["ec2".to_string(), "s3".to_string()]),
            skip_empty_services: false,
            include_permissions: false,
            generate_synthetic_embeddings: true,
        };
        let encoded = serde_json::to_value(&config).expect("config serializes");
        assert_eq!(encoded["enabled_services"], json!(["ec2", "s3"]));
        assert_eq!(encoded["skip_empty_services"], json!(false));

        let decoded: AwsEstateConfig =
            serde_json::from_value(encoded).expect("config deserializes");
        assert_eq!(decoded.enabled_services, config.enabled_services);
        assert_eq!(decoded.skip_empty_services, config.skip_empty_services);
        assert_eq!(decoded.include_permissions, config.include_permissions);
        assert_eq!(
            decoded.generate_synthetic_embeddings,
            config.generate_synthetic_embeddings
        );
    }
}