rag-module 0.6.7

Enterprise RAG module with chat context storage, vector search, session management, and model downloading. Rust implementation with Node.js compatibility.
//! Test batch ingestion with persistent data storage
//! 
//! This example demonstrates the new batch ingestion functionality
//! and creates persistent qdrant-data files that you can inspect.

use rag_module::RagModule;
use serde_json::json;
use std::time::Instant;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Initialize with persistent directory 
    let data_dir = "./batch-test-data";
    std::fs::create_dir_all(data_dir)?;

    println!("🚀 Testing Batch AWS Estate Ingestion");
    println!("📁 Data will be stored in: {}/qdrant-data/", data_dir);

    // Initialize RAG module with persistent storage
    let rag = RagModule::new(data_dir).await?;
    rag.initialize().await?;

    let user_id = "batch_test_user";
    let collection_name = "aws_estate";

    // Create 1 realistic AWS documents for batch testing
    println!("\n📊 Creating 1 AWS estate documents for batch ingestion...");
    let mut aws_documents = Vec::new();

    // EC2 Instances (160 documents)
    for i in 0..160 {
        aws_documents.push(json!({
            "content": format!("EC2 instance i-{:016x} running {} in {} with {} configuration", 
                i, 
                if i % 2 == 0 { "Ubuntu 22.04" } else { "Amazon Linux 2" },
                if i % 3 == 0 { "us-west-2a" } else { "us-east-1b" },
                if i % 2 == 0 { "production" } else { "staging" }
            ),
            "resource_type": "ec2_instance",
            "instance_id": format!("i-{:016x}", i),
            "instance_type": if i % 3 == 0 { "t3.micro" } else { "t3.small" },
            "state": "running",
            "region": if i % 3 == 0 { "us-west-2" } else { "us-east-1" },
            "vpc_id": format!("vpc-{:08x}", i % 3),
            "security_groups": [format!("sg-web-{}", i)],
            "tags": {
                "Name": format!("web-server-{}", i),
                "Environment": if i % 2 == 0 { "production" } else { "staging" },
                "Application": "web-frontend",
                "Team": "platform-engineering"
            }
        }));
    }

    // S3 Buckets (128 documents)
    for i in 0..128 {
        aws_documents.push(json!({
            "content": format!("S3 bucket {} for {} storage containing {} objects with {} GB total size", 
                format!("company-data-{:03}", i),
                if i % 2 == 0 { "backup" } else { "application" },
                (i + 1) * 1000,
                (i + 1) * 50
            ),
            "resource_type": "s3_bucket", 
            "bucket_name": format!("company-data-{:03}", i),
            "region": if i % 2 == 0 { "us-west-2" } else { "us-east-1" },
            "versioning_enabled": i % 3 == 0,
            "encryption": "AES256",
            "public_access_blocked": true,
            "object_count": (i + 1) * 1000,
            "size_gb": (i + 1) * 50,
            "storage_class": if i % 2 == 0 { "STANDARD" } else { "INTELLIGENT_TIERING" },
            "tags": {
                "Environment": if i % 2 == 0 { "production" } else { "staging" },
                "DataClassification": "internal",
                "BackupRequired": true,
                "CostCenter": format!("cc-{}", i % 3)
            }
        }));
    }

    // RDS Databases (112 documents)
    for i in 0..112 {
        aws_documents.push(json!({
            "content": format!("RDS {} database {} running version {} with {} storage and {} backup retention", 
                if i % 2 == 0 { "MySQL" } else { "PostgreSQL" },
                format!("app-db-{:02}", i),
                if i % 2 == 0 { "8.0.35" } else { "15.4" },
                format!("{} GB", (i + 1) * 100),
                format!("{} days", if i % 2 == 0 { 7 } else { 30 })
            ),
            "resource_type": "rds_instance",
            "db_identifier": format!("app-db-{:02}", i),
            "engine": if i % 2 == 0 { "mysql" } else { "postgres" },
            "engine_version": if i % 2 == 0 { "8.0.35" } else { "15.4" },
            "instance_class": format!("db.{}", if i % 2 == 0 { "t3.micro" } else { "r6g.large" }),
            "allocated_storage": (i + 1) * 100,
            "storage_encrypted": true,
            "multi_az": i % 3 == 0,
            "backup_retention": if i % 2 == 0 { 7 } else { 30 },
            "region": if i % 2 == 0 { "us-west-2" } else { "us-east-1" },
            "tags": {
                "Name": format!("app-database-{}", i),
                "Environment": if i % 2 == 0 { "production" } else { "staging" },
                "Application": "backend-services",
                "DatabaseType": if i % 2 == 0 { "mysql" } else { "postgres" }
            }
        }));
    }

    // Lambda Functions (112 documents)
    for i in 0..112 {
        aws_documents.push(json!({
            "content": format!("Lambda function {} using {} runtime for {} processing with {} timeout and {} memory", 
                format!("data-processor-{}", i),
                if i % 3 == 0 { "python3.11" } else if i % 3 == 1 { "nodejs20.x" } else { "java17" },
                if i % 2 == 0 { "batch data" } else { "real-time event" },
                if i % 2 == 0 { "5 minutes" } else { "30 seconds" },
                if i % 2 == 0 { "1024 MB" } else { "1 MB" }
            ),
            "resource_type": "lambda_function",
            "function_name": format!("data-processor-{}", i),
            "runtime": if i % 3 == 0 { "python3.11" } else if i % 3 == 1 { "nodejs20.x" } else { "java17" },
            "timeout": if i % 2 == 0 { 300 } else { 30 },
            "memory_size": if i % 2 == 0 { 1024 } else { 1 },
            "handler": if i % 3 == 0 { "lambda_function.lambda_handler" } else { "index.handler" },
            "region": if i % 2 == 0 { "us-west-2" } else { "us-east-1" },
            "environment_variables": {
                "STAGE": if i % 2 == 0 { "prod" } else { "dev" },
                "LOG_LEVEL": "INFO",
                "REGION": if i % 2 == 0 { "us-west-2" } else { "us-east-1" }
            },
            "tags": {
                "Environment": if i % 2 == 0 { "production" } else { "staging" },
                "Team": "data-engineering",
                "Runtime": if i % 3 == 0 { "python" } else if i % 3 == 1 { "nodejs" } else { "java" }
            }
        }));
    }

    println!("✅ Created {} AWS documents", aws_documents.len());
    println!("   📦 {} EC2 instances", 160);
    println!("   🗄️  {} S3 buckets", 128);
    println!("   🗃️  {} RDS databases", 112);
    println!("{} Lambda functions", 112);

    // Test batch ingestion
    println!("\n🚀 Starting batch ingestion test...");
    let start_time = Instant::now();
    
    let result = rag.ingest_aws_estate_batch(aws_documents, user_id, collection_name).await?;
    
    let duration = start_time.elapsed();

    // Display results
    println!("\n✅ Batch Ingestion Complete!");
    println!("   📊 Total documents: {}", result.total_resources);
    println!("   ✅ Successfully processed: {}", result.parsed_resources);
    println!("   ❌ Failed: {}", result.failed_resources);
    println!("   ⚡ Time taken: {:?}", duration);
    println!("   📈 Throughput: {:.2} docs/sec", result.parsed_resources as f64 / duration.as_secs_f64());

    if !result.create_result.failed.is_empty() {
        println!("   🚨 Errors:");
        for error in &result.create_result.failed {
            println!("      - {}", error);
        }
    }

    // Test search functionality
    println!("\n🔍 Testing search functionality...");
    
    let search_options = rag_module::SearchOptions {
        limit: Some(3),
        score_threshold: Some(0.1),
        ..Default::default()
    };

    // Search for EC2 instances
    let ec2_results = rag.search(collection_name, "EC2 instance production", user_id, search_options.clone()).await?;
    println!("   🖥️  Found {} EC2-related results", ec2_results.len());

    // Search for databases
    let db_results = rag.search(collection_name, "database MySQL PostgreSQL", user_id, search_options.clone()).await?;
    println!("   🗄️  Found {} database-related results", db_results.len());

    // Search for Lambda functions
    let lambda_results = rag.search(collection_name, "Lambda function python nodejs", user_id, search_options).await?;
    println!("   ⚡ Found {} Lambda-related results", lambda_results.len());

    // Show where data is stored
    println!("\n📁 Data Storage Locations:");
    println!("   📂 Base directory: {}", data_dir);
    println!("   🗂️  Qdrant data: {}/qdrant-data/", data_dir);
    println!("   👤 User data: {}/qdrant-data/{}/", data_dir, user_id);
    println!("   📄 Documents: {}/qdrant-data/{}/{}-documents.json", data_dir, user_id, collection_name);
    println!("   🔢 Vectors: {}/qdrant-data/{}/{}-vectors.bin", data_dir, user_id, collection_name);
    println!("   📊 Index: {}/qdrant-data/{}/{}-vector-index.json", data_dir, user_id, collection_name);

    // Check if files actually exist
    let user_dir = std::path::Path::new(data_dir)
        .join("qdrant-data")
        .join(user_id);
    
    if user_dir.exists() {
        println!("\n✅ Persistent data files created:");
        if let Ok(entries) = std::fs::read_dir(&user_dir) {
            for entry in entries {
                if let Ok(entry) = entry {
                    let file_name = entry.file_name();
                    let metadata = entry.metadata()?;
                    println!("   📄 {} ({} bytes)", 
                        file_name.to_string_lossy(), 
                        metadata.len()
                    );
                }
            }
        }
    } else {
        println!("⚠️  User directory not found: {}", user_dir.display());
    }

    println!("\n🎉 Batch ingestion demo completed successfully!");
    println!("💡 You can now inspect the created files in: {}/qdrant-data/{}/", data_dir, user_id);
    
    Ok(())
}