use anyhow::Result;
use clap::Parser;
use std::sync::Arc;
use tracing::{info, warn};
use awsim_core::{AppState, PersistenceManager, RequestContext};
mod admin;
mod integrations;
mod proxy;
// Command-line options for the AWSim server. Every option that declares an
// `env` attribute can also be supplied through that environment variable.
// Field doc comments below are picked up by clap as the per-option help text.
#[derive(Parser)]
#[command(
    name = "awsim",
    about = "AWSim — fully offline, free AWS development environment"
)]
struct Cli {
    /// TCP port the HTTP endpoint listens on
    #[arg(short, long, default_value = "4566", env = "AWSIM_PORT")]
    port: u16,

    /// Default AWS region for the simulated environment
    #[arg(short, long, default_value = "us-east-1", env = "AWSIM_REGION")]
    region: String,

    /// Default AWS account ID for the simulated environment
    #[arg(long, default_value = "000000000000", env = "AWSIM_ACCOUNT_ID")]
    account_id: String,

    /// Directory for persisted snapshots; persistence is disabled when omitted
    #[arg(long, env = "AWSIM_DATA_DIR")]
    data_dir: Option<String>,

    /// Log filter directive handed to the tracing subscriber (for example `info` or `debug`)
    #[arg(short = 'v', long, default_value = "info", env = "AWSIM_LOG_LEVEL")]
    log_level: String,
}
#[tokio::main]
async fn main() -> Result<()> {
let cli = Cli::parse();
tracing_subscriber::fmt()
.with_env_filter(&cli.log_level)
.init();
let mut state = AppState::new(cli.region.clone(), cli.account_id.clone());
let (
apigw_service,
cognito_state,
iam_store,
s3_store,
kms_store,
sqs_store,
secrets_store,
lambda_store,
organizations_store,
) = register_services(&mut state, &cli.account_id, &cli.region);
if let Some(authz) = Arc::get_mut(&mut state.authz) {
authz.principal_lookup = Arc::new(awsim_iam::authz::IamPrincipalLookup::new(iam_store));
authz.resource_policy_lookups.insert(
"s3".to_string(),
Arc::new(awsim_s3::S3ResourcePolicyLookup::new(s3_store)),
);
authz.resource_policy_lookups.insert(
"kms".to_string(),
Arc::new(awsim_kms::KmsResourcePolicyLookup::new(kms_store)),
);
authz.resource_policy_lookups.insert(
"sqs".to_string(),
Arc::new(awsim_sqs::SqsResourcePolicyLookup::new(sqs_store)),
);
authz.resource_policy_lookups.insert(
"secretsmanager".to_string(),
Arc::new(awsim_secretsmanager::SecretsManagerResourcePolicyLookup::new(secrets_store)),
);
authz.resource_policy_lookups.insert(
"lambda".to_string(),
Arc::new(awsim_lambda::LambdaResourcePolicyLookup::new(lambda_store)),
);
authz.scp_lookup = Some(Arc::new(awsim_organizations::OrganizationsScpLookup::new(
organizations_store,
&cli.account_id,
)));
}
if let Some(ref data_dir) = cli.data_dir {
let pm = PersistenceManager::new(data_dir);
info!(data_dir = %data_dir, "Persistence enabled — restoring snapshots");
pm.restore_all(&state.services);
let services_for_shutdown = Arc::clone(&state.services);
let pm_shutdown = Arc::new(PersistenceManager::new(data_dir));
tokio::spawn(async move {
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};
let mut sigint =
signal(SignalKind::interrupt()).expect("failed to install SIGINT handler");
let mut sigterm =
signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
tokio::select! {
_ = sigint.recv() => {}
_ = sigterm.recv() => {}
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await.ok();
}
info!("Shutdown signal received — saving snapshots...");
pm_shutdown.save_all(&services_for_shutdown);
info!("Snapshots saved. Exiting.");
std::process::exit(0);
});
let services_for_autosave = Arc::clone(&state.services);
let pm_autosave = Arc::new(PersistenceManager::new(data_dir));
tokio::spawn(async move {
let interval = std::time::Duration::from_secs(30);
loop {
tokio::time::sleep(interval).await;
pm_autosave.save_all(&services_for_autosave);
}
});
}
let service_count = state.services.len();
spawn_event_router(&state);
let poll_services = Arc::clone(&state.services);
tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
integrations::poll_sqs_event_sources(&poll_services).await;
}
});
let kinesis_poll_services = Arc::clone(&state.services);
tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
integrations::poll_kinesis_event_sources(&kinesis_poll_services).await;
}
});
let lambda_arc = state.services.get("lambda").cloned();
let proxy_state = proxy::ProxyState {
apigw: apigw_service,
lambda: lambda_arc,
default_account_id: cli.account_id.clone(),
default_region: cli.region.clone(),
};
let proxy_router: axum::Router<()> = axum::Router::new()
.route(
"/restapis/{api_id}/{stage}/{*path}",
axum::routing::any(proxy::handle_proxy),
)
.with_state(proxy_state);
let cognito_oauth_state = Arc::new(awsim_cognito::CognitoOAuthState {
cognito: cognito_state,
default_account_id: cli.account_id.clone(),
default_region: cli.region.clone(),
auth_codes: Arc::new(dashmap::DashMap::new()),
revoked_refresh_tokens: Arc::new(dashmap::DashMap::new()),
port: cli.port,
});
let cognito_oauth_router = awsim_cognito::oauth::router(cognito_oauth_state);
let main_router: axum::Router<()> = axum::Router::new()
.route("/_awsim/health", axum::routing::get(admin::health))
.route("/_awsim/services", axum::routing::get(admin::list_services))
.route("/_awsim/config", axum::routing::get(admin::config))
.route("/_awsim/stats", axum::routing::get(admin::stats))
.fallback(awsim_core::gateway::handle_request)
.with_state(state);
let opensearch_nested: axum::Router<()> = axum::Router::new().nest(
"/opensearch",
awsim_opensearch::router(Arc::new(awsim_opensearch::state::OpenSearchState::default())),
);
let app = cognito_oauth_router
.merge(main_router)
.merge(proxy_router)
.merge(opensearch_nested)
.layer(axum::extract::DefaultBodyLimit::max(100 * 1024 * 1024)) .layer(tower_http::cors::CorsLayer::permissive());
let addr = std::net::SocketAddr::from(([0, 0, 0, 0], cli.port));
let listener = tokio::net::TcpListener::bind(addr).await?;
println!();
println!(" AWSim v{}", env!("CARGO_PKG_VERSION"));
println!(" Fully Offline AWS Dev Environment");
println!();
println!(" Endpoint: http://localhost:{}", cli.port);
println!(" Region: {}", cli.region);
println!(" Account: {}", cli.account_id);
println!(" Services: {} registered", service_count);
if let Some(ref data_dir) = cli.data_dir {
println!(" Persist: {}", data_dir);
}
println!();
info!(
port = cli.port,
region = %cli.region,
account_id = %cli.account_id,
services = service_count,
"AWSim started"
);
axum::serve(listener, app).await?;
Ok(())
}
fn spawn_event_router(state: &AppState) {
use std::sync::Arc;
let mut rx = state.event_bus.subscribe();
let services = Arc::clone(&state.services);
let default_region = state.default_region.clone();
let default_account_id = state.default_account_id.clone();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let protocol = event.detail["protocol"].as_str().unwrap_or("").to_string();
let endpoint = event.detail["endpoint"].as_str().unwrap_or("").to_string();
let message = event.detail["message"].as_str().unwrap_or("").to_string();
let message_id = event.detail["message_id"]
.as_str()
.unwrap_or("")
.to_string();
let topic_arn = event.detail["topic_arn"].as_str().unwrap_or("").to_string();
match event.event_type.as_str() {
"sns:Publish" if protocol == "sqs" => {
let queue_url =
arn_to_sqs_url(&endpoint, &default_region, &default_account_id);
if let Some(sqs_handler) = services.get("sqs") {
let ctx = RequestContext {
account_id: event.account_id.clone(),
region: event.region.clone(),
service: "sqs".to_string(),
access_key: None,
request_id: uuid::Uuid::new_v4().to_string(),
method: "POST".to_string(),
uri: "/".to_string(),
event_bus: None,
};
let body = serde_json::json!({
"Type": "Notification",
"MessageId": message_id,
"TopicArn": topic_arn,
"Message": message,
})
.to_string();
let input = serde_json::json!({
"QueueUrl": queue_url,
"MessageBody": body,
});
match sqs_handler.handle("SendMessage", input, &ctx).await {
Ok(_) => {
info!(
topic = %topic_arn,
queue = %endpoint,
"SNS→SQS fan-out delivered"
);
}
Err(e) => {
warn!(
topic = %topic_arn,
queue = %endpoint,
error = %e.message,
"SNS→SQS fan-out delivery failed"
);
}
}
}
}
"sns:Publish" if protocol == "lambda" => {
info!(
topic = %topic_arn,
function = %endpoint,
"SNS→Lambda fan-out: not yet implemented"
);
}
"cloudformation:CreateResource" => {
integrations::handle_cf_create_resource(&services, &event).await;
}
"cloudformation:DeleteResource" => {
integrations::handle_cf_delete_resource(&services, &event).await;
}
"dynamodb:StreamRecord" => {
integrations::handle_dynamodb_stream(&services, &event).await;
}
"eventbridge:TargetInvocation" => {
integrations::handle_eventbridge_target(&services, &event).await;
}
"cognito:LambdaTrigger" => {
integrations::handle_cognito_trigger(&services, &event).await;
}
t if t.starts_with("s3:ObjectCreated:")
|| t.starts_with("s3:ObjectRemoved:") =>
{
integrations::handle_s3_event(&services, &event).await;
}
_ => {
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
warn!(
skipped = n,
"Event bus receiver lagged; some events were dropped"
);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
});
}
/// Converts an SQS queue ARN into a queue URL.
///
/// An input with six colon-separated parts (`arn:partition:sqs:region:account:queue`)
/// yields `http://sqs.{region}.localhost:4566/{account}/{queue}`. Anything
/// after the fifth colon is kept verbatim as the queue part.
///
/// When the input has fewer than six parts, the text after its last colon (or
/// the whole input if there is none) is used as the queue name together with
/// `default_region` and `default_account`. The same defaults are substituted
/// when a six-part ARN carries an empty region or account segment, so the
/// result never contains an empty host label or path segment for those fields.
///
/// NOTE(review): the port is fixed at 4566 even though the server port is
/// configurable — confirm the SQS handler does not depend on the URL's port.
fn arn_to_sqs_url(arn: &str, default_region: &str, default_account: &str) -> String {
    let parts: Vec<&str> = arn.splitn(6, ':').collect();

    // An empty string marks "not provided" so both shapes share one fallback.
    let (region, account, queue) = match parts[..] {
        [_, _, _, region, account, queue] => (region, account, queue),
        _ => ("", "", arn.rsplit(':').next().unwrap_or(arn)),
    };
    let region = if region.is_empty() { default_region } else { region };
    let account = if account.is_empty() { default_account } else { account };

    format!("http://sqs.{region}.localhost:4566/{account}/{queue}")
}
/// Handles returned by `register_services`, in the exact order in which
/// `main` destructures them. The per-element notes describe how `main` uses
/// each handle after registration.
type RegisteredServices = (
// API Gateway service; becomes `ProxyState::apigw` for the `/restapis/...` proxy router.
Arc<awsim_apigateway::ApiGatewayService>,
// Cognito state for the default account/region; becomes `CognitoOAuthState::cognito`.
Arc<awsim_cognito::CognitoState>,
// IAM store; backs the authorization engine's `IamPrincipalLookup`.
awsim_core::AccountRegionStore<awsim_iam::state::IamState>,
// S3 store; backs the `"s3"` resource-policy lookup.
awsim_core::AccountRegionStore<awsim_s3::state::S3State>,
// KMS store; backs the `"kms"` resource-policy lookup.
awsim_core::AccountRegionStore<awsim_kms::state::KmsState>,
// SQS store; backs the `"sqs"` resource-policy lookup.
awsim_core::AccountRegionStore<awsim_sqs::state::SqsState>,
// Secrets Manager store; backs the `"secretsmanager"` resource-policy lookup.
awsim_core::AccountRegionStore<awsim_secretsmanager::state::SecretsState>,
// Lambda store; backs the `"lambda"` resource-policy lookup.
awsim_core::AccountRegionStore<awsim_lambda::state::LambdaState>,
// Organizations store; backs the SCP lookup.
awsim_core::AccountRegionStore<awsim_organizations::state::OrganizationsState>,
);
fn register_services(
state: &mut AppState,
default_account_id: &str,
default_region: &str,
) -> RegisteredServices {
use std::sync::Arc;
let iam = Arc::new(awsim_iam::IamService::new());
let iam_store = iam.store();
state.register(iam, vec![]);
let sts = Arc::new(awsim_sts::StsService::new());
state.register(sts, vec![]);
let sns = Arc::new(awsim_sns::SnsService::new());
state.register(sns, vec![]);
let sqs = Arc::new(awsim_sqs::SqsService::new());
let sqs_store = sqs.store();
state.register(sqs, vec![]);
let dynamodb = Arc::new(awsim_dynamodb::DynamoDbService::new());
state.register(dynamodb, vec![]);
let s3 = awsim_s3::S3Service::new();
let s3_store = s3.store();
let s3_routes = {
use awsim_core::ServiceHandler;
s3.routes()
};
state.register(Arc::new(s3), s3_routes);
let lambda = awsim_lambda::LambdaService::new();
let lambda_store = lambda.store();
let lambda_routes = {
use awsim_core::ServiceHandler;
lambda.routes()
};
state.register(Arc::new(lambda), lambda_routes);
let logs = Arc::new(awsim_cloudwatch_logs::CloudWatchLogsService::new());
state.register(logs, vec![]);
let eventbridge = Arc::new(awsim_eventbridge::EventBridgeService::new());
state.register(eventbridge, vec![]);
let kms = Arc::new(awsim_kms::KmsService::new());
let kms_store = kms.store();
state.register(kms, vec![]);
let secretsmanager = Arc::new(awsim_secretsmanager::SecretsManagerService::new());
let secrets_store = secretsmanager.store();
state.register(secretsmanager, vec![]);
let ssm = Arc::new(awsim_ssm::SsmService::new());
state.register(ssm, vec![]);
let stepfunctions = Arc::new(awsim_stepfunctions::StepFunctionsService::new());
state.register(stepfunctions, vec![]);
let kinesis = Arc::new(awsim_kinesis::KinesisService::new());
state.register(kinesis, vec![]);
let ses = awsim_ses::SesService::new();
let ses_routes = {
use awsim_core::ServiceHandler;
ses.routes()
};
state.register(Arc::new(ses), ses_routes);
let cognito = Arc::new(awsim_cognito::CognitoService::new());
let cognito_arc_state = cognito.state_for(default_account_id, default_region);
state.register(cognito, vec![]);
let cognito_identity = Arc::new(awsim_cognito::CognitoIdentityService::new());
state.register(cognito_identity, vec![]);
let ecr = Arc::new(awsim_ecr::EcrService::new());
state.register(ecr, vec![]);
let ecs = Arc::new(awsim_ecs::EcsService::new());
state.register(ecs, vec![]);
let ec2 = Arc::new(awsim_ec2::Ec2Service::new());
state.register(ec2, vec![]);
let rds = Arc::new(awsim_rds::RdsService::new());
state.register(rds, vec![]);
let appsync = awsim_appsync::AppSyncService::new();
let appsync_routes = {
use awsim_core::ServiceHandler;
appsync.routes()
};
state.register(Arc::new(appsync), appsync_routes);
let bedrock = awsim_bedrock::BedrockService::new();
let bedrock_routes = {
use awsim_core::ServiceHandler;
bedrock.routes()
};
state.register(Arc::new(bedrock), bedrock_routes);
let bedrock_runtime = awsim_bedrock::BedrockRuntimeService::new();
let bedrock_runtime_routes = {
use awsim_core::ServiceHandler;
bedrock_runtime.routes()
};
state.register(Arc::new(bedrock_runtime), bedrock_runtime_routes);
let cloudformation = Arc::new(awsim_cloudformation::CloudFormationService::new());
state.register(cloudformation, vec![]);
let route53 = awsim_route53::Route53Service::new();
let route53_routes = {
use awsim_core::ServiceHandler;
route53.routes()
};
state.register(Arc::new(route53), route53_routes);
let cloudwatch_metrics = Arc::new(awsim_cloudwatch_metrics::CloudWatchMetricsService::new());
state.register(cloudwatch_metrics, vec![]);
let athena = Arc::new(awsim_athena::AthenaService::new());
state.register(athena, vec![]);
let glue = Arc::new(awsim_glue::GlueService::new());
state.register(glue, vec![]);
let elb = Arc::new(awsim_elb::ElbService::new());
state.register(elb, vec![]);
let cloudfront = awsim_cloudfront::CloudFrontService::new();
let cloudfront_routes = {
use awsim_core::ServiceHandler;
cloudfront.routes()
};
state.register(Arc::new(cloudfront), cloudfront_routes);
let acm = Arc::new(awsim_acm::AcmService::new());
state.register(acm, vec![]);
let waf = Arc::new(awsim_waf::WafService::new());
state.register(waf, vec![]);
let scheduler = awsim_scheduler::SchedulerService::new();
let scheduler_routes = {
use awsim_core::ServiceHandler;
scheduler.routes()
};
state.register(Arc::new(scheduler), scheduler_routes);
let comprehend = Arc::new(awsim_comprehend::ComprehendService::new());
state.register(comprehend, vec![]);
let kendra = Arc::new(awsim_kendra::KendraService::new());
state.register(kendra, vec![]);
let organizations = Arc::new(awsim_organizations::OrganizationsService::new());
let organizations_store = organizations.store();
state.register(organizations, vec![]);
let cloudtrail = Arc::new(awsim_cloudtrail::CloudTrailService::new());
state.register(cloudtrail, vec![]);
let eks = awsim_eks::EksService::new();
let eks_routes = {
use awsim_core::ServiceHandler;
eks.routes()
};
state.register(Arc::new(eks), eks_routes);
let firehose = Arc::new(awsim_firehose::FirehoseService::new());
state.register(firehose, vec![]);
let batch = awsim_batch::BatchService::new();
let batch_routes = {
use awsim_core::ServiceHandler;
batch.routes()
};
state.register(Arc::new(batch), batch_routes);
let sso_admin = Arc::new(awsim_sso_admin::SsoAdminService::new());
state.register(sso_admin, vec![]);
let datasync = Arc::new(awsim_datasync::DataSyncService::new());
state.register(datasync, vec![]);
let polly = awsim_polly::PollyService::new();
let polly_routes = {
use awsim_core::ServiceHandler;
polly.routes()
};
state.register(Arc::new(polly), polly_routes);
let apigateway = Arc::new(awsim_apigateway::ApiGatewayService::new());
let apigw_routes = {
use awsim_core::ServiceHandler;
apigateway.routes()
};
let apigw_clone = Arc::clone(&apigateway);
state.register(apigateway, apigw_routes);
(
apigw_clone,
cognito_arc_state,
iam_store,
s3_store,
kms_store,
sqs_store,
secrets_store,
lambda_store,
organizations_store,
)
}