rag-module 0.6.7

Enterprise RAG module with chat context storage, vector search, session management, and model downloading. Rust implementation with Node.js compatibility.
//! EC2 Service Parser
//! 
//! Converts AWS EC2 service data into searchable documents: one per EC2 instance,
//! plus optional EBS volume and VPC summary documents.

use async_trait::async_trait;
use anyhow::{Result, anyhow};
use serde_json::{json, Value};
use std::collections::HashMap;
use indexmap::IndexMap;
use tracing::{debug, warn};

use crate::types::Document;
use crate::services::aws_estate_service::AwsServiceParser;

/// Stateless parser for AWS EC2 service data.
///
/// The type has no fields, so it is trivially copyable, and `Default` yields
/// the same value as `Ec2Parser::new()`.
#[derive(Debug, Clone, Copy, Default)]
pub struct Ec2Parser;

impl Ec2Parser {
    pub fn new() -> Self {
        Self
    }
    
    /// Generate AWS ARN
    fn generate_arn(
        service: &str,
        region: &str,
        account_id: &str,
        resource_type: &str,
        resource_id: &str,
    ) -> String {
        format!("arn:aws:{}:{}:{}:{}:{}", service, region, account_id, resource_type, resource_id)
    }
    
    /// Create base metadata for AWS resources
    fn create_base_metadata(
        account_id: &str,
        service: &str,
        resource_type: &str,
        region: Option<&str>,
    ) -> IndexMap<String, Value> {
        let mut metadata = IndexMap::new();
        metadata.insert("account_id".to_string(), json!(account_id));
        metadata.insert("service".to_string(), json!(service));
        metadata.insert("resource_type".to_string(), json!(resource_type));
        metadata.insert("cloud_provider".to_string(), json!("aws"));
        metadata.insert("document_type".to_string(), json!("aws_estate"));
        metadata.insert("last_synced".to_string(), json!(chrono::Utc::now().timestamp()));
        
        if let Some(region_val) = region {
            metadata.insert("region".to_string(), json!(region_val));
        }
        
        metadata
    }
    
    /// Parses EC2 instances from the service data
    fn parse_ec2_instances(&self, account_id: &str, instances: &[Value]) -> Result<Vec<Document>> {
        let mut documents = Vec::new();
        
        for instance in instances {
            let instance_id = instance["instance_id"]
                .as_str()
                .unwrap_or("unknown");
            let region = instance["region"]
                .as_str()
                .unwrap_or("us-east-1");
            let instance_type = instance["instance_type"]
                .as_str()
                .unwrap_or("unknown");
            let state = instance["state"]
                .as_str()
                .unwrap_or("unknown");
            let name = instance["name"]
                .as_str()
                .unwrap_or("unnamed");
            let launch_time = instance["launch_time"]
                .as_str()
                .unwrap_or("unknown");
            
            // Generate ARN for the EC2 instance
            let doc_id = Self::generate_arn(
                "ec2",
                region,
                account_id,
                "instance",
                instance_id,
            );
            
            // Create searchable content
            let content = format!(
                "EC2 Instance {} ({}) in region {} - State: {} - Launch Time: {} - Type: {}",
                name, instance_id, region, state, launch_time, instance_type
            );
            
            // Create base metadata
            let mut metadata = Self::create_base_metadata(
                account_id,
                "ec2",
                "ec2-instance",
                Some(region),
            );
            
            // Add EC2-specific metadata
            metadata.insert("instance_id".to_string(), json!(instance_id));
            metadata.insert("instance_type".to_string(), json!(instance_type));
            metadata.insert("state".to_string(), json!(state));
            metadata.insert("launch_time".to_string(), json!(launch_time));
            metadata.insert("tags".to_string(), json!({
                "Name": name,
                "InstanceType": instance_type
            }));
            
            // Add IAM permissions if available
            if let Some(permissions) = instance.get("permissions") {
                metadata.insert("iam_permissions".to_string(), permissions.clone());
            }
            
            // Add optional fields
            if let Some(availability_zone) = instance.get("availability_zone").and_then(|v| v.as_str()) {
                metadata.insert("availability_zone".to_string(), json!(availability_zone));
            }
            
            if let Some(security_groups) = instance.get("security_groups") {
                metadata.insert("security_groups".to_string(), security_groups.clone());
            }
            
            let mut doc = Document::new(doc_id, content);
            doc.metadata = metadata;
            documents.push(doc);
        }
        
        Ok(documents)
    }
    
    /// Parse EBS volume summary
    fn parse_volume_summary(&self, account_id: &str, service_data: &Value) -> Result<Option<Document>> {
        if let (
            Some(volumes_count),
            Some(total_size_gb)
        ) = (
            service_data.get("volumes_count").and_then(|v| v.as_u64()),
            service_data.get("total_size_gb").and_then(|v| v.as_u64()),
        ) {
            let doc_id = format!("arn:aws:ec2:global:{}:volumes-summary", account_id);
            
            let attached = service_data.get("attached_volumes").and_then(|v| v.as_u64()).unwrap_or(0);
            let unattached = service_data.get("unattached_volumes").and_then(|v| v.as_u64()).unwrap_or(0);
            
            let content = format!(
                "EBS Volumes Summary: {} total volumes ({} GB) - {} attached, {} unattached | Cloud Provider: aws",
                volumes_count, total_size_gb, attached, unattached
            );
            
            let mut metadata = Self::create_base_metadata(
                account_id,
                "ec2",
                "ebs-volumes-summary",
                Some("global"),
            );
            
            metadata.insert("volumes_count".to_string(), json!(volumes_count));
            metadata.insert("total_size_gb".to_string(), json!(total_size_gb));
            metadata.insert("attached_volumes".to_string(), json!(attached));
            metadata.insert("unattached_volumes".to_string(), json!(unattached));
            
            let mut doc = Document::new(doc_id, content);
            doc.metadata = metadata;
            return Ok(Some(doc));
        }
        
        Ok(None)
    }
    
    /// Parse VPC summary
    fn parse_vpc_summary(&self, account_id: &str, service_data: &Value) -> Result<Option<Document>> {
        if let Some(vpcs_count) = service_data.get("vpcs_count").and_then(|v| v.as_u64()) {
            let doc_id = format!("arn:aws:ec2:global:{}:vpc-summary", account_id);
            
            let total_subnets = service_data.get("total_subnets").and_then(|v| v.as_u64()).unwrap_or(0);
            let internet_gateways = service_data.get("internet_gateways_count").and_then(|v| v.as_u64()).unwrap_or(0);
            let elastic_ips_total = service_data.get("elastic_ips_total").and_then(|v| v.as_u64()).unwrap_or(0);
            let elastic_ips_attached = service_data.get("elastic_ips_attached").and_then(|v| v.as_u64()).unwrap_or(0);
            
            let content = format!(
                "VPC Summary: {} VPCs with {} subnets - {} internet gateways - {} elastic IPs ({} attached) | Cloud Provider: aws",
                vpcs_count, total_subnets, internet_gateways, elastic_ips_total, elastic_ips_attached
            );
            
            let mut metadata = Self::create_base_metadata(
                account_id,
                "ec2",
                "vpc-summary",
                Some("global"),
            );
            
            metadata.insert("vpcs_count".to_string(), json!(vpcs_count));
            metadata.insert("total_subnets".to_string(), json!(total_subnets));
            metadata.insert("internet_gateways_count".to_string(), json!(internet_gateways));
            metadata.insert("elastic_ips_total".to_string(), json!(elastic_ips_total));
            metadata.insert("elastic_ips_attached".to_string(), json!(elastic_ips_attached));
            
            let mut doc = Document::new(doc_id, content);
            doc.metadata = metadata;
            return Ok(Some(doc));
        }
        
        Ok(None)
    }
}

#[async_trait]
impl AwsServiceParser for Ec2Parser {
    fn service_name(&self) -> &str {
        "ec2"
    }
    
    fn can_parse(&self, service_data: &Value) -> bool {
        // Check if this looks like EC2 data
        service_data.is_object() && (
            service_data.get("instances").is_some() ||
            service_data.get("volumes_count").is_some() ||
            service_data.get("vpcs_count").is_some()
        )
    }
    
    async fn parse(&self, account_id: &str, service_data: &Value) -> Result<Vec<Document>> {
        debug!("🔍 EC2 parser processing data for account: {}", account_id);
        
        let mut documents = Vec::new();
        
        // Parse EC2 instances
        if let Some(instances) = service_data.get("instances").and_then(|v| v.as_array()) {
            if !instances.is_empty() {
                let mut instance_docs = self.parse_ec2_instances(account_id, instances)?;
                documents.append(&mut instance_docs);
                debug!("✅ Parsed {} EC2 instances", instances.len());
            }
        }
        
        // Parse volume summary
        if let Some(volume_doc) = self.parse_volume_summary(account_id, service_data)? {
            documents.push(volume_doc);
            debug!("✅ Parsed EBS volumes summary");
        }
        
        // Parse VPC summary
        if let Some(vpc_doc) = self.parse_vpc_summary(account_id, service_data)? {
            documents.push(vpc_doc);
            debug!("✅ Parsed VPC summary");
        }
        
        if documents.is_empty() {
            warn!("🟡 EC2 parser found no parseable data");
        } else {
            debug!("🎉 EC2 parser generated {} documents", documents.len());
        }
        
        Ok(documents)
    }
    
    fn get_data_schema(&self) -> Option<Value> {
        Some(json!({
            "type": "object",
            "properties": {
                "instances": {
                    "type": "array",
                    "items": {
                        "type": "object",
                        "required": ["instance_id", "instance_type", "state", "region"],
                        "properties": {
                            "instance_id": {"type": "string"},
                            "instance_type": {"type": "string"},
                            "state": {"type": "string"},
                            "region": {"type": "string"},
                            "launch_time": {"type": "string"},
                            "name": {"type": "string"},
                            "availability_zone": {"type": "string"},
                            "security_groups": {"type": "array"}
                        }
                    }
                },
                "volumes_count": {"type": "number"},
                "total_size_gb": {"type": "number"},
                "attached_volumes": {"type": "number"},
                "unattached_volumes": {"type": "number"},
                "vpcs_count": {"type": "number"},
                "total_subnets": {"type": "number"},
                "internet_gateways_count": {"type": "number"},
                "elastic_ips_total": {"type": "number"},
                "elastic_ips_attached": {"type": "number"}
            }
        }))
    }
}