use anyhow::{Context, Result};
use axum::error_handling::HandleErrorLayer;
use axum::http::StatusCode;
use axum::response::IntoResponse;
use clap::{Parser, Subcommand};
use std::sync::Arc;
use tower::BoxError;
use tower::ServiceBuilder;
use tower::limit::ConcurrencyLimitLayer;
use tower::load_shed::LoadShedLayer;
use tracing::{debug, error, info, warn};
#[cfg(all(not(target_env = "msvc"), not(target_env = "musl")))]
#[global_allocator]
static GLOBAL: tikv_jemallocator::Jemalloc = tikv_jemallocator::Jemalloc;
use awsim_core::{
AppState, BlobInventory, BodyStore, BodyStoreHandle, PersistenceManager, RequestContext,
};
mod admin;
mod bill_cli;
mod chaos_cli;
mod integrations;
mod named_snapshots;
mod operator_auth;
mod proxy;
mod runtime_config;
mod seed;
mod seed_cli;
mod snapshot_cli;
mod tls;
mod ui;
#[derive(Parser)]
#[command(
name = "awsim",
about = "AWSim — fully offline, free AWS development environment"
)]
struct Cli {
#[arg(short, long, default_value = "4566", env = "AWSIM_PORT")]
port: u16,
#[arg(long, env = "AWSIM_HTTPS_PORT")]
https_port: Option<u16>,
#[arg(long, env = "AWSIM_TLS_CERT", requires = "tls_key")]
tls_cert: Option<std::path::PathBuf>,
#[arg(long, env = "AWSIM_TLS_KEY", requires = "tls_cert")]
tls_key: Option<std::path::PathBuf>,
#[arg(long, env = "AWSIM_TLS_CACHE_DIR")]
tls_cache_dir: Option<std::path::PathBuf>,
#[arg(short, long, default_value = "us-east-1", env = "AWSIM_REGION")]
region: String,
#[arg(long, default_value = "000000000000", env = "AWSIM_ACCOUNT_ID")]
account_id: String,
#[arg(long, default_value = "aws", env = "AWSIM_PARTITION")]
partition: String,
#[arg(long, env = "AWSIM_DATA_DIR")]
data_dir: Option<String>,
#[arg(short = 'v', long, default_value = "info", env = "AWSIM_LOG_LEVEL")]
log_level: String,
#[arg(long, env = "AWSIM_NO_GC", default_value_t = false)]
no_gc: bool,
#[arg(long, env = "AWSIM_MAX_BLOB_BYTES")]
max_blob_bytes: Option<u64>,
#[arg(long, env = "AWSIM_GC_INTERVAL_SECS")]
gc_interval_secs: Option<u64>,
#[arg(long, env = "AWSIM_DDB_WAL_CHECKPOINT_SECS")]
ddb_wal_checkpoint_secs: Option<u64>,
#[arg(long, env = "AWSIM_DDB_TTL_GRACE_SECS", default_value_t = 0)]
ddb_ttl_grace_secs: u64,
#[arg(long, env = "AWSIM_MAX_CONCURRENT_REQUESTS", default_value_t = 5_000)]
max_concurrent_requests: usize,
#[arg(long, env = "AWSIM_MAX_BLOCKING_THREADS", default_value_t = 32)]
max_blocking_threads: usize,
#[arg(long, env = "AWSIM_MAX_BODY_BYTES", default_value_t = 100 * 1024 * 1024)]
max_body_bytes: usize,
#[arg(
long,
env = "AWSIM_MAX_S3_UPLOAD_BYTES",
default_value_t = 5 * 1024 * 1024 * 1024
)]
max_s3_upload_bytes: usize,
#[arg(long, env = "AWSIM_SES_RETENTION_HOURS", default_value_t = 720)]
ses_retention_hours: u64,
#[arg(long, env = "AWSIM_BEDROCK_BACKEND")]
bedrock_backend: Option<String>,
#[arg(long, env = "AWSIM_BEDROCK_API_KEY")]
bedrock_api_key: Option<String>,
#[arg(long, env = "AWSIM_BEDROCK_MODEL_MAP")]
bedrock_model_map: Option<std::path::PathBuf>,
#[arg(long, env = "AWSIM_BEDROCK_CONFIG")]
bedrock_config: Option<std::path::PathBuf>,
#[arg(long, env = "AWSIM_ENFORCE_IAM")]
enforce_iam: Option<bool>,
#[arg(long, env = "AWSIM_ADMIN_ACCESS_KEY", default_value = "awsim-admin")]
admin_access_key: String,
#[command(subcommand)]
command: Option<Command>,
}
#[derive(Subcommand)]
enum Command {
Bill {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
#[arg(long)]
json: bool,
},
Chaos {
#[command(subcommand)]
command: ChaosCommand,
},
Snapshot {
#[command(subcommand)]
command: SnapshotCommand,
},
Vacuum {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
},
Seed {
#[arg(long)]
file: std::path::PathBuf,
#[arg(long, env = "AWSIM_ENDPOINT")]
endpoint: Option<String>,
},
}
#[derive(Subcommand)]
enum SnapshotCommand {
List {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
#[arg(long)]
json: bool,
},
Save {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
name: String,
},
Load {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
name: String,
},
Delete {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
name: String,
},
}
#[derive(Subcommand)]
enum ChaosCommand {
List {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
#[arg(long)]
json: bool,
},
Add {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
#[arg(long, default_value = "*")]
service: String,
#[arg(long, default_value = "*")]
operation: String,
#[arg(long, default_value_t = 1.0)]
probability: f64,
#[arg(long)]
error: Option<String>,
#[arg(long)]
latency: Option<String>,
#[arg(long)]
label: Option<String>,
#[arg(long)]
ttl_secs: Option<u64>,
#[arg(long)]
start_in_secs: Option<u64>,
#[arg(long)]
flap: Option<String>,
},
Remove {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
id: String,
},
Clear {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
},
Stats {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
},
Preset {
#[command(subcommand)]
command: ChaosPresetCommand,
},
}
#[derive(Subcommand)]
enum ChaosPresetCommand {
List {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
#[arg(long)]
json: bool,
},
Apply {
#[arg(long, default_value = "http://localhost:4566", env = "AWSIM_ENDPOINT")]
endpoint: String,
name: String,
},
}
fn main() -> Result<()> {
let max_blocking = Cli::try_parse()
.map(|c| c.max_blocking_threads)
.unwrap_or(32);
let runtime = tokio::runtime::Builder::new_multi_thread()
.enable_all()
.max_blocking_threads(max_blocking)
.build()?;
runtime.block_on(async_main())
}
async fn async_main() -> Result<()> {
let cli = Cli::parse();
if let Some(cmd) = cli.command {
match cmd {
Command::Bill { endpoint, json } => {
return bill_cli::run(&endpoint, json).await;
}
Command::Chaos { command } => {
return chaos_cli::run(command).await;
}
Command::Snapshot { command } => {
return snapshot_cli::run(command).await;
}
Command::Vacuum { endpoint } => {
return run_vacuum(&endpoint).await;
}
Command::Seed { file, endpoint } => {
return seed_cli::run(&file, endpoint.as_deref()).await;
}
}
}
let initial_filter =
tracing_subscriber::EnvFilter::try_new(&cli.log_level).unwrap_or_else(|e| {
eprintln!(
"Invalid log filter {:?}: {e} — falling back to 'info'",
cli.log_level
);
tracing_subscriber::EnvFilter::new("info")
});
let (filter_layer, log_reload) = tracing_subscriber::reload::Layer::new(initial_filter);
use tracing_subscriber::layer::SubscriberExt;
use tracing_subscriber::util::SubscriberInitExt;
tracing_subscriber::registry()
.with(filter_layer)
.with(tracing_subscriber::fmt::layer())
.init();
raise_nofile_limit();
let mut state = AppState::with_partition(
cli.region.clone(),
cli.account_id.clone(),
cli.partition.clone(),
);
let runtime_config_store = build_runtime_config_store(&cli)?;
info!(
persistent = runtime_config_store.is_persistent(),
path = ?runtime_config_store.config_path(),
"Runtime config store initialised"
);
let bedrock_health = awsim_bedrock::HealthRegistry::new();
let bedrock_metrics = awsim_bedrock::MetricsRegistry::new();
let bedrock_recent = awsim_bedrock::RecentInvocations::new();
let initial_bedrock = build_bedrock_backend_from_config(
&runtime_config_store.current(),
&bedrock_health,
&bedrock_metrics,
&bedrock_recent,
)?;
let bedrock_swap = awsim_bedrock::backends_swap(initial_bedrock);
{
let swap = Arc::clone(&bedrock_swap);
let health_for_reload = bedrock_health.clone();
let metrics_for_reload = bedrock_metrics.clone();
let recent_for_reload = bedrock_recent.clone();
runtime_config_store.on_change(Box::new(
move |cfg| match build_bedrock_backend_from_config(
cfg,
&health_for_reload,
&metrics_for_reload,
&recent_for_reload,
) {
Ok(next) => {
swap.store(Arc::new(next));
info!("Bedrock backends hot-reloaded");
}
Err(e) => {
warn!(error = %e, "Bedrock hot-reload failed; keeping previous registry")
}
},
));
}
{
let swap = Arc::clone(&bedrock_swap);
let registry = bedrock_health.clone();
tokio::spawn(async move {
awsim_bedrock::run_poller(
swap,
registry,
std::time::Duration::from_secs(30),
std::time::Duration::from_secs(5),
)
.await;
});
}
let (
apigw_service,
apigw_v1_service,
cognito_state,
iam_service,
iam_store,
s3_store,
kms_store,
sqs_store,
secrets_store,
lambda_store,
organizations_store,
ecr_service,
s3_service,
lambda_service,
sqs_service,
logs_service,
pipes_store,
ec2_service,
rds_service,
mq_service,
memorydb_service,
dynamodb_service,
cw_metrics_service,
kinesis_service,
ses_service,
sts_sessions,
sns_store,
ecr_store,
) = register_services(
&mut state,
&cli.account_id,
&cli.region,
cli.data_dir.as_deref(),
cli.port,
cli.max_blob_bytes,
cli.ddb_wal_checkpoint_secs,
cli.ddb_ttl_grace_secs,
Arc::clone(&bedrock_swap),
);
let mut body_stores: Vec<BodyStoreHandle> = Vec::new();
if let Some(bs) = s3_service.body_store() {
body_stores.push(BodyStoreHandle {
service_name: "s3".to_string(),
groups: awsim_s3::S3Service::GROUPS
.iter()
.map(|s| (*s).to_string())
.collect(),
body_store: Arc::clone(bs),
});
}
if let Some(bs) = lambda_service.body_store() {
body_stores.push(BodyStoreHandle {
service_name: "lambda".to_string(),
groups: awsim_lambda::LambdaService::GROUPS
.iter()
.map(|s| (*s).to_string())
.collect(),
body_store: Arc::clone(bs),
});
}
if let Some(bs) = sqs_service.body_store() {
body_stores.push(BodyStoreHandle {
service_name: "sqs".to_string(),
groups: awsim_sqs::SqsService::GROUPS
.iter()
.map(|s| (*s).to_string())
.collect(),
body_store: Arc::clone(bs),
});
}
if let Some(bs) = ecr_service.body_store() {
body_stores.push(BodyStoreHandle {
service_name: "ecr".to_string(),
groups: awsim_ecr::EcrService::GROUPS
.iter()
.map(|s| (*s).to_string())
.collect(),
body_store: Arc::clone(bs),
});
}
state.body_stores = Arc::new(body_stores);
if let Some(ref dir) = cli.data_dir {
state.data_dir = Some(Arc::new(std::path::PathBuf::from(dir)));
}
let billing_meter = Arc::new(awsim_billing::BillingMeter::new());
awsim_billing::spawn_meter((*billing_meter).clone(), &state.events);
if let Some(authz) = Arc::get_mut(&mut state.authz) {
authz.admin_access_key =
(!cli.admin_access_key.is_empty()).then(|| cli.admin_access_key.clone());
let iam_lookup: Arc<dyn awsim_core::PrincipalLookup> =
Arc::new(awsim_iam::authz::IamPrincipalLookup::new(iam_store));
authz.principal_lookup = Arc::new(awsim_sts::StsAwarePrincipalLookup::new(
Arc::clone(&sts_sessions),
iam_lookup,
));
authz.resource_policy_lookups.insert(
"s3".to_string(),
Arc::new(awsim_s3::S3ResourcePolicyLookup::new(s3_store)),
);
authz.resource_policy_lookups.insert(
"kms".to_string(),
Arc::new(awsim_kms::KmsResourcePolicyLookup::new(kms_store.clone())),
);
authz.grant_lookups.insert(
"kms".to_string(),
Arc::new(awsim_kms::KmsGrantLookup::new(kms_store)),
);
authz.resource_policy_lookups.insert(
"sqs".to_string(),
Arc::new(awsim_sqs::SqsResourcePolicyLookup::new(sqs_store.clone())),
);
authz.resource_policy_lookups.insert(
"sns".to_string(),
Arc::new(awsim_sns::SnsResourcePolicyLookup::new(sns_store.clone())),
);
authz.resource_policy_lookups.insert(
"secretsmanager".to_string(),
Arc::new(
awsim_secretsmanager::SecretsManagerResourcePolicyLookup::new(
secrets_store.clone(),
),
),
);
authz.resource_policy_lookups.insert(
"lambda".to_string(),
Arc::new(awsim_lambda::LambdaResourcePolicyLookup::new(
lambda_store.clone(),
)),
);
authz.resource_policy_lookups.insert(
"ecr".to_string(),
Arc::new(awsim_ecr::EcrResourcePolicyLookup::new(ecr_store.clone())),
);
authz.scp_lookup = Some(Arc::new(awsim_organizations::OrganizationsScpLookup::new(
organizations_store,
&cli.account_id,
)));
}
iam_service.set_authz(Arc::clone(&state.authz));
{
let authz = Arc::clone(&state.authz);
if let Some(forced) = cli.enforce_iam {
let mut cfg = runtime_config_store.current().as_ref().clone();
if cfg.iam.enforce != forced {
cfg.iam.enforce = forced;
if let Err(e) = runtime_config_store.apply(cfg) {
warn!(error = %e, "Failed to persist forced IAM enforcement flag");
}
}
info!(enforce = forced, "IAM enforcement set from CLI/env");
}
authz.set_enabled(runtime_config_store.current().iam.enforce);
let authz = Arc::clone(&authz);
runtime_config_store.on_change(Box::new(move |cfg| {
authz.set_enabled(cfg.iam.enforce);
info!(enforce = cfg.iam.enforce, "IAM enforcement hot-reloaded");
}));
}
{
let persisted = runtime_config_store.current().logging.level.clone();
if persisted != cli.log_level
&& let Ok(filter) = tracing_subscriber::EnvFilter::try_new(&persisted)
{
let _ = log_reload.modify(|f| *f = filter);
}
runtime_config_store.on_change(Box::new(move |cfg| {
match tracing_subscriber::EnvFilter::try_new(&cfg.logging.level) {
Ok(filter) => {
if log_reload.modify(|f| *f = filter).is_ok() {
info!(level = %cfg.logging.level, "Log filter hot-reloaded");
}
}
Err(e) => warn!(error = %e, "Log filter rejected at reload time"),
}
}));
}
if cli.data_dir.is_none() {
let dynamodb_for_cleanup = Arc::clone(&dynamodb_service);
let logs_for_cleanup = Arc::clone(&logs_service);
let cw_metrics_for_cleanup = Arc::clone(&cw_metrics_service);
let kinesis_for_cleanup = Arc::clone(&kinesis_service);
let ses_for_cleanup = Arc::clone(&ses_service);
tokio::spawn(async move {
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};
let mut sigint = signal(SignalKind::interrupt()).expect("install SIGINT");
let mut sigterm = signal(SignalKind::terminate()).expect("install SIGTERM");
tokio::select! {
_ = sigint.recv() => {}
_ = sigterm.recv() => {}
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await.ok();
}
cleanup_tempdir("DynamoDB", dynamodb_for_cleanup.tempdir_path());
cleanup_tempdir("CloudWatch Logs", logs_for_cleanup.tempdir_path());
cleanup_tempdir("CloudWatch Metrics", cw_metrics_for_cleanup.tempdir_path());
cleanup_tempdir("Kinesis", kinesis_for_cleanup.tempdir_path());
cleanup_tempdir("SES", ses_for_cleanup.tempdir_path());
info!("Exiting.");
std::process::exit(0);
});
}
{
let sessions = Arc::clone(&sts_sessions);
tokio::spawn(async move {
let mut tick = tokio::time::interval(std::time::Duration::from_secs(300));
tick.tick().await; loop {
tick.tick().await;
sessions.purge_expired();
}
});
}
{
let ses_for_sweep = Arc::clone(&ses_service);
let cfg_store = Arc::clone(&runtime_config_store);
tokio::spawn(async move {
let mut tick = tokio::time::interval(std::time::Duration::from_secs(3600));
tick.tick().await; loop {
tick.tick().await;
let retention_hours = cfg_store.current().ses.retention_hours;
if retention_hours == 0 {
continue;
}
if let Some(store) = ses_for_sweep.sqlite_store_handle() {
let cutoff = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.map(|d| d.as_secs() as i64)
.unwrap_or(0)
- (retention_hours as i64) * 3600;
let store = Arc::clone(&store);
match tokio::task::spawn_blocking(move || store.trim_older_than(cutoff)).await {
Ok(Ok(removed)) if removed > 0 => {
info!(removed, retention_hours, "SES retention sweep")
}
Ok(Ok(_)) => {}
Ok(Err(e)) => warn!(error = %e.message, "SES retention sweep failed"),
Err(e) => warn!(error = %e, "SES retention sweep join error"),
}
}
}
});
}
if let Some(ref data_dir) = cli.data_dir {
let pm = PersistenceManager::new(data_dir);
info!(data_dir = %data_dir, "Persistence enabled — restoring snapshots");
pm.restore_all(&state.services);
if let Some(bytes) = pm.load_snapshot("billing")
&& let Err(e) = billing_meter.store.restore_from_bytes(&bytes)
{
warn!(error = %e, "Failed to restore billing snapshot");
}
if let Some(bytes) = pm.load_snapshot("chaos")
&& let Err(e) = state.chaos.restore_from_bytes(&bytes)
{
warn!(error = %e, "Failed to restore chaos snapshot");
}
if !cli.no_gc {
run_gc(
s3_service.as_ref(),
lambda_service.as_ref(),
sqs_service.as_ref(),
ecr_service.as_ref(),
);
}
if let Some(secs) = cli.gc_interval_secs {
let s3_gc = Arc::clone(&s3_service);
let lambda_gc = Arc::clone(&lambda_service);
let sqs_gc = Arc::clone(&sqs_service);
let ecr_gc = Arc::clone(&ecr_service);
let interval = std::time::Duration::from_secs(secs);
info!(interval_secs = secs, "Periodic BodyStore GC enabled");
tokio::spawn(async move {
loop {
tokio::time::sleep(interval).await;
run_gc(
s3_gc.as_ref(),
lambda_gc.as_ref(),
sqs_gc.as_ref(),
ecr_gc.as_ref(),
);
}
});
}
let services_for_shutdown = Arc::clone(&state.services);
let pm_shutdown = Arc::new(PersistenceManager::new(data_dir));
let billing_for_shutdown = Arc::clone(&billing_meter);
let chaos_for_shutdown = Arc::clone(&state.chaos);
let dynamodb_for_shutdown = Arc::clone(&dynamodb_service);
let logs_for_shutdown = Arc::clone(&logs_service);
let cw_metrics_for_shutdown = Arc::clone(&cw_metrics_service);
let kinesis_for_shutdown = Arc::clone(&kinesis_service);
let ses_for_shutdown = Arc::clone(&ses_service);
let workers_for_shutdown = state.workers.clone();
tokio::spawn(async move {
#[cfg(unix)]
{
use tokio::signal::unix::{SignalKind, signal};
let mut sigint =
signal(SignalKind::interrupt()).expect("failed to install SIGINT handler");
let mut sigterm =
signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
tokio::select! {
_ = sigint.recv() => {}
_ = sigterm.recv() => {}
}
}
#[cfg(not(unix))]
{
tokio::signal::ctrl_c().await.ok();
}
info!("Shutdown signal received - draining background workers + saving snapshots...");
workers_for_shutdown
.shutdown(awsim_core::tick::DEFAULT_SHUTDOWN_DEADLINE)
.await;
pm_shutdown.save_all(&services_for_shutdown);
if let Some(bytes) = billing_for_shutdown.store.snapshot_to_bytes()
&& let Err(e) = pm_shutdown.save_snapshot("billing", &bytes)
{
warn!(error = %e, "Failed to save billing snapshot on shutdown");
}
if let Some(bytes) = chaos_for_shutdown.snapshot_to_bytes()
&& let Err(e) = pm_shutdown.save_snapshot("chaos", &bytes)
{
warn!(error = %e, "Failed to save chaos snapshot on shutdown");
}
cleanup_tempdir("DynamoDB", dynamodb_for_shutdown.tempdir_path());
cleanup_tempdir("CloudWatch Logs", logs_for_shutdown.tempdir_path());
cleanup_tempdir("CloudWatch Metrics", cw_metrics_for_shutdown.tempdir_path());
cleanup_tempdir("Kinesis", kinesis_for_shutdown.tempdir_path());
cleanup_tempdir("SES", ses_for_shutdown.tempdir_path());
info!("Snapshots saved. Exiting.");
std::process::exit(0);
});
let services_for_autosave = Arc::clone(&state.services);
let pm_autosave = Arc::new(PersistenceManager::new(data_dir));
let billing_for_autosave = Arc::clone(&billing_meter);
tokio::spawn(async move {
let interval = std::time::Duration::from_secs(30);
loop {
tokio::time::sleep(interval).await;
let pm = Arc::clone(&pm_autosave);
let services = Arc::clone(&services_for_autosave);
let billing = Arc::clone(&billing_for_autosave);
if let Err(e) = tokio::task::spawn_blocking(move || {
pm.save_all(&services);
if let Some(bytes) = billing.store.snapshot_to_bytes()
&& let Err(e) = pm.save_snapshot("billing", &bytes)
{
warn!(error = %e, "Failed to save billing snapshot");
}
})
.await
{
warn!(error = %e, "Snapshot save_all task panicked");
}
}
});
let body_stores_for_storage = Arc::clone(&state.body_stores);
let billing_for_storage = Arc::clone(&billing_meter);
let data_dir_for_storage = data_dir.clone();
let account_for_storage = cli.account_id.clone();
let region_for_storage = cli.region.clone();
let ec2_for_storage = Arc::clone(&ec2_service);
let rds_for_storage = Arc::clone(&rds_service);
let mq_for_storage = Arc::clone(&mq_service);
let memorydb_for_storage = Arc::clone(&memorydb_service);
tokio::spawn(async move {
let interval = std::time::Duration::from_secs(30);
loop {
tokio::time::sleep(interval).await;
let mut bytes_by_service: std::collections::HashMap<String, u64> =
std::collections::HashMap::new();
for handle in body_stores_for_storage.iter() {
let mut total: u64 = 0;
for group in &handle.groups {
total =
total.saturating_add(handle.body_store.group_size(group).unwrap_or(0));
}
let service_key = match handle.service_name.as_str() {
"cloudwatch-logs" => "logs".to_string(),
other => other.to_string(),
};
bytes_by_service.insert(service_key, total);
}
let ddb_path = std::path::Path::new(&data_dir_for_storage).join("dynamodb.db");
if let Ok(meta) = std::fs::metadata(&ddb_path) {
bytes_by_service.insert("dynamodb".to_string(), meta.len());
}
for (service, bytes) in bytes_by_service {
billing_for_storage.record_storage_sample(
&service,
&account_for_storage,
®ion_for_storage,
bytes,
);
}
let ec2_count = ec2_for_storage
.running_instance_count(&account_for_storage, ®ion_for_storage);
billing_for_storage.record_resource_count_sample(
"ec2",
&account_for_storage,
®ion_for_storage,
ec2_count,
);
let rds_count = rds_for_storage
.running_instance_count(&account_for_storage, ®ion_for_storage);
billing_for_storage.record_resource_count_sample(
"rds",
&account_for_storage,
®ion_for_storage,
rds_count,
);
let mq_count =
mq_for_storage.running_broker_count(&account_for_storage, ®ion_for_storage);
billing_for_storage.record_resource_count_sample(
"mq",
&account_for_storage,
®ion_for_storage,
mq_count,
);
let memorydb_count = memorydb_for_storage
.running_node_count(&account_for_storage, ®ion_for_storage);
billing_for_storage.record_resource_count_sample(
"memorydb",
&account_for_storage,
®ion_for_storage,
memorydb_count,
);
}
});
}
let service_count = state.services.len();
spawn_event_router(&state);
let tick_interval_ms = std::env::var("AWSIM_TICK_INTERVAL_MS")
.ok()
.and_then(|v| v.parse::<u64>().ok())
.filter(|n| (10..=60_000).contains(n))
.unwrap_or(1000);
awsim_core::gateway::spawn_tick_loop(
state.clone(),
std::time::Duration::from_millis(tick_interval_ms),
);
let poll_services = Arc::clone(&state.services);
let sqs_lambda_store = lambda_store.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
integrations::poll_sqs_event_sources(&poll_services, &sqs_lambda_store).await;
}
});
let kinesis_poll_services = Arc::clone(&state.services);
let kinesis_lambda_store = lambda_store.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(5)).await;
integrations::poll_kinesis_event_sources(&kinesis_poll_services, &kinesis_lambda_store)
.await;
}
});
let pipes_runner_services = Arc::clone(&state.services);
let pipes_runner_store = pipes_store.clone();
tokio::spawn(async move {
loop {
tokio::time::sleep(tokio::time::Duration::from_secs(2)).await;
integrations::pipes::run_pipes_once(&pipes_runner_services, &pipes_runner_store).await;
}
});
let lambda_arc = state.services.get("lambda").cloned();
let http_client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(29)) .build()
.expect("build reqwest client for HTTP integrations");
let proxy_state = proxy::ProxyState {
apigw: apigw_service,
apigw_v1: apigw_v1_service,
lambda: lambda_arc,
http_client,
default_account_id: cli.account_id.clone(),
default_region: cli.region.clone(),
};
let proxy_router: axum::Router<()> = axum::Router::new()
.route(
"/restapis/{api_id}/{stage}/_user_request_",
axum::routing::any(proxy::handle_proxy),
)
.route(
"/restapis/{api_id}/{stage}/_user_request_/",
axum::routing::any(proxy::handle_proxy),
)
.route(
"/restapis/{api_id}/{stage}/_user_request_/{*path}",
axum::routing::any(proxy::handle_proxy),
)
.with_state(proxy_state);
let cognito_oauth_state = Arc::new(awsim_cognito::CognitoOAuthState {
cognito: Arc::clone(&cognito_state),
default_account_id: cli.account_id.clone(),
default_region: cli.region.clone(),
auth_codes: Arc::new(dashmap::DashMap::new()),
revoked_refresh_tokens: Arc::new(dashmap::DashMap::new()),
federation: awsim_cognito::federation::FederationState::new(),
port: cli.port,
});
let cognito_oauth_router = awsim_cognito::oauth::router(cognito_oauth_state);
let mock_idp_state = awsim_cognito::mock_idp::MockIdpState::new();
let mock_idp_router = awsim_cognito::mock_idp::router(mock_idp_state);
let https_runtime = if let Some(https_port) = cli.https_port {
Some(prepare_https_runtime(&cli, https_port).await?)
} else {
None
};
let tls_admin_info: Option<Arc<tls::TlsAdminInfo>> = https_runtime
.as_ref()
.map(|h| Arc::new(h.assets.admin_info(h.port)));
let s3_upload_limit = cli.max_s3_upload_bytes;
let operator_auth_state = operator_auth::OperatorAuthState::new(
Arc::clone(&iam_service),
cli.account_id.clone(),
cli.region.clone(),
);
arm_operator_auth_if_required(&operator_auth_state);
let public_auth_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/auth/login",
axum::routing::post(operator_auth::login),
)
.route(
"/_awsim/auth/logout",
axum::routing::post(operator_auth::logout),
)
.route(
"/_awsim/auth/whoami",
axum::routing::get(operator_auth::whoami),
)
.route(
"/_awsim/auth/setup",
axum::routing::post(operator_auth::setup),
)
.with_state(operator_auth_state.clone());
let private_auth_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/auth/reveal-access-key",
axum::routing::post(operator_auth::reveal_access_key),
)
.route(
"/_awsim/auth/credentials",
axum::routing::get(operator_auth::credentials),
)
.layer(axum::middleware::from_fn_with_state(
operator_auth_state.clone(),
operator_auth::require_auth,
))
.with_state(operator_auth_state.clone());
let auth_router = public_auth_router.merge(private_auth_router);
let main_router: axum::Router<()> = axum::Router::new()
.route("/_awsim/health", axum::routing::get(admin::health))
.route("/_awsim/services", axum::routing::get(admin::list_services))
.route("/_awsim/config", axum::routing::get(admin::config))
.route("/_awsim/stats", axum::routing::get(admin::stats))
.route("/_awsim/storage", axum::routing::get(admin::storage))
.route("/_awsim/events", axum::routing::get(admin::events))
.route(
"/_awsim/requests",
axum::routing::get(admin::recent_request_ids),
)
.route(
"/_awsim/requests/{id}",
axum::routing::get(admin::request_detail),
)
.route(
"/_awsim/requests/{id}/replay",
axum::routing::post(admin::replay_request),
)
.layer(axum::middleware::from_fn_with_state(
operator_auth_state.clone(),
operator_auth::require_auth,
))
.route(
"/{bucket}/{*key}",
axum::routing::any(awsim_core::gateway::handle_request)
.layer(axum::extract::DefaultBodyLimit::max(s3_upload_limit)),
)
.fallback(awsim_core::gateway::handle_request)
.with_state(state.clone())
.merge(auth_router);
let opensearch_state = match cli.data_dir.as_deref() {
Some(dir) => {
let path = std::path::Path::new(dir).join("opensearch.redb");
Arc::new(
awsim_opensearch::state::OpenSearchState::open(&path)
.expect("Failed to open opensearch redb"),
)
}
None => Arc::new(
awsim_opensearch::state::OpenSearchState::ephemeral()
.expect("Failed to create ephemeral opensearch state"),
),
};
let opensearch_nested: axum::Router<()> =
axum::Router::new().nest("/opensearch", awsim_opensearch::router(opensearch_state));
let ecr_router = awsim_ecr::router(ecr_service);
let billing_router: axum::Router<()> = axum::Router::new()
.route("/_awsim/billing", axum::routing::get(admin::billing))
.with_state(Arc::clone(&billing_meter));
let chaos_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/chaos/rules",
axum::routing::get(admin::chaos_list).post(admin::chaos_add),
)
.route(
"/_awsim/chaos/rules/{id}",
axum::routing::patch(admin::chaos_patch).delete(admin::chaos_remove),
)
.route(
"/_awsim/chaos/clear",
axum::routing::post(admin::chaos_clear),
)
.route(
"/_awsim/chaos/stats",
axum::routing::get(admin::chaos_stats),
)
.route(
"/_awsim/chaos/presets",
axum::routing::get(admin::chaos_presets_list),
)
.route(
"/_awsim/chaos/presets/{name}",
axum::routing::post(admin::chaos_preset_apply),
)
.with_state(Arc::clone(&state.chaos));
let ddb_admin_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/admin/dynamodb/vacuum",
axum::routing::post(admin::ddb_vacuum),
)
.with_state(Arc::clone(&dynamodb_service));
let sqlite_stats_state = Arc::new(admin::SqliteStatsState {
dynamodb: Arc::clone(&dynamodb_service),
cw_logs: Arc::clone(&logs_service),
cw_metrics: Arc::clone(&cw_metrics_service),
kinesis: Arc::clone(&kinesis_service),
ses: Arc::clone(&ses_service),
});
let sqlite_stats_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/storage/sqlite",
axum::routing::get(admin::sqlite_stats),
)
.with_state(Arc::clone(&sqlite_stats_state));
let ses_admin_router: axum::Router<()> = axum::Router::new()
.route("/_awsim/ses/sent", axum::routing::get(admin::ses_sent))
.with_state(Arc::clone(&ses_service));
let debug_objects_state = Arc::new(admin::DebugObjectsState {
app: state.clone(),
billing: Arc::clone(&billing_meter),
cognito: Arc::clone(&cognito_state),
sqlite: Arc::clone(&sqlite_stats_state),
});
let seed_ddb_state = Arc::new(seed::dynamodb::SeedDdbState {
service: Arc::clone(&dynamodb_service),
default_account: cli.account_id.clone(),
default_region: cli.region.clone(),
});
let seed_cognito_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/seed/cognito-users",
axum::routing::post(seed::cognito::seed),
)
.with_state(Arc::clone(&cognito_state));
let seed_ddb_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/seed/dynamodb",
axum::routing::post(seed::dynamodb::seed),
)
.with_state(seed_ddb_state);
let seed_s3_state = Arc::new(seed::s3::SeedS3State {
service: Arc::clone(&s3_service),
default_account: cli.account_id.clone(),
default_region: cli.region.clone(),
});
let seed_s3_router: axum::Router<()> = axum::Router::new()
.route("/_awsim/seed/s3", axum::routing::post(seed::s3::seed))
.with_state(seed_s3_state);
let seed_secrets_state = Arc::new(seed::secrets::SeedSecretsState {
store: secrets_store.clone(),
default_account: cli.account_id.clone(),
default_region: cli.region.clone(),
});
let seed_secrets_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/seed/secrets",
axum::routing::post(seed::secrets::seed),
)
.with_state(seed_secrets_state);
let seed_sqs_state = Arc::new(seed::sqs::SeedSqsState {
store: sqs_store.clone(),
default_account: cli.account_id.clone(),
default_region: cli.region.clone(),
default_port: cli.port,
});
let seed_sqs_router: axum::Router<()> = axum::Router::new()
.route("/_awsim/seed/sqs", axum::routing::post(seed::sqs::seed))
.with_state(seed_sqs_state);
let seed_router = seed_cognito_router
.merge(seed_ddb_router)
.merge(seed_s3_router)
.merge(seed_secrets_router)
.merge(seed_sqs_router);
let debug_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/debug/objects",
axum::routing::get(admin::debug_objects),
)
.with_state(debug_objects_state);
let bedrock_admin_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/bedrock/config",
axum::routing::get(admin::bedrock_config),
)
.route(
"/_awsim/bedrock/backends/{name}/check",
axum::routing::get(admin::bedrock_backend_check),
)
.with_state(Arc::clone(&bedrock_swap));
let bedrock_defaults_router: axum::Router<()> = axum::Router::new().route(
"/_awsim/bedrock/defaults",
axum::routing::get(admin::bedrock_defaults),
);
let gateway_catalog_router: axum::Router<()> = axum::Router::new().route(
"/_awsim/gateway/catalog",
axum::routing::get(admin::gateway_catalog),
);
let gateway_health_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/gateway/health",
axum::routing::get(admin::gateway_health),
)
.with_state(bedrock_health.clone());
let gateway_health_check_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/gateway/health/{name}/check",
axum::routing::post(admin::gateway_health_check),
)
.with_state((Arc::clone(&bedrock_swap), bedrock_health.clone()));
let gateway_metrics_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/gateway/metrics",
axum::routing::get(admin::gateway_metrics),
)
.with_state(bedrock_metrics.clone());
let gateway_recent_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/gateway/recent",
axum::routing::get(admin::gateway_recent),
)
.with_state(bedrock_recent.clone());
let gateway_test_prompt_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/gateway/test-prompt",
axum::routing::post(admin::gateway_test_prompt),
)
.with_state(Arc::clone(&bedrock_swap));
let runtime_config_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/runtime-config",
axum::routing::get(admin::runtime_config_get).put(admin::runtime_config_put),
)
.route(
"/_awsim/runtime-config/defaults",
axum::routing::get(admin::runtime_config_defaults),
)
.with_state(Arc::clone(&runtime_config_store));
let snapshot_state = Arc::new(named_snapshots::SnapshotState {
app: state.clone(),
billing: Arc::clone(&billing_meter),
});
let snapshot_router: axum::Router<()> = axum::Router::new()
.route(
"/_awsim/snapshots",
axum::routing::get(named_snapshots::list),
)
.route(
"/_awsim/snapshots/{name}",
axum::routing::post(named_snapshots::save).delete(named_snapshots::delete),
)
.route(
"/_awsim/snapshots/{name}/load",
axum::routing::post(named_snapshots::load),
)
.with_state(snapshot_state);
let tls_admin_router: axum::Router<()> = axum::Router::new()
.route("/_awsim/tls", axum::routing::get(admin::tls_info))
.with_state(tls_admin_info.clone());
let app = cognito_oauth_router
.merge(mock_idp_router)
.merge(main_router)
.merge(proxy_router)
.merge(opensearch_nested)
.merge(ecr_router)
.merge(billing_router)
.merge(chaos_router)
.merge(snapshot_router)
.merge(ddb_admin_router)
.merge(sqlite_stats_router)
.merge(ses_admin_router)
.merge(debug_router)
.merge(bedrock_admin_router)
.merge(bedrock_defaults_router)
.merge(gateway_catalog_router)
.merge(gateway_health_router)
.merge(gateway_health_check_router)
.merge(gateway_metrics_router)
.merge(gateway_recent_router)
.merge(gateway_test_prompt_router)
.merge(runtime_config_router)
.merge(seed_router)
.merge(tls_admin_router)
.merge(ui::router())
.layer(axum::middleware::from_fn(ui::root_redirect_middleware))
.layer(tower_http::decompression::RequestDecompressionLayer::new())
.layer(axum::extract::DefaultBodyLimit::max(cli.max_body_bytes))
.layer(
ServiceBuilder::new()
.layer(HandleErrorLayer::new(handle_overload_error))
.layer(LoadShedLayer::new())
.layer(ConcurrencyLimitLayer::new(cli.max_concurrent_requests)),
)
.layer(tower_http::cors::CorsLayer::permissive());
info!(
max_concurrent_requests = cli.max_concurrent_requests,
"Inflight-request cap enabled"
);
spawn_fd_pressure_watcher();
let listener = bind_dual_stack_tokio(cli.port).await?;
println!();
println!(" AWSim v{}", env!("CARGO_PKG_VERSION"));
println!(" Fully Offline AWS Dev Environment");
println!();
println!(" Endpoint: http://localhost:{}", cli.port);
if let Some(ref tls) = https_runtime {
let host = tls.assets.domain.as_deref().unwrap_or("localhost");
println!(" HTTPS: https://{}:{}", host, tls.port);
if tls.assets.public_trust {
println!(
" (publicly-trusted cert - no AWS_CA_BUNDLE / NODE_EXTRA_CA_CERTS needed)"
);
} else {
println!(
" export AWS_CA_BUNDLE={}",
tls.assets.cert_path.display()
);
if tls.assets.generated {
println!(" (self-signed cert generated; reused on subsequent boots)");
}
}
}
if ui::is_bundled() {
println!(" Admin UI: http://localhost:{}/_awsim/ui/", cli.port);
}
println!(" Region: {}", cli.region);
println!(" Account: {}", cli.account_id);
println!(" Services: {} registered", service_count);
if let Some(ref data_dir) = cli.data_dir {
println!(" Persist: {}", data_dir);
}
println!();
info!(
port = cli.port,
https_port = ?cli.https_port,
region = %cli.region,
account_id = %cli.account_id,
services = service_count,
"AWSim started"
);
match https_runtime {
Some(tls) => {
let http_app = app.clone();
let https_app = app.layer(axum::middleware::from_fn(mark_request_https));
let http_fut = async move {
axum::serve(listener, http_app)
.await
.context("HTTP listener failed")
};
let https_fut = async move {
axum_server::from_tcp_rustls(tls.std_listener, tls.assets.config)
.serve(https_app.into_make_service())
.await
.context("HTTPS listener failed")
};
tokio::try_join!(http_fut, https_fut)?;
}
None => {
axum::serve(listener, app).await?;
}
}
Ok(())
}
struct HttpsRuntime {
port: u16,
assets: tls::TlsAssets,
std_listener: std::net::TcpListener,
}
async fn prepare_https_runtime(cli: &Cli, https_port: u16) -> Result<HttpsRuntime> {
let source = match (cli.tls_cert.as_deref(), cli.tls_key.as_deref()) {
(Some(cert), Some(key)) => tls::CertSource::Byo { cert, key },
_ => {
let dir = cli
.tls_cache_dir
.clone()
.or_else(|| {
cli.data_dir
.as_deref()
.map(|d| std::path::PathBuf::from(d).join("tls"))
})
.unwrap_or_else(tls::default_cache_dir);
#[cfg(has_bundled_cert)]
{
tls::CertSource::Bundled { dir }
}
#[cfg(not(has_bundled_cert))]
{
tls::CertSource::Managed { dir }
}
}
};
let assets = tls::load_or_generate(source).await?;
let std_listener = bind_dual_stack_std(https_port)
.with_context(|| format!("binding HTTPS listener on port {https_port}"))?;
Ok(HttpsRuntime {
port: https_port,
assets,
std_listener,
})
}
async fn mark_request_https(
mut req: axum::http::Request<axum::body::Body>,
next: axum::middleware::Next,
) -> axum::response::Response {
let authority = req.uri().authority().map(|a| a.to_string());
let headers = req.headers_mut();
if !headers.contains_key("x-forwarded-proto") {
headers.insert(
"x-forwarded-proto",
axum::http::HeaderValue::from_static("https"),
);
}
if !headers.contains_key("host")
&& let Some(a) = authority
&& let Ok(v) = axum::http::HeaderValue::from_str(&a)
{
headers.insert("host", v);
}
next.run(req).await
}
async fn bind_dual_stack_tokio(port: u16) -> Result<tokio::net::TcpListener> {
let addr_v6 = std::net::SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, port));
match tokio::net::TcpListener::bind(addr_v6).await {
Ok(l) => Ok(l),
Err(e) => {
warn!(port, error = %e, "Could not bind on [::] - falling back to 0.0.0.0");
let addr_v4 = std::net::SocketAddr::from(([0, 0, 0, 0], port));
Ok(tokio::net::TcpListener::bind(addr_v4).await?)
}
}
}
fn bind_dual_stack_std(port: u16) -> Result<std::net::TcpListener> {
let addr_v6 = std::net::SocketAddr::from((std::net::Ipv6Addr::UNSPECIFIED, port));
let listener = match std::net::TcpListener::bind(addr_v6) {
Ok(l) => l,
Err(e) => {
warn!(port, error = %e, "Could not bind on [::] - falling back to 0.0.0.0");
let addr_v4 = std::net::SocketAddr::from(([0, 0, 0, 0], port));
std::net::TcpListener::bind(addr_v4)?
}
};
listener.set_nonblocking(true)?;
Ok(listener)
}
fn cleanup_tempdir(label: &str, path: Option<&std::path::Path>) {
let Some(path) = path else { return };
if let Err(e) = std::fs::remove_dir_all(path) {
warn!(
service = label,
path = %path.display(),
error = %e,
"Failed to remove tempdir on shutdown"
);
}
}
async fn run_vacuum(endpoint: &str) -> Result<()> {
let client = reqwest::Client::builder()
.timeout(std::time::Duration::from_secs(120))
.build()?;
let url = format!(
"{}/_awsim/admin/dynamodb/vacuum",
endpoint.trim_end_matches('/')
);
let resp = client.post(&url).send().await?;
if !resp.status().is_success() {
let status = resp.status();
let text = resp.text().await.unwrap_or_default();
anyhow::bail!("HTTP {status}: {text}");
}
println!("DynamoDB VACUUM complete");
Ok(())
}
async fn handle_overload_error(err: BoxError) -> impl IntoResponse {
if err.is::<tower::load_shed::error::Overloaded>() {
warn!("Request rejected — concurrency limit reached");
(
StatusCode::SERVICE_UNAVAILABLE,
"AWSim is at the configured concurrent-request cap. \
Bound your client's parallelism or raise --max-concurrent-requests.",
)
.into_response()
} else {
(
StatusCode::INTERNAL_SERVER_ERROR,
format!("Unhandled middleware error: {err}"),
)
.into_response()
}
}
#[cfg(unix)]
fn spawn_fd_pressure_watcher() {
let pid = std::process::id();
let fd_dir = std::path::PathBuf::from(format!("/proc/{pid}/fd"));
if !fd_dir.exists() {
debug!("/proc/<pid>/fd not available — skipping fd-pressure watcher");
return;
}
let (_, hard) = match rlimit::getrlimit(rlimit::Resource::NOFILE) {
Ok(p) => p,
Err(_) => return,
};
let warn_at = (hard as f64 * 0.5) as u64;
let crit_at = (hard as f64 * 0.8) as u64;
tokio::spawn(async move {
let mut interval = tokio::time::interval(std::time::Duration::from_secs(30));
loop {
interval.tick().await;
let count = match std::fs::read_dir(&fd_dir) {
Ok(d) => d.count() as u64,
Err(_) => break, };
if count >= crit_at {
error!(
open_fds = count,
hard_limit = hard,
threshold_pct = 80,
"fd usage critical — listener will start dropping connections"
);
} else if count >= warn_at {
warn!(
open_fds = count,
hard_limit = hard,
threshold_pct = 50,
"fd usage elevated — check for client connection leaks"
);
} else {
debug!(open_fds = count, hard_limit = hard);
}
}
});
}
#[cfg(unix)]
fn raise_nofile_limit() {
const TARGET: u64 = 65_536;
let (soft, hard) = match rlimit::getrlimit(rlimit::Resource::NOFILE) {
Ok(pair) => pair,
Err(e) => {
warn!(error = %e, "Could not read NOFILE rlimit; skipping bump");
return;
}
};
let desired = TARGET.min(hard);
if soft >= desired {
return;
}
if let Err(e) = rlimit::setrlimit(rlimit::Resource::NOFILE, desired, hard) {
warn!(
from = soft,
to = desired,
hard = hard,
error = %e,
"Could not raise NOFILE rlimit; bulk imports may hit fd exhaustion",
);
return;
}
info!(
from = soft,
to = desired,
hard = hard,
"Raised NOFILE rlimit"
);
}
#[cfg(not(unix))]
fn spawn_fd_pressure_watcher() {}
#[cfg(not(unix))]
fn raise_nofile_limit() {}
fn spawn_event_router(state: &AppState) {
use std::sync::Arc;
let mut rx = state.event_bus.subscribe();
let services = Arc::clone(&state.services);
let default_region = state.default_region.clone();
let default_account_id = state.default_account_id.clone();
tokio::spawn(async move {
loop {
match rx.recv().await {
Ok(event) => {
let protocol = event.detail["protocol"].as_str().unwrap_or("").to_string();
let endpoint = event.detail["endpoint"].as_str().unwrap_or("").to_string();
let message = event.detail["message"].as_str().unwrap_or("").to_string();
let message_id = event.detail["message_id"]
.as_str()
.unwrap_or("")
.to_string();
let topic_arn = event.detail["topic_arn"].as_str().unwrap_or("").to_string();
let subject = event.detail["subject"].as_str().map(str::to_string);
let subscription_arn = event.detail["subscription_arn"]
.as_str()
.unwrap_or("")
.to_string();
let message_attributes = event.detail.get("message_attributes").cloned();
let raw_message_delivery = event.detail["raw_message_delivery"]
.as_bool()
.unwrap_or(false);
match event.event_type.as_str() {
"sns:Publish" if protocol == "sqs" => {
let queue_url =
arn_to_sqs_url(&endpoint, &default_region, &default_account_id);
if let Some(sqs_handler) = services.get("sqs") {
let ctx = RequestContext {
account_id: event.account_id.clone(),
region: event.region.clone(),
partition: awsim_core::DEFAULT_PARTITION.to_string(),
service: "sqs".to_string(),
access_key: None,
request_id: uuid::Uuid::new_v4().to_string(),
method: "POST".to_string(),
uri: "/".to_string(),
event_bus: None,
source_ip: None,
is_secure: false,
internal_bypass: false,
};
let mut input = if raw_message_delivery {
let mut sqs_attrs = serde_json::Map::new();
if let Some(attrs) = &message_attributes
&& let Some(map) = attrs.as_object()
{
for (k, v) in map {
let data_type = v["Type"].as_str().unwrap_or("String");
let string_value = v["Value"].as_str().unwrap_or("");
sqs_attrs.insert(
k.clone(),
serde_json::json!({
"DataType": data_type,
"StringValue": string_value,
}),
);
}
}
let mut input = serde_json::json!({
"QueueUrl": queue_url,
"MessageBody": message,
});
if !sqs_attrs.is_empty() {
input["MessageAttributes"] =
serde_json::Value::Object(sqs_attrs);
}
input
} else {
let timestamp = iso8601_now();
let unsubscribe_url = format!(
"http://sns.{region}.localhost/?Action=Unsubscribe&SubscriptionArn={sub}",
region = event.region,
sub = subscription_arn,
);
let signing_cert_url = format!(
"http://sns.{region}.localhost/SimpleNotificationService-awsim.pem",
region = event.region,
);
let mut envelope = serde_json::json!({
"Type": "Notification",
"MessageId": message_id,
"TopicArn": topic_arn,
"Message": message,
"Timestamp": timestamp,
"SignatureVersion": "1",
"Signature": "awsim-fixture-signature",
"SigningCertURL": signing_cert_url,
"UnsubscribeURL": unsubscribe_url,
});
if let Some(s) = &subject {
envelope["Subject"] = serde_json::Value::String(s.clone());
}
if let Some(attrs) = &message_attributes
&& attrs.as_object().is_some_and(|m| !m.is_empty())
{
envelope["MessageAttributes"] = attrs.clone();
}
serde_json::json!({
"QueueUrl": queue_url,
"MessageBody": envelope.to_string(),
})
};
let _ = &mut input;
match sqs_handler.handle("SendMessage", input, &ctx).await {
Ok(_) => {
info!(
topic = %topic_arn,
queue = %endpoint,
"SNS→SQS fan-out delivered"
);
}
Err(e) => {
warn!(
topic = %topic_arn,
queue = %endpoint,
error = %e.message,
"SNS→SQS fan-out delivery failed"
);
}
}
}
}
"sns:Publish" if protocol == "lambda" => {
info!(
topic = %topic_arn,
function = %endpoint,
"SNS→Lambda fan-out: not yet implemented"
);
}
"cloudformation:CreateResource" => {
integrations::handle_cf_create_resource(&services, &event).await;
}
"cloudformation:DeleteResource" => {
integrations::handle_cf_delete_resource(&services, &event).await;
}
"dynamodb:StreamRecord" => {
integrations::handle_dynamodb_stream(&services, &event).await;
}
"eventbridge:TargetInvocation" => {
integrations::handle_eventbridge_target(&services, &event).await;
}
"cognito:LambdaTrigger" => {
integrations::handle_cognito_trigger(&services, &event).await;
}
t if t.starts_with("s3:ObjectCreated:")
|| t.starts_with("s3:ObjectRemoved:") =>
{
integrations::handle_s3_event(&services, &event).await;
}
_ => {
}
}
}
Err(tokio::sync::broadcast::error::RecvError::Lagged(n)) => {
warn!(
skipped = n,
"Event bus receiver lagged; some events were dropped"
);
}
Err(tokio::sync::broadcast::error::RecvError::Closed) => {
break;
}
}
}
});
}
fn arm_operator_auth_if_required(state: &operator_auth::OperatorAuthState) {
let enabled = std::env::var("AWSIM_REQUIRE_OPERATOR_AUTH")
.map(|v| matches!(v.as_str(), "1" | "true" | "TRUE" | "yes" | "YES"))
.unwrap_or(false);
if !enabled {
return;
}
let iam_state = state
.iam
.store()
.get(&state.default_account_id, awsim_iam::IAM_REGION);
let root_exists = iam_state
.login_profiles
.contains_key(operator_auth::ROOT_USERNAME);
if root_exists {
state.mark_bootstrap_complete();
info!(
"AWSIM_REQUIRE_OPERATOR_AUTH is on; root login profile present, admin access requires login."
);
return;
}
let token = state.arm_bootstrap();
println!();
println!("===================================================================");
println!(" AWSim operator setup required");
println!("-------------------------------------------------------------------");
println!(" AWSIM_REQUIRE_OPERATOR_AUTH=true and no root login profile");
println!(" exists. Pick a root password and POST to /_awsim/auth/setup:");
println!();
println!(" curl -s -X POST http://localhost:4566/_awsim/auth/setup \\");
println!(" -H 'content-type: application/json' \\");
println!(
" -d '{{\"bootstrap_token\":\"{token}\",\"password\":\"<choose-a-strong-password>\"}}'"
);
println!();
println!(" The token above is shown once; copy it now. Until setup runs,");
println!(" all admin endpoints return 503 OperatorSetupRequired.");
println!("===================================================================");
println!();
}
fn iso8601_now() -> String {
use std::time::{SystemTime, UNIX_EPOCH};
let secs = SystemTime::now()
.duration_since(UNIX_EPOCH)
.map(|d| d.as_secs())
.unwrap_or(0);
let s = secs % 60;
let mins = secs / 60;
let min = mins % 60;
let hours = mins / 60;
let h = hours % 24;
let mut days = hours / 24;
let mut y = 1970u64;
loop {
let leap = (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
let dy = if leap { 366 } else { 365 };
if days < dy {
break;
}
days -= dy;
y += 1;
}
let leap = (y.is_multiple_of(4) && !y.is_multiple_of(100)) || y.is_multiple_of(400);
let months = if leap {
[31u64, 29, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
} else {
[31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31]
};
let mut mo = 0usize;
while days >= months[mo] {
days -= months[mo];
mo += 1;
}
let d = days + 1;
format!("{y:04}-{:02}-{d:02}T{h:02}:{min:02}:{s:02}.000Z", mo + 1)
}
fn arn_to_sqs_url(arn: &str, default_region: &str, default_account: &str) -> String {
let parts: Vec<&str> = arn.splitn(6, ':').collect();
if parts.len() == 6 {
let region = parts[3];
let account = parts[4];
let queue = parts[5];
format!("http://sqs.{region}.localhost:4566/{account}/{queue}")
} else {
let queue = arn.rsplit(':').next().unwrap_or(arn);
format!("http://sqs.{default_region}.localhost:4566/{default_account}/{queue}")
}
}
struct IamTrustPolicyResolver(std::sync::Arc<awsim_iam::IamService>);
impl awsim_sts::TrustPolicyResolver for IamTrustPolicyResolver {
fn resolve(&self, account_id: &str, role_arn: &str) -> Option<String> {
self.0.lookup_role_trust_policy(account_id, role_arn)
}
}
type RegisteredServices = (
Arc<awsim_apigateway::ApiGatewayService>,
Arc<awsim_apigateway::ApiGatewayV1Service>,
Arc<awsim_cognito::CognitoState>,
Arc<awsim_iam::IamService>,
awsim_core::AccountRegionStore<awsim_iam::state::IamState>,
awsim_core::AccountRegionStore<awsim_s3::state::S3State>,
awsim_core::AccountRegionStore<awsim_kms::state::KmsState>,
awsim_core::AccountRegionStore<awsim_sqs::state::SqsState>,
awsim_core::AccountRegionStore<awsim_secretsmanager::state::SecretsState>,
awsim_core::AccountRegionStore<awsim_lambda::state::LambdaState>,
awsim_core::AccountRegionStore<awsim_organizations::state::OrganizationsState>,
Arc<awsim_ecr::EcrService>,
Arc<awsim_s3::S3Service>,
Arc<awsim_lambda::LambdaService>,
Arc<awsim_sqs::SqsService>,
Arc<awsim_cloudwatch_logs::CloudWatchLogsService>,
awsim_core::AccountRegionStore<awsim_pipes::PipesState>,
Arc<awsim_ec2::Ec2Service>,
Arc<awsim_rds::RdsService>,
Arc<awsim_mq::MqService>,
Arc<awsim_memorydb::MemoryDbService>,
Arc<awsim_dynamodb::DynamoDbService>,
Arc<awsim_cloudwatch_metrics::CloudWatchMetricsService>,
Arc<awsim_kinesis::KinesisService>,
Arc<awsim_ses::SesService>,
Arc<awsim_sts::StsSessionStore>,
awsim_core::AccountRegionStore<awsim_sns::state::SnsState>,
awsim_core::AccountRegionStore<awsim_ecr::state::EcrState>,
);
fn build_runtime_config_store(cli: &Cli) -> Result<Arc<runtime_config::RuntimeConfigStore>> {
let seed = build_runtime_config_seed(cli)?;
let path = cli
.data_dir
.as_deref()
.map(|d| std::path::PathBuf::from(d).join(runtime_config::CONFIG_FILENAME));
let store = runtime_config::RuntimeConfigStore::load_or_seed(seed, path)
.context("loading runtime config")?;
Ok(Arc::new(store))
}
fn build_runtime_config_seed(cli: &Cli) -> Result<runtime_config::RuntimeConfig> {
let bedrock_spec = if let Some(path) = cli.bedrock_config.as_deref() {
let raw =
std::fs::read_to_string(path).with_context(|| format!("reading {}", path.display()))?;
toml::from_str::<awsim_bedrock::BedrockSpec>(&raw)
.with_context(|| format!("parsing bedrock config {}", path.display()))?
} else if let Some(endpoint) = cli.bedrock_backend.as_deref() {
let mut backends = std::collections::HashMap::new();
backends.insert(
"default".to_string(),
awsim_bedrock::BackendSpec {
endpoint: endpoint.to_string(),
provider: None,
credential: None,
api_key: cli.bedrock_api_key.clone(),
api_key_env: None,
},
);
let (invoke, embed) = match cli.bedrock_model_map.as_deref() {
Some(p) => {
let raw = std::fs::read_to_string(p)
.with_context(|| format!("reading {}", p.display()))?;
let parsed: ModelMapToml = toml::from_str(&raw)
.with_context(|| format!("parsing model map {}", p.display()))?;
(parsed.invoke, parsed.embed)
}
None => (Default::default(), Default::default()),
};
awsim_bedrock::BedrockSpec {
default_backend: Some("default".into()),
credentials: Default::default(),
backends,
aliases: Default::default(),
invoke,
embed,
pricing: Default::default(),
}
} else {
awsim_bedrock::BedrockSpec::default()
};
let bedrock_enabled = !bedrock_spec.backends.is_empty();
Ok(runtime_config::RuntimeConfig {
bedrock: runtime_config::BedrockSection {
enabled: bedrock_enabled,
spec: bedrock_spec,
},
ses: runtime_config::SesSection {
retention_hours: cli.ses_retention_hours,
},
iam: runtime_config::IamSection {
enforce: std::env::var("AWSIM_IAM_ENFORCE").ok().as_deref() == Some("true"),
},
logging: runtime_config::LoggingSection {
level: cli.log_level.clone(),
},
})
}
#[derive(serde::Deserialize, Default)]
struct ModelMapToml {
#[serde(default)]
invoke: std::collections::HashMap<String, awsim_bedrock::ModelEntry>,
#[serde(default)]
embed: std::collections::HashMap<String, awsim_bedrock::ModelEntry>,
}
fn build_bedrock_backend_from_config(
cfg: &runtime_config::RuntimeConfig,
health: &awsim_bedrock::HealthRegistry,
metrics: &awsim_bedrock::MetricsRegistry,
recent: &awsim_bedrock::RecentInvocations,
) -> Result<Option<awsim_bedrock::BedrockBackends>> {
if !cfg.bedrock.enabled || cfg.bedrock.spec.backends.is_empty() {
return Ok(None);
}
let backends =
awsim_bedrock::build_from_spec(cfg.bedrock.spec.clone(), |v| std::env::var(v).ok())
.context("building bedrock backends from runtime config")?
.with_health(health.clone())
.with_metrics(metrics.clone(), recent.clone());
info!(
backends = ?backends.backend_names(),
default = ?backends.default_name(),
"Bedrock proxy backend enabled"
);
Ok(Some(backends))
}
#[allow(clippy::too_many_arguments)]
fn register_services(
state: &mut AppState,
default_account_id: &str,
default_region: &str,
data_dir: Option<&str>,
port: u16,
max_blob_bytes: Option<u64>,
ddb_wal_checkpoint_secs: Option<u64>,
ddb_ttl_grace_secs: u64,
bedrock_swap: awsim_bedrock::BedrockBackendsSwap,
) -> RegisteredServices {
use std::sync::Arc;
let iam = Arc::new(awsim_iam::IamService::new());
let iam_store = iam.store();
let iam_service_clone = Arc::clone(&iam);
state.register(iam, vec![]);
let sts_sessions = Arc::new(awsim_sts::StsSessionStore::new());
let sts = Arc::new(awsim_sts::StsService::with_session_store(Arc::clone(
&sts_sessions,
)));
sts.set_trust_policy_resolver(Arc::new(IamTrustPolicyResolver(Arc::clone(
&iam_service_clone,
))));
state.register(sts, vec![]);
let kms = Arc::new(awsim_kms::KmsService::new());
let kms_store = kms.store();
let sns_kms_lookup: Arc<dyn awsim_core::KmsKeyLookup> =
Arc::new(awsim_kms::KmsKeyResolver::new(kms_store.clone()));
let sns = Arc::new(awsim_sns::SnsService::new().with_kms_lookup(sns_kms_lookup));
let sns_store = sns.store_handle();
state.register(sns, vec![]);
let sqs = match data_dir {
Some(dir) => {
let svc = awsim_sqs::SqsService::with_data_dir(dir);
match max_blob_bytes {
Some(n) => svc.with_max_blob_bytes(n),
None => svc,
}
}
None => awsim_sqs::SqsService::new(),
};
let sqs_store = sqs.store();
let sqs_arc = Arc::new(sqs);
let sqs_clone = Arc::clone(&sqs_arc);
state.register(sqs_arc, vec![]);
let dynamodb = Arc::new(match data_dir {
Some(dir) => awsim_dynamodb::DynamoDbService::with_data_dir(dir),
None => awsim_dynamodb::DynamoDbService::new(),
});
dynamodb.spawn_ttl_sweeper(60, ddb_ttl_grace_secs);
match ddb_wal_checkpoint_secs {
Some(0) => info!("DynamoDB WAL checkpointer disabled"),
secs => {
let interval = secs.unwrap_or(60);
info!(
interval_secs = interval,
"DynamoDB WAL checkpointer enabled"
);
dynamodb.spawn_wal_checkpointer(interval);
}
}
let dynamodb_clone = Arc::clone(&dynamodb);
state.register(dynamodb, vec![]);
let s3 = match data_dir {
Some(dir) => {
let svc = awsim_s3::S3Service::with_data_dir(dir);
match max_blob_bytes {
Some(n) => svc.with_max_blob_bytes(n),
None => svc,
}
}
None => awsim_s3::S3Service::new(),
};
let s3_store = s3.store();
let s3_routes = {
use awsim_core::ServiceHandler;
s3.routes()
};
let s3_arc = Arc::new(s3);
let s3_clone = Arc::clone(&s3_arc);
state.register(s3_arc, s3_routes);
let lambda = match data_dir {
Some(dir) => {
let svc = awsim_lambda::LambdaService::with_data_dir(dir);
match max_blob_bytes {
Some(n) => svc.with_max_blob_bytes(n),
None => svc,
}
}
None => awsim_lambda::LambdaService::new(),
};
let lambda_store = lambda.store();
let lambda_routes = {
use awsim_core::ServiceHandler;
lambda.routes()
};
let lambda_arc = Arc::new(lambda);
let lambda_clone = Arc::clone(&lambda_arc);
state.register(lambda_arc, lambda_routes);
let logs = match data_dir {
Some(dir) => {
let svc = awsim_cloudwatch_logs::CloudWatchLogsService::with_data_dir(dir);
match max_blob_bytes {
Some(n) => svc.with_max_blob_bytes(n),
None => svc,
}
}
None => awsim_cloudwatch_logs::CloudWatchLogsService::new(),
};
let logs_arc = Arc::new(logs);
let logs_clone = Arc::clone(&logs_arc);
state.register(logs_arc, vec![]);
let eb_iam_lookup: Arc<dyn awsim_core::PrincipalLookup> =
Arc::new(awsim_iam::authz::IamPrincipalLookup::new(iam_store.clone()));
let eventbridge =
Arc::new(awsim_eventbridge::EventBridgeService::new().with_iam_lookup(eb_iam_lookup));
state.register(eventbridge, vec![]);
state.register(kms, vec![]);
let lambda_invoker: Arc<dyn awsim_core::LambdaInvoker> = Arc::new(
awsim_lambda::LambdaServiceInvoker::new(lambda_store.clone()),
);
let secretsmanager = Arc::new(
awsim_secretsmanager::SecretsManagerService::new().with_lambda_invoker(lambda_invoker),
);
let secrets_store = secretsmanager.store();
state.register(secretsmanager, vec![]);
let ssm = Arc::new(awsim_ssm::SsmService::new());
let ssm_store = ssm.store();
state.register(ssm, vec![]);
let stepfunctions = Arc::new(awsim_stepfunctions::StepFunctionsService::new());
state.register(stepfunctions, vec![]);
let kinesis = Arc::new(match data_dir {
Some(dir) => awsim_kinesis::KinesisService::with_data_dir(dir),
None => awsim_kinesis::KinesisService::new(),
});
let kinesis_clone = Arc::clone(&kinesis);
state.register(kinesis, vec![]);
let ses_service = Arc::new(match data_dir {
Some(dir) => awsim_ses::SesService::with_data_dir(dir),
None => awsim_ses::SesService::new(),
});
let ses_routes = {
use awsim_core::ServiceHandler;
ses_service.routes()
};
state.register(Arc::clone(&ses_service) as _, ses_routes);
let cognito = Arc::new(awsim_cognito::CognitoService::new());
let cognito_arc_state = cognito.state_for(default_account_id, default_region);
state.register(cognito, vec![]);
let cognito_identity = Arc::new(awsim_cognito::CognitoIdentityService::with_session_store(
Arc::clone(&sts_sessions),
));
state.register(cognito_identity, vec![]);
let ecr = match data_dir {
Some(dir) => {
let svc = awsim_ecr::EcrService::with_data_dir(dir);
match max_blob_bytes {
Some(n) => svc.with_max_blob_bytes(n),
None => svc,
}
}
None => awsim_ecr::EcrService::new(),
};
let ecr = Arc::new(ecr.with_port(port));
let ecr_clone = Arc::clone(&ecr);
let ecr_store = ecr.store();
state.register(ecr, vec![]);
let servicediscovery = Arc::new(awsim_servicediscovery::ServiceDiscoveryService::new());
let servicediscovery_store = servicediscovery.store();
let ecs_iam_lookup: Arc<dyn awsim_core::PrincipalLookup> =
Arc::new(awsim_iam::authz::IamPrincipalLookup::new(iam_store.clone()));
let ecs_secrets_lookup: Arc<dyn awsim_core::SecretLookup> = Arc::new(
awsim_secretsmanager::SecretsManagerSecretLookup::new(secrets_store.clone()),
);
let ecs_parameters_lookup: Arc<dyn awsim_core::ParameterLookup> =
Arc::new(awsim_ssm::SsmParameterLookup::new(ssm_store.clone()));
let ecs_cloudmap_registrar: Arc<dyn awsim_core::CloudMapRegistrar> = Arc::new(
awsim_servicediscovery::CloudMapServiceRegistrar::new(servicediscovery_store),
);
let ecs = Arc::new(
awsim_ecs::EcsService::new()
.with_iam_lookup(ecs_iam_lookup)
.with_secrets_lookup(ecs_secrets_lookup)
.with_parameters_lookup(ecs_parameters_lookup)
.with_cloudmap_registrar(ecs_cloudmap_registrar),
);
state.register(ecs, vec![]);
let ec2 = Arc::new(awsim_ec2::Ec2Service::new());
let ec2_clone = Arc::clone(&ec2);
state.register(ec2, vec![]);
let rds = Arc::new(awsim_rds::RdsService::new());
let rds_clone = Arc::clone(&rds);
state.register(rds, vec![]);
let appsync = awsim_appsync::AppSyncService::new();
let appsync_routes = {
use awsim_core::ServiceHandler;
appsync.routes()
};
state.register(Arc::new(appsync), appsync_routes);
let bedrock = awsim_bedrock::BedrockService::new();
let bedrock_routes = {
use awsim_core::ServiceHandler;
bedrock.routes()
};
state.register(Arc::new(bedrock), bedrock_routes);
let bedrock_runtime = awsim_bedrock::BedrockRuntimeService::with_swap(bedrock_swap);
let bedrock_runtime_routes = {
use awsim_core::ServiceHandler;
bedrock_runtime.routes()
};
state.register(Arc::new(bedrock_runtime), bedrock_runtime_routes);
let cloudformation = Arc::new(awsim_cloudformation::CloudFormationService::new());
state.register(cloudformation, vec![]);
let route53 = awsim_route53::Route53Service::new();
let route53_routes = {
use awsim_core::ServiceHandler;
route53.routes()
};
state.register(Arc::new(route53), route53_routes);
let cloudwatch_metrics = Arc::new(match data_dir {
Some(dir) => awsim_cloudwatch_metrics::CloudWatchMetricsService::with_data_dir(dir),
None => awsim_cloudwatch_metrics::CloudWatchMetricsService::new(),
});
let cloudwatch_metrics_clone = Arc::clone(&cloudwatch_metrics);
state.register(cloudwatch_metrics, vec![]);
let athena = Arc::new(awsim_athena::AthenaService::new());
state.register(athena, vec![]);
let glue = Arc::new(awsim_glue::GlueService::new());
state.register(glue, vec![]);
let elb = Arc::new(awsim_elb::ElbService::new());
state.register(elb, vec![]);
let cloudfront = awsim_cloudfront::CloudFrontService::new();
let cloudfront_routes = {
use awsim_core::ServiceHandler;
cloudfront.routes()
};
state.register(Arc::new(cloudfront), cloudfront_routes);
let acm = Arc::new(awsim_acm::AcmService::new());
state.register(acm, vec![]);
let waf = Arc::new(awsim_waf::WafService::new());
state.register(waf, vec![]);
let scheduler = awsim_scheduler::SchedulerService::new();
let scheduler_routes = {
use awsim_core::ServiceHandler;
scheduler.routes()
};
state.register(Arc::new(scheduler), scheduler_routes);
let comprehend = Arc::new(awsim_comprehend::ComprehendService::new());
state.register(comprehend, vec![]);
let kendra = Arc::new(awsim_kendra::KendraService::new());
state.register(kendra, vec![]);
let organizations = Arc::new(awsim_organizations::OrganizationsService::new());
let organizations_store = organizations.store();
state.register(organizations, vec![]);
let cloudtrail = Arc::new(awsim_cloudtrail::CloudTrailService::new());
state.register(cloudtrail, vec![]);
let eks = awsim_eks::EksService::new();
let eks_routes = {
use awsim_core::ServiceHandler;
eks.routes()
};
state.register(Arc::new(eks), eks_routes);
let firehose = Arc::new(awsim_firehose::FirehoseService::new());
state.register(firehose, vec![]);
let batch = awsim_batch::BatchService::new();
let batch_routes = {
use awsim_core::ServiceHandler;
batch.routes()
};
state.register(Arc::new(batch), batch_routes);
let sso_admin = Arc::new(awsim_sso_admin::SsoAdminService::new());
state.register(sso_admin, vec![]);
let datasync = Arc::new(awsim_datasync::DataSyncService::new());
state.register(datasync, vec![]);
let polly = awsim_polly::PollyService::new();
let polly_routes = {
use awsim_core::ServiceHandler;
polly.routes()
};
state.register(Arc::new(polly), polly_routes);
let resourcegroupstagging =
Arc::new(awsim_resourcegroupstagging::ResourceGroupsTaggingService::new());
state.register(resourcegroupstagging, vec![]);
let pipes = awsim_pipes::PipesService::new();
let pipes_store = pipes.store();
let pipes_routes = {
use awsim_core::ServiceHandler;
pipes.routes()
};
state.register(Arc::new(pipes), pipes_routes);
let efs = awsim_efs::EfsService::new();
let efs_routes = {
use awsim_core::ServiceHandler;
efs.routes()
};
state.register(Arc::new(efs), efs_routes);
let backup = awsim_backup::BackupService::new();
let backup_routes = {
use awsim_core::ServiceHandler;
backup.routes()
};
state.register(Arc::new(backup), backup_routes);
let app_autoscaling = Arc::new(awsim_application_autoscaling::AppAutoScalingService::new());
state.register(app_autoscaling, vec![]);
let xray = awsim_xray::XrayService::new();
let xray_routes = {
use awsim_core::ServiceHandler;
xray.routes()
};
state.register(Arc::new(xray), xray_routes);
state.register(servicediscovery, vec![]);
let appconfig = awsim_appconfig::AppConfig::new();
let appconfig_routes = {
use awsim_core::ServiceHandler;
appconfig.routes()
};
state.register(Arc::new(appconfig), appconfig_routes);
let glacier = awsim_glacier::GlacierService::new();
let glacier_routes = {
use awsim_core::ServiceHandler;
glacier.routes()
};
state.register(Arc::new(glacier), glacier_routes);
let mq = awsim_mq::MqService::new();
let mq_routes = {
use awsim_core::ServiceHandler;
mq.routes()
};
let mq_arc = Arc::new(mq);
let mq_clone = Arc::clone(&mq_arc);
state.register(mq_arc, mq_routes);
let memorydb = Arc::new(awsim_memorydb::MemoryDbService::new());
let memorydb_clone = Arc::clone(&memorydb);
state.register(memorydb, vec![]);
let qldb = awsim_qldb::QldbService::new();
let qldb_routes = {
use awsim_core::ServiceHandler;
qldb.routes()
};
state.register(Arc::new(qldb), qldb_routes);
let transfer = Arc::new(awsim_transfer::TransferService::new());
state.register(transfer, vec![]);
let pinpoint = awsim_pinpoint::PinpointService::new();
let pinpoint_routes = {
use awsim_core::ServiceHandler;
pinpoint.routes()
};
state.register(Arc::new(pinpoint), pinpoint_routes);
let identitystore = Arc::new(awsim_identitystore::IdentityStoreService::new());
state.register(identitystore, vec![]);
let apigateway = Arc::new(awsim_apigateway::ApiGatewayService::new());
let apigw_routes = {
use awsim_core::ServiceHandler;
apigateway.routes()
};
let apigw_clone = Arc::clone(&apigateway);
state.register(apigateway, apigw_routes);
let apigw_v1 = Arc::new(awsim_apigateway::ApiGatewayV1Service::new());
let apigw_v1_routes = {
use awsim_core::ServiceHandler;
apigw_v1.routes()
};
let apigw_v1_clone = Arc::clone(&apigw_v1);
state.register(apigw_v1, apigw_v1_routes);
(
apigw_clone,
apigw_v1_clone,
cognito_arc_state,
iam_service_clone,
iam_store,
s3_store,
kms_store,
sqs_store,
secrets_store,
lambda_store,
organizations_store,
ecr_clone,
s3_clone,
lambda_clone,
sqs_clone,
logs_clone,
pipes_store,
ec2_clone,
rds_clone,
mq_clone,
memorydb_clone,
dynamodb_clone,
cloudwatch_metrics_clone,
kinesis_clone,
ses_service,
sts_sessions,
sns_store,
ecr_store,
)
}
fn run_gc(
s3: &awsim_s3::S3Service,
lambda: &awsim_lambda::LambdaService,
sqs: &awsim_sqs::SqsService,
ecr: &awsim_ecr::EcrService,
) {
gc_one("s3", s3.body_store(), awsim_s3::S3Service::GROUPS, s3);
gc_one(
"lambda",
lambda.body_store(),
awsim_lambda::LambdaService::GROUPS,
lambda,
);
gc_one("sqs", sqs.body_store(), awsim_sqs::SqsService::GROUPS, sqs);
gc_one("ecr", ecr.body_store(), awsim_ecr::EcrService::GROUPS, ecr);
}
fn gc_one(
service: &str,
body_store: Option<&Arc<BodyStore>>,
groups: &[&str],
inventory: &dyn BlobInventory,
) {
let Some(bs) = body_store else {
return;
};
let known: std::collections::HashSet<(String, String, String)> =
inventory.known_blobs().into_iter().collect();
match bs.gc_orphaned(groups, &known) {
Ok((deleted, freed_bytes)) => {
if deleted > 0 {
info!(
service,
deleted, freed_bytes, "BodyStore GC reclaimed orphaned blobs"
);
} else {
info!(service, "BodyStore GC found no orphans");
}
}
Err(e) => {
warn!(service, error = %e, "BodyStore GC failed");
}
}
}