use rag_module::RagModule;
use serde_json::json;
use std::time::Instant;
/// Number of synthetic EC2 instance documents generated for the demo.
const EC2_COUNT: usize = 160;
/// Number of synthetic S3 bucket documents generated for the demo.
const S3_COUNT: usize = 128;
/// Number of synthetic RDS instance documents generated for the demo.
const RDS_COUNT: usize = 112;
/// Number of synthetic Lambda function documents generated for the demo.
const LAMBDA_COUNT: usize = 112;

/// Prints every file in `user_dir` with its size, or warns if the directory is missing.
///
/// This is best-effort reporting: an unreadable directory is silently skipped,
/// unreadable entries are skipped, and unreadable metadata is reported inline.
/// None of these fail the demo after ingestion has already succeeded.
fn print_persisted_files(user_dir: &std::path::Path) {
    if !user_dir.exists() {
        println!("⚠️ User directory not found: {}", user_dir.display());
        return;
    }
    println!("\n✅ Persistent data files created:");
    if let Ok(entries) = std::fs::read_dir(user_dir) {
        for entry in entries.flatten() {
            let file_name = entry.file_name();
            match entry.metadata() {
                Ok(metadata) => println!(" 📄 {} ({} bytes)",
                    file_name.to_string_lossy(),
                    metadata.len()
                ),
                Err(err) => println!(" 📄 {} (size unavailable: {})",
                    file_name.to_string_lossy(),
                    err
                ),
            }
        }
    }
}

/// Demo entry point for the batch AWS-estate ingestion run.
///
/// Steps:
/// 1. Creates `./batch-test-data` and initializes a `RagModule` there.
/// 2. Builds synthetic EC2, S3, RDS and Lambda documents.
/// 3. Ingests them via `ingest_aws_estate_batch` and reports counts, errors and throughput.
/// 4. Runs three sample searches.
/// 5. Lists the files found under `<data_dir>/qdrant-data/<user_id>/`.
///
/// # Errors
/// Returns an error if the data directory cannot be created, or if RAG initialization,
/// ingestion or search fails.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let data_dir = "./batch-test-data";
    std::fs::create_dir_all(data_dir)?;
    println!("🚀 Testing Batch AWS Estate Ingestion");
    println!("📁 Data will be stored in: {}/qdrant-data/", data_dir);

    let rag = RagModule::new(data_dir).await?;
    rag.initialize().await?;

    let user_id = "batch_test_user";
    let collection_name = "aws_estate";

    let total = EC2_COUNT + S3_COUNT + RDS_COUNT + LAMBDA_COUNT;
    println!("\n📊 Creating {} AWS estate documents for batch ingestion...", total);

    // EC2: OS and environment alternate on parity; availability zone and region on `i % 3`.
    let ec2_document = |i: usize| {
        let instance_id = format!("i-{:016x}", i);
        let even = i % 2 == 0;
        let west = i % 3 == 0;
        json!({
            "content": format!("EC2 instance {} running {} in {} with {} configuration",
                instance_id,
                if even { "Ubuntu 22.04" } else { "Amazon Linux 2" },
                if west { "us-west-2a" } else { "us-east-1b" },
                if even { "production" } else { "staging" }
            ),
            "resource_type": "ec2_instance",
            "instance_id": instance_id,
            "instance_type": if west { "t3.micro" } else { "t3.small" },
            "state": "running",
            "region": if west { "us-west-2" } else { "us-east-1" },
            "vpc_id": format!("vpc-{:08x}", i % 3),
            "security_groups": [format!("sg-web-{}", i)],
            "tags": {
                "Name": format!("web-server-{}", i),
                "Environment": if even { "production" } else { "staging" },
                "Application": "web-frontend",
                "Team": "platform-engineering"
            }
        })
    };

    // S3: object count and size scale linearly with the index so documents are distinguishable.
    let s3_document = |i: usize| {
        let bucket_name = format!("company-data-{:03}", i);
        let even = i % 2 == 0;
        let object_count = (i + 1) * 1000;
        let size_gb = (i + 1) * 50;
        json!({
            "content": format!("S3 bucket {} for {} storage containing {} objects with {} GB total size",
                bucket_name,
                if even { "backup" } else { "application" },
                object_count,
                size_gb
            ),
            "resource_type": "s3_bucket",
            "bucket_name": bucket_name,
            "region": if even { "us-west-2" } else { "us-east-1" },
            "versioning_enabled": i % 3 == 0,
            "encryption": "AES256",
            "public_access_blocked": true,
            "object_count": object_count,
            "size_gb": size_gb,
            "storage_class": if even { "STANDARD" } else { "INTELLIGENT_TIERING" },
            "tags": {
                "Environment": if even { "production" } else { "staging" },
                "DataClassification": "internal",
                "BackupRequired": true,
                "CostCenter": format!("cc-{}", i % 3)
            }
        })
    };

    // RDS: even indices are MySQL, odd indices are PostgreSQL.
    let rds_document = |i: usize| {
        let db_identifier = format!("app-db-{:02}", i);
        let mysql = i % 2 == 0;
        let engine_version = if mysql { "8.0.35" } else { "15.4" };
        let allocated_storage = (i + 1) * 100;
        let backup_retention = if mysql { 7 } else { 30 };
        json!({
            "content": format!("RDS {} database {} running version {} with {} GB storage and {} days backup retention",
                if mysql { "MySQL" } else { "PostgreSQL" },
                db_identifier,
                engine_version,
                allocated_storage,
                backup_retention
            ),
            "resource_type": "rds_instance",
            "db_identifier": db_identifier,
            "engine": if mysql { "mysql" } else { "postgres" },
            "engine_version": engine_version,
            "instance_class": if mysql { "db.t3.micro" } else { "db.r6g.large" },
            "allocated_storage": allocated_storage,
            "storage_encrypted": true,
            "multi_az": i % 3 == 0,
            "backup_retention": backup_retention,
            "region": if mysql { "us-west-2" } else { "us-east-1" },
            "tags": {
                "Name": format!("app-database-{}", i),
                "Environment": if mysql { "production" } else { "staging" },
                "Application": "backend-services",
                "DatabaseType": if mysql { "mysql" } else { "postgres" }
            }
        })
    };

    // Lambda: runtime rotates on `i % 3`; batch (even) and real-time (odd) profiles alternate.
    let lambda_document = |i: usize| {
        let function_name = format!("data-processor-{}", i);
        let even = i % 2 == 0;
        let (runtime, runtime_family) = match i % 3 {
            0 => ("python3.11", "python"),
            1 => ("nodejs20.x", "nodejs"),
            _ => ("java17", "java"),
        };
        let region = if even { "us-west-2" } else { "us-east-1" };
        // Lambda memory must be 128-10240 MB; the previous 1 MB was not a valid configuration.
        let memory_mb = if even { 1024 } else { 512 };
        json!({
            "content": format!("Lambda function {} using {} runtime for {} processing with {} timeout and {} MB memory",
                function_name,
                runtime,
                if even { "batch data" } else { "real-time event" },
                if even { "5 minutes" } else { "30 seconds" },
                memory_mb
            ),
            "resource_type": "lambda_function",
            "function_name": function_name,
            "runtime": runtime,
            "timeout": if even { 300 } else { 30 },
            "memory_size": memory_mb,
            "handler": if i % 3 == 0 { "lambda_function.lambda_handler" } else { "index.handler" },
            "region": region,
            "environment_variables": {
                "STAGE": if even { "prod" } else { "dev" },
                "LOG_LEVEL": "INFO",
                "REGION": region
            },
            "tags": {
                "Environment": if even { "production" } else { "staging" },
                "Team": "data-engineering",
                "Runtime": runtime_family
            }
        })
    };

    let mut aws_documents = Vec::with_capacity(total);
    aws_documents.extend((0..EC2_COUNT).map(ec2_document));
    aws_documents.extend((0..S3_COUNT).map(s3_document));
    aws_documents.extend((0..RDS_COUNT).map(rds_document));
    aws_documents.extend((0..LAMBDA_COUNT).map(lambda_document));

    println!("✅ Created {} AWS documents", aws_documents.len());
    println!(" 📦 {} EC2 instances", EC2_COUNT);
    println!(" 🗄️ {} S3 buckets", S3_COUNT);
    println!(" 🗃️ {} RDS databases", RDS_COUNT);
    println!(" ⚡ {} Lambda functions", LAMBDA_COUNT);

    println!("\n🚀 Starting batch ingestion test...");
    let start_time = Instant::now();
    let result = rag.ingest_aws_estate_batch(aws_documents, user_id, collection_name).await?;
    let duration = start_time.elapsed();

    println!("\n✅ Batch Ingestion Complete!");
    println!(" 📊 Total documents: {}", result.total_resources);
    println!(" ✅ Successfully processed: {}", result.parsed_resources);
    println!(" ❌ Failed: {}", result.failed_resources);
    println!(" ⚡ Time taken: {:?}", duration);
    // Skip the rate when the timer reads zero to avoid printing `inf`/`NaN`.
    let elapsed_secs = duration.as_secs_f64();
    if elapsed_secs > 0.0 {
        println!(" 📈 Throughput: {:.2} docs/sec", result.parsed_resources as f64 / elapsed_secs);
    }
    if !result.create_result.failed.is_empty() {
        println!(" 🚨 Errors:");
        for error in &result.create_result.failed {
            println!(" - {}", error);
        }
    }

    println!("\n🔍 Testing search functionality...");
    let search_options = rag_module::SearchOptions {
        limit: Some(3),
        score_threshold: Some(0.1),
        ..Default::default()
    };
    let ec2_results = rag.search(collection_name, "EC2 instance production", user_id, search_options.clone()).await?;
    println!(" 🖥️ Found {} EC2-related results", ec2_results.len());
    let db_results = rag.search(collection_name, "database MySQL PostgreSQL", user_id, search_options.clone()).await?;
    println!(" 🗄️ Found {} database-related results", db_results.len());
    let lambda_results = rag.search(collection_name, "Lambda function python nodejs", user_id, search_options).await?;
    println!(" ⚡ Found {} Lambda-related results", lambda_results.len());

    println!("\n📁 Data Storage Locations:");
    println!(" 📂 Base directory: {}", data_dir);
    println!(" 🗂️ Qdrant data: {}/qdrant-data/", data_dir);
    println!(" 👤 User data: {}/qdrant-data/{}/", data_dir, user_id);
    println!(" 📄 Documents: {}/qdrant-data/{}/{}-documents.json", data_dir, user_id, collection_name);
    println!(" 🔢 Vectors: {}/qdrant-data/{}/{}-vectors.bin", data_dir, user_id, collection_name);
    println!(" 📊 Index: {}/qdrant-data/{}/{}-vector-index.json", data_dir, user_id, collection_name);

    let user_dir = std::path::Path::new(data_dir)
        .join("qdrant-data")
        .join(user_id);
    print_persisted_files(&user_dir);

    println!("\n🎉 Batch ingestion demo completed successfully!");
    println!("💡 You can now inspect the created files in: {}/qdrant-data/{}/", data_dir, user_id);
    Ok(())
}