use std::sync::Arc;
use axum::extract::Extension;
use axum::Router;
use clap::Parser;
use md5::Digest;
use tokio::net::TcpListener;
use tower_http::trace::TraceLayer;
use fakecloud_core::delivery::DeliveryBus;
use fakecloud_core::dispatch::{self, DispatchConfig};
use fakecloud_core::registry::ServiceRegistry;
use fakecloud_sdk::types;
mod cli;
mod dynamodb_streams_lambda_poller;
mod introspection;
mod kinesis_lambda_poller;
mod lambda_delivery;
mod reset;
mod sqs_lambda_poller;
mod stepfunctions_delivery;
use cli::Cli;
use dynamodb_streams_lambda_poller::DynamoDbStreamsLambdaPoller;
use introspection::{
elasticache_cluster_response, elasticache_replication_group_response,
elasticache_serverless_cache_response, rds_instance_response,
};
use kinesis_lambda_poller::KinesisLambdaPoller;
use reset::ResetState;
use sqs_lambda_poller::SqsLambdaPoller;
use fakecloud_apigatewayv2::service::ApiGatewayV2Service;
use fakecloud_bedrock::service::BedrockService;
use fakecloud_cloudformation::service::CloudFormationService;
use fakecloud_cognito::service::CognitoService;
use fakecloud_dynamodb::service::DynamoDbService;
use fakecloud_elasticache::service::ElastiCacheService;
use fakecloud_eventbridge::service::EventBridgeService;
use fakecloud_iam::iam_service::IamService;
use fakecloud_iam::sts_service::StsService;
use fakecloud_kinesis::service::KinesisService;
use fakecloud_kms::service::KmsService;
use fakecloud_lambda::service::LambdaService;
use fakecloud_logs::service::LogsService;
use fakecloud_rds::service::RdsService;
use fakecloud_s3::service::S3Service;
use fakecloud_secretsmanager::service::SecretsManagerService;
use fakecloud_ses::service::SesV2Service;
use fakecloud_sns::service::SnsService;
use fakecloud_sqs::service::SqsService;
use fakecloud_ssm::service::SsmService;
use fakecloud_stepfunctions::service::StepFunctionsService;
#[tokio::main]
async fn main() {
let cli = Cli::parse();
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_new(&cli.log_level)
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.with_writer(std::io::stderr)
.init();
// Resolve the persistence settings from the parsed CLI arguments. An invalid
// configuration is reported through `fatal_exit` (defined elsewhere; it is used
// here in expression position, so it presumably never returns — confirm).
let persistence_config = match cli.persistence_config() {
Ok(cfg) => cfg,
Err(err) => fatal_exit(format_args!("invalid persistence configuration: {err}")),
};
// Persistent mode only: prepare the on-disk data directory up front, before
// any service state is constructed. Nothing happens here when `data_path` is
// `None`.
if persistence_config.mode == fakecloud_persistence::StorageMode::Persistent {
if let Some(ref data_path) = persistence_config.data_path {
// Create the directory together with any missing parents.
if let Err(err) = std::fs::create_dir_all(data_path) {
fatal_exit(format_args!(
"failed to create persistence data directory {}: {err}",
data_path.display()
));
}
// Check the version file inside the data directory against this build's
// crate version (the error text names it `fakecloud.version.toml`).
// NOTE(review): whether this creates the file on first run or only
// validates an existing one is decided inside `ensure_version_file`.
if let Err(err) = fakecloud_persistence::version::ensure_version_file(
data_path,
env!("CARGO_PKG_VERSION"),
) {
fatal_exit(format_args!(
"persistence version file check failed at {}/fakecloud.version.toml: {err}",
data_path.display()
));
}
}
}
let endpoint_url = cli.endpoint_url();
let iam_state = Arc::new(parking_lot::RwLock::new(
fakecloud_iam::state::IamState::new(&cli.account_id),
));
let sqs_state = Arc::new(parking_lot::RwLock::new(
fakecloud_sqs::state::SqsState::new(&cli.account_id, &cli.region, &endpoint_url),
));
let sns_state = Arc::new(parking_lot::RwLock::new({
let mut s =
fakecloud_sns::state::SnsState::new(&cli.account_id, &cli.region, &endpoint_url);
s.seed_default_opted_out();
s
}));
let eb_state = Arc::new(parking_lot::RwLock::new(
fakecloud_eventbridge::state::EventBridgeState::new(&cli.account_id, &cli.region),
));
let ssm_state = Arc::new(parking_lot::RwLock::new(
fakecloud_ssm::state::SsmState::new(&cli.account_id, &cli.region),
));
let dynamodb_state = Arc::new(parking_lot::RwLock::new(
fakecloud_dynamodb::state::DynamoDbState::new(&cli.account_id, &cli.region),
));
let lambda_state = Arc::new(parking_lot::RwLock::new(
fakecloud_lambda::state::LambdaState::new(&cli.account_id, &cli.region),
));
let container_runtime = fakecloud_lambda::runtime::ContainerRuntime::new().map(Arc::new);
if let Some(ref rt) = container_runtime {
tracing::info!(
cli = rt.cli_name(),
"Lambda execution enabled via container runtime"
);
} else {
tracing::info!("Docker/Podman not available — Lambda Invoke will return errors for functions with code");
}
let secretsmanager_state = Arc::new(parking_lot::RwLock::new(
fakecloud_secretsmanager::state::SecretsManagerState::new(&cli.account_id, &cli.region),
));
let s3_state = Arc::new(parking_lot::RwLock::new(fakecloud_s3::state::S3State::new(
&cli.account_id,
&cli.region,
)));
let logs_state = Arc::new(parking_lot::RwLock::new(
fakecloud_logs::state::LogsState::new(&cli.account_id, &cli.region),
));
let kms_state = Arc::new(parking_lot::RwLock::new(
fakecloud_kms::state::KmsState::new(&cli.account_id, &cli.region),
));
let cloudformation_state = Arc::new(parking_lot::RwLock::new(
fakecloud_cloudformation::state::CloudFormationState::new(&cli.account_id, &cli.region),
));
let ses_state = Arc::new(parking_lot::RwLock::new(
fakecloud_ses::state::SesState::new(&cli.account_id, &cli.region),
));
let cognito_state = Arc::new(parking_lot::RwLock::new(
fakecloud_cognito::state::CognitoState::new(&cli.account_id, &cli.region),
));
let kinesis_state = Arc::new(parking_lot::RwLock::new(
fakecloud_kinesis::state::KinesisState::new(&cli.account_id, &cli.region),
));
let rds_state = Arc::new(parking_lot::RwLock::new(
fakecloud_rds::state::RdsState::new(&cli.account_id, &cli.region),
));
let elasticache_state = Arc::new(parking_lot::RwLock::new(
fakecloud_elasticache::state::ElastiCacheState::new(&cli.account_id, &cli.region),
));
let stepfunctions_state = Arc::new(parking_lot::RwLock::new(
fakecloud_stepfunctions::state::StepFunctionsState::new(&cli.account_id, &cli.region),
));
let apigatewayv2_state = Arc::new(parking_lot::RwLock::new(
fakecloud_apigatewayv2::state::ApiGatewayV2State::new(&cli.account_id, &cli.region),
));
let bedrock_state = Arc::new(parking_lot::RwLock::new(
fakecloud_bedrock::state::BedrockState::new(&cli.account_id, &cli.region),
));
let rds_runtime = fakecloud_rds::runtime::RdsRuntime::new().map(Arc::new);
if let Some(ref rt) = rds_runtime {
tracing::info!(
cli = rt.cli_name(),
"RDS execution enabled via container runtime"
);
} else {
tracing::info!("Docker/Podman not available — RDS CreateDBInstance will return errors");
}
let elasticache_runtime =
fakecloud_elasticache::runtime::ElastiCacheRuntime::new().map(Arc::new);
if let Some(ref rt) = elasticache_runtime {
tracing::info!(
cli = rt.cli_name(),
"ElastiCache execution enabled via container runtime"
);
} else {
tracing::info!(
"Docker/Podman not available — ElastiCache CreateReplicationGroup will return errors"
);
}
let sqs_delivery = Arc::new(fakecloud_sqs::delivery::SqsDeliveryImpl::new(
sqs_state.clone(),
));
let lambda_delivery: Option<Arc<dyn fakecloud_core::delivery::LambdaDelivery>> =
container_runtime.as_ref().map(|rt| {
Arc::new(lambda_delivery::LambdaDeliveryImpl::new(
lambda_state.clone(),
rt.clone(),
)) as Arc<dyn fakecloud_core::delivery::LambdaDelivery>
});
let delivery_for_sns = {
let mut bus = DeliveryBus::new().with_sqs(sqs_delivery.clone());
if let Some(ref ld) = lambda_delivery {
bus = bus.with_lambda(ld.clone());
}
Arc::new(bus)
};
let sns_delivery = Arc::new(fakecloud_sns::delivery::SnsDeliveryImpl::new(
sns_state.clone(),
delivery_for_sns.clone(),
));
let kinesis_delivery_for_eb =
fakecloud_kinesis::delivery::KinesisDeliveryImpl::new(kinesis_state.clone());
let sfn_delivery_for_eb: Arc<dyn fakecloud_core::delivery::StepFunctionsDelivery> = {
let mut sns_fanout_for_sfn = DeliveryBus::new().with_sqs(sqs_delivery.clone());
if let Some(ref ld) = lambda_delivery {
sns_fanout_for_sfn = sns_fanout_for_sfn.with_lambda(ld.clone());
}
let sns_for_sfn_delivery = Arc::new(fakecloud_sns::delivery::SnsDeliveryImpl::new(
sns_state.clone(),
Arc::new(sns_fanout_for_sfn),
));
let eb_for_sfn_delivery = Arc::new(
fakecloud_eventbridge::delivery::EventBridgeDeliveryImpl::new(
eb_state.clone(),
Arc::new(DeliveryBus::new().with_sqs(sqs_delivery.clone())),
),
);
let mut sfn_interpreter_bus = DeliveryBus::new()
.with_sqs(sqs_delivery.clone())
.with_sns(sns_for_sfn_delivery)
.with_eventbridge(eb_for_sfn_delivery);
if let Some(ref ld) = lambda_delivery {
sfn_interpreter_bus = sfn_interpreter_bus.with_lambda(ld.clone());
}
Arc::new(stepfunctions_delivery::StepFunctionsDeliveryImpl::new(
stepfunctions_state.clone(),
Some(Arc::new(sfn_interpreter_bus)),
Some(dynamodb_state.clone()),
))
};
let delivery_for_eb = Arc::new(
DeliveryBus::new()
.with_sqs(sqs_delivery.clone())
.with_sns(sns_delivery.clone())
.with_kinesis(kinesis_delivery_for_eb)
.with_stepfunctions(sfn_delivery_for_eb),
);
let sns_delivery_for_ses = sns_delivery.clone();
let sns_delivery_for_cf = sns_delivery.clone();
let eb_delivery_for_s3 = Arc::new(
fakecloud_eventbridge::delivery::EventBridgeDeliveryImpl::new(
eb_state.clone(),
Arc::new(DeliveryBus::new().with_sqs(sqs_delivery.clone())),
),
);
let delivery_for_s3 = {
let mut bus = DeliveryBus::new()
.with_sqs(sqs_delivery.clone())
.with_sns(sns_delivery)
.with_eventbridge(eb_delivery_for_s3);
if let Some(ref ld) = lambda_delivery {
bus = bus.with_lambda(ld.clone());
}
Arc::new(bus)
};
let sqs_delivery_for_ses = sqs_delivery.clone();
let kinesis_delivery =
fakecloud_kinesis::delivery::KinesisDeliveryImpl::new(kinesis_state.clone());
let kinesis_delivery_for_dynamodb =
fakecloud_kinesis::delivery::KinesisDeliveryImpl::new(kinesis_state.clone());
let mut delivery_for_logs = DeliveryBus::new()
.with_sqs(sqs_delivery.clone())
.with_kinesis(kinesis_delivery);
if let Some(ref ld) = lambda_delivery {
delivery_for_logs = delivery_for_logs.with_lambda(ld.clone());
}
let delivery_for_logs = Arc::new(delivery_for_logs);
let delivery_for_dynamodb =
Arc::new(DeliveryBus::new().with_kinesis(kinesis_delivery_for_dynamodb));
let lambda_invocations_state = lambda_state.clone();
let ses_emails_state = ses_state.clone();
let ses_inbound_state = ses_state.clone();
let sns_introspection_state = sns_state.clone();
let sqs_introspection_state = sqs_state.clone();
let eb_introspection_state = eb_state.clone();
let s3_introspection_state = s3_state.clone();
let rds_introspection_state = rds_state.clone();
let elasticache_introspection_state = elasticache_state.clone();
let dynamodb_ttl_state = dynamodb_state.clone();
let secretsmanager_rotation_state = secretsmanager_state.clone();
let sqs_sim_expiration_state = sqs_state.clone();
let sqs_sim_force_dlq_state = sqs_state.clone();
let eb_sim_state = eb_state.clone();
let eb_sim_delivery = delivery_for_eb.clone();
let eb_sim_lambda_state = Some(lambda_state.clone());
let eb_sim_logs_state = Some(logs_state.clone());
let eb_sim_container_runtime = container_runtime.clone();
let s3_sim_lifecycle_state = s3_state.clone();
let lambda_sim_warm_state = lambda_state.clone();
let lambda_sim_warm_runtime = container_runtime.clone();
let lambda_sim_evict_runtime = container_runtime.clone();
let sns_sim_pending_state = sns_state.clone();
let sns_sim_confirm_state = sns_state.clone();
let cognito_codes_state = cognito_state.clone();
let cognito_confirm_state = cognito_state.clone();
let cognito_tokens_state = cognito_state.clone();
let cognito_expire_state = cognito_state.clone();
let cognito_events_state = cognito_state.clone();
let reset_state = ResetState {
iam: iam_state.clone(),
sqs: sqs_state.clone(),
sns: sns_state.clone(),
eb: eb_state.clone(),
ssm: ssm_state.clone(),
dynamodb: dynamodb_state.clone(),
lambda: lambda_state.clone(),
secretsmanager: secretsmanager_state.clone(),
s3: s3_state.clone(),
logs: logs_state.clone(),
kms: kms_state.clone(),
cloudformation: cloudformation_state.clone(),
ses: ses_state.clone(),
cognito: cognito_state.clone(),
kinesis: kinesis_state.clone(),
rds: rds_state.clone(),
elasticache: elasticache_state.clone(),
stepfunctions: stepfunctions_state.clone(),
apigatewayv2: apigatewayv2_state.clone(),
bedrock: bedrock_state.clone(),
container_runtime: container_runtime.clone(),
rds_runtime: rds_runtime.clone(),
elasticache_runtime: elasticache_runtime.clone(),
};
let delivery_for_cf = {
let mut bus = DeliveryBus::new().with_sns(sns_delivery_for_cf);
if let Some(ref ld) = lambda_delivery {
bus = bus.with_lambda(ld.clone());
}
Arc::new(bus)
};
if persistence_config.mode == fakecloud_persistence::StorageMode::Persistent {
for service in [
"cloudformation",
"sqs",
"sns",
"events",
"iam",
"sts",
"ssm",
"dynamodb",
"lambda",
"secretsmanager",
"logs",
"kms",
"ses",
"cognito-idp",
"kinesis",
"rds",
"elasticache",
"states",
"apigatewayv2",
"bedrock",
] {
fakecloud_persistence::warn_unsupported(service);
}
}
let mut registry = ServiceRegistry::new();
registry.register(Arc::new(CloudFormationService::new(
cloudformation_state,
fakecloud_cloudformation::service::CloudFormationDeps {
sqs: sqs_state.clone(),
sns: sns_state.clone(),
ssm: ssm_state.clone(),
iam: iam_state.clone(),
s3: s3_state.clone(),
eventbridge: eb_state.clone(),
dynamodb: dynamodb_state.clone(),
logs: logs_state.clone(),
delivery: delivery_for_cf,
},
)));
registry.register(Arc::new(SqsService::new(sqs_state.clone())));
let sns_state_for_sfn = sns_state.clone();
let delivery_for_sns_sfn = delivery_for_sns.clone();
registry.register(Arc::new(SnsService::new(sns_state, delivery_for_sns)));
let mut eb_service = EventBridgeService::new(eb_state.clone(), delivery_for_eb.clone())
.with_lambda(lambda_state.clone())
.with_logs(logs_state.clone());
if let Some(ref rt) = container_runtime {
eb_service = eb_service.with_runtime(rt.clone());
}
registry.register(Arc::new(eb_service));
let eb_state_for_ses = eb_state.clone();
let eb_state_for_sfn = eb_state.clone();
let mut scheduler = fakecloud_eventbridge::scheduler::Scheduler::new(eb_state, delivery_for_eb)
.with_lambda(lambda_state.clone())
.with_logs(logs_state.clone());
if let Some(ref rt) = container_runtime {
scheduler = scheduler.with_runtime(rt.clone());
}
tokio::spawn(scheduler.run());
registry.register(Arc::new(IamService::new(iam_state.clone())));
registry.register(Arc::new(StsService::new(iam_state.clone())));
registry.register(Arc::new(
SsmService::new(ssm_state).with_secretsmanager(secretsmanager_state.clone()),
));
let dynamodb_state_for_register = dynamodb_state.clone();
let delivery_for_dynamodb_register = delivery_for_dynamodb;
let mut lambda_service = LambdaService::new(lambda_state.clone());
if let Some(ref rt) = container_runtime {
lambda_service = lambda_service.with_runtime(rt.clone());
}
registry.register(Arc::new(lambda_service));
let delivery_for_secretsmanager = {
let mut bus = DeliveryBus::new();
if let Some(ref ld) = lambda_delivery {
bus = bus.with_lambda(ld.clone());
}
Arc::new(bus)
};
let delivery_for_rotation_scheduler = delivery_for_secretsmanager.clone();
registry.register(Arc::new(
SecretsManagerService::new(secretsmanager_state).with_delivery(delivery_for_secretsmanager),
));
registry.register(Arc::new(LogsService::new(logs_state, delivery_for_logs)));
registry.register(Arc::new(KmsService::new(kms_state.clone())));
// Body cache shared between the disk-backed store and the in-memory S3 state.
// It stays `None` in memory mode and is only populated in the persistent arm.
let mut shared_body_cache: Option<Arc<fakecloud_persistence::cache::BodyCache>> = None;
// Select the S3 storage backend according to the configured storage mode.
let s3_store: Arc<dyn fakecloud_persistence::S3Store> = match persistence_config.mode {
fakecloud_persistence::StorageMode::Persistent => {
// NOTE(review): this `expect` relies on `cli.persistence_config()` having
// rejected persistent mode without a data path; the earlier directory
// setup only runs when `data_path` is `Some` and does not itself
// guarantee it — confirm.
let data_path = persistence_config
.data_path
.as_ref()
.expect("validated above")
.clone();
// S3 data lives in the `s3` subdirectory of the data path.
let s3_root = data_path.join("s3");
if let Err(err) = std::fs::create_dir_all(&s3_root) {
fatal_exit(format_args!(
"failed to create s3 persistence dir {}: {err}",
s3_root.display()
));
}
// The cache is constructed from the configured `s3_cache_bytes` value.
let cache = Arc::new(fakecloud_persistence::cache::BodyCache::new(
persistence_config.s3_cache_bytes,
));
shared_body_cache = Some(cache.clone());
let disk = fakecloud_persistence::s3::DiskS3Store::new(s3_root, cache);
// Load whatever snapshot the store holds and replace the freshly built
// in-memory S3 state with it. Fully qualified call through the `S3Store`
// trait.
match <fakecloud_persistence::s3::DiskS3Store as fakecloud_persistence::S3Store>::load(
&disk,
) {
Ok(snapshot) => {
// Take the counts first: `snapshot` is moved into the hydrate call below.
let bucket_count = snapshot.buckets.len();
let object_count: usize =
snapshot.buckets.values().map(|b| b.objects.len()).sum();
let hydrated = match fakecloud_s3::persistence::hydrate_s3_state(
snapshot,
&cli.account_id,
&cli.region,
) {
Ok(h) => h,
Err(err) => fatal_exit(format_args!(
"failed to hydrate s3 persistence snapshot: {err}"
)),
};
// Overwrite the shared state in place so existing `Arc` clones see it.
*s3_state.write() = hydrated;
tracing::info!(
buckets = bucket_count,
objects = object_count,
"loaded s3 persistence snapshot",
);
}
Err(err) => fatal_exit(format_args!(
"failed to load s3 persistence snapshot: {err}"
)),
}
Arc::new(disk)
}
// Memory mode: nothing is loaded and no body cache is created.
fakecloud_persistence::StorageMode::Memory => {
Arc::new(fakecloud_persistence::s3::MemoryS3Store::new())
}
};
let s3_store_for_inbound = s3_store.clone();
if let Some(ref cache) = shared_body_cache {
s3_state.write().set_body_cache(cache.clone());
}
registry.register(Arc::new(
S3Service::with_store(s3_state.clone(), delivery_for_s3, s3_store.clone())
.with_kms(kms_state),
));
registry.register(Arc::new(
DynamoDbService::new(dynamodb_state_for_register)
.with_s3(s3_state.clone())
.with_s3_store(s3_store.clone())
.with_delivery(delivery_for_dynamodb_register),
));
let eb_delivery_for_ses = Arc::new(
fakecloud_eventbridge::delivery::EventBridgeDeliveryImpl::new(
eb_state_for_ses,
Arc::new(DeliveryBus::new().with_sqs(sqs_delivery_for_ses)),
),
);
let delivery_for_ses = Arc::new(
DeliveryBus::new()
.with_sns(sns_delivery_for_ses)
.with_eventbridge(eb_delivery_for_ses),
);
let ses_delivery_ctx = fakecloud_ses::fanout::SesDeliveryContext {
ses_state: ses_state.clone(),
delivery_bus: delivery_for_ses,
};
registry.register(Arc::new(
SesV2Service::new(ses_state).with_delivery(ses_delivery_ctx),
));
let delivery_for_cognito = {
let mut bus = DeliveryBus::new();
if let Some(ref ld) = lambda_delivery {
bus = bus.with_lambda(ld.clone());
}
Arc::new(bus)
};
let cognito_delivery_ctx = fakecloud_cognito::triggers::CognitoDeliveryContext {
delivery_bus: delivery_for_cognito,
};
registry.register(Arc::new(
CognitoService::new(cognito_state.clone()).with_delivery(cognito_delivery_ctx),
));
registry.register(Arc::new(KinesisService::new(kinesis_state.clone())));
let mut rds_service = RdsService::new(rds_state);
if let Some(ref rt) = rds_runtime {
rds_service = rds_service.with_runtime(rt.clone());
}
registry.register(Arc::new(rds_service));
let mut elasticache_service = ElastiCacheService::new(elasticache_state);
if let Some(ref rt) = elasticache_runtime {
elasticache_service = elasticache_service.with_runtime(rt.clone());
}
registry.register(Arc::new(elasticache_service));
let mut sfn_service = StepFunctionsService::new(stepfunctions_state.clone());
let sfn_delivery_bus = {
let mut sns_eb_bus = DeliveryBus::new().with_sqs(sqs_delivery.clone());
if let Some(ref ld) = lambda_delivery {
sns_eb_bus = sns_eb_bus.with_lambda(ld.clone());
}
let sns_delivery_for_sfn_eb = Arc::new(fakecloud_sns::delivery::SnsDeliveryImpl::new(
sns_state_for_sfn.clone(),
Arc::new(sns_eb_bus),
));
let mut eb_target_bus = DeliveryBus::new()
.with_sqs(sqs_delivery.clone())
.with_sns(sns_delivery_for_sfn_eb);
if let Some(ref ld) = lambda_delivery {
eb_target_bus = eb_target_bus.with_lambda(ld.clone());
}
let eb_delivery_for_sfn = Arc::new(
fakecloud_eventbridge::delivery::EventBridgeDeliveryImpl::new(
eb_state_for_sfn,
Arc::new(eb_target_bus),
),
);
let sns_delivery_for_sfn = Arc::new(fakecloud_sns::delivery::SnsDeliveryImpl::new(
sns_state_for_sfn,
delivery_for_sns_sfn,
));
let mut bus = DeliveryBus::new()
.with_sqs(sqs_delivery.clone())
.with_sns(sns_delivery_for_sfn)
.with_eventbridge(eb_delivery_for_sfn);
if let Some(ref ld) = lambda_delivery {
bus = bus.with_lambda(ld.clone());
}
Arc::new(bus)
};
sfn_service = sfn_service
.with_delivery(sfn_delivery_bus.clone())
.with_dynamodb(dynamodb_state.clone());
registry.register(Arc::new(sfn_service));
let mut apigw_service = ApiGatewayV2Service::new(apigatewayv2_state.clone());
if let Some(ref ld) = lambda_delivery {
let delivery_for_apigw = Arc::new(DeliveryBus::new().with_lambda(ld.clone()));
apigw_service = apigw_service.with_delivery(delivery_for_apigw);
}
registry.register(Arc::new(apigw_service));
registry.register(Arc::new(BedrockService::new(bedrock_state.clone())));
let lifecycle_processor = fakecloud_s3::lifecycle::LifecycleProcessor::new(s3_state);
tokio::spawn(lifecycle_processor.run());
let mut sqs_lambda_poller = SqsLambdaPoller::new(sqs_state, lambda_state);
if let Some(ref ld) = lambda_delivery {
sqs_lambda_poller = sqs_lambda_poller.with_lambda_delivery(ld.clone());
}
tokio::spawn(sqs_lambda_poller.run());
let mut kinesis_lambda_poller =
KinesisLambdaPoller::new(kinesis_state, lambda_invocations_state.clone());
if let Some(ref ld) = lambda_delivery {
kinesis_lambda_poller = kinesis_lambda_poller.with_lambda_delivery(ld.clone());
}
tokio::spawn(kinesis_lambda_poller.run());
let mut dynamodb_streams_poller =
DynamoDbStreamsLambdaPoller::new(dynamodb_state.clone(), lambda_invocations_state.clone());
if let Some(ref ld) = lambda_delivery {
dynamodb_streams_poller = dynamodb_streams_poller.with_lambda_delivery(ld.clone());
}
tokio::spawn(Arc::new(dynamodb_streams_poller).run());
if let Some(ref rt) = container_runtime {
let rt = rt.clone();
tokio::spawn(rt.run_cleanup_loop(std::time::Duration::from_secs(300)));
}
let services: Vec<&str> = registry.service_names();
tracing::info!(services = ?services, "registered services");
let iam_mode = cli.iam_mode();
if iam_mode.is_enabled() || cli.verify_sigv4 {
tracing::warn!(
verify_sigv4 = cli.verify_sigv4,
iam_mode = %iam_mode,
"opt-in security features enabled: access keys with the `test` prefix bypass SigV4 verification and IAM enforcement — see /docs/reference/security"
);
}
if iam_mode.is_enabled() {
let (enforced, skipped) = registry.iam_enforcement_split();
tracing::info!(
enforced = ?enforced,
skipped = ?skipped,
"IAM enforcement surface: listed `enforced` services evaluate policies; `skipped` services are not yet wired for enforcement"
);
}
let config = DispatchConfig {
region: cli.region,
account_id: cli.account_id,
verify_sigv4: cli.verify_sigv4,
iam_mode,
credential_resolver: Some(
fakecloud_iam::credential_resolver::IamCredentialResolver::shared(iam_state.clone()),
),
policy_evaluator: Some(
fakecloud_iam::policy_evaluator::IamPolicyEvaluatorImpl::shared(iam_state.clone()),
),
};
let service_names: Vec<String> = registry
.service_names()
.iter()
.map(|s| s.to_string())
.collect();
let app = Router::new()
.route(
"/_fakecloud/health",
axum::routing::get({
let services = service_names.clone();
move || async move {
axum::Json(types::HealthResponse {
status: "ok".to_string(),
version: env!("CARGO_PKG_VERSION").to_string(),
services,
})
}
}),
)
.route(
"/_reset",
axum::routing::post({
let s = reset_state.clone();
move || async move { s.reset() }
}),
)
.route(
"/_fakecloud/lambda/invocations",
axum::routing::get({
let ls = lambda_invocations_state.clone();
move || async move {
let state = ls.read();
let invocations = state
.invocations
.iter()
.map(|inv| types::LambdaInvocation {
function_arn: inv.function_arn.clone(),
payload: inv.payload.clone(),
source: inv.source.clone(),
timestamp: inv.timestamp.to_rfc3339(),
})
.collect();
axum::Json(types::LambdaInvocationsResponse { invocations })
}
}),
)
.route(
"/_fakecloud/ses/emails",
axum::routing::get({
let ss = ses_emails_state.clone();
move || async move {
let state = ss.read();
let emails = state
.sent_emails
.iter()
.map(|email| types::SentEmail {
message_id: email.message_id.clone(),
from: email.from.clone(),
to: email.to.clone(),
cc: email.cc.clone(),
bcc: email.bcc.clone(),
subject: email.subject.clone(),
html_body: email.html_body.clone(),
text_body: email.text_body.clone(),
raw_data: email.raw_data.clone(),
template_name: email.template_name.clone(),
template_data: email.template_data.clone(),
timestamp: email.timestamp.to_rfc3339(),
})
.collect();
axum::Json(types::SesEmailsResponse { emails })
}
}),
)
.route(
"/_fakecloud/ses/inbound",
axum::routing::post({
let ss = ses_inbound_state.clone();
let s3_for_inbound = s3_introspection_state.clone();
let s3_store_for_inbound = s3_store_for_inbound.clone();
let delivery_for_inbound = {
let mut bus = DeliveryBus::new();
let sns_fanout_bus = {
let mut b = DeliveryBus::new().with_sqs(sqs_delivery.clone());
if let Some(ref ld) = lambda_delivery {
b = b.with_lambda(ld.clone());
}
Arc::new(b)
};
let sns_for_inbound = Arc::new(
fakecloud_sns::delivery::SnsDeliveryImpl::new(
sns_introspection_state.clone(),
sns_fanout_bus,
),
);
bus = bus.with_sns(sns_for_inbound);
if let Some(ref ld) = lambda_delivery {
bus = bus.with_lambda(ld.clone());
}
Arc::new(bus)
};
move |axum::Json(body): axum::Json<types::InboundEmailRequest>| async move {
let (message_id, matched_rules, actions) =
fakecloud_ses::v1::evaluate_inbound_email(
&ss,
&body.from,
&body.to,
&body.subject,
&body.body,
);
for (_rule, action) in &actions {
match action {
fakecloud_ses::state::ReceiptAction::S3 {
bucket_name,
object_key_prefix,
..
} => {
let prefix = object_key_prefix.as_deref().unwrap_or("");
let key = format!("{prefix}{message_id}");
let now = chrono::Utc::now();
let data = bytes::Bytes::from(body.body.clone());
let size = data.len() as u64;
let etag = format!("\"{:x}\"", md5::Md5::digest(&data));
let obj = fakecloud_s3::state::S3Object {
key: key.clone(),
body: fakecloud_persistence::BodyRef::Memory(data.clone()),
content_type: "text/plain".to_string(),
etag: etag.clone(),
size,
last_modified: now,
storage_class: "STANDARD".to_string(),
..Default::default()
};
let mut state = s3_for_inbound.write();
if let Some(bucket) = state.buckets.get_mut(bucket_name) {
tracing::info!(
bucket = %bucket_name,
key = %key,
"SES inbound: stored email in S3"
);
let meta =
fakecloud_s3::persistence::object_meta_snapshot(&obj);
bucket.objects.insert(key.clone(), obj);
drop(state);
if let Err(err) = s3_store_for_inbound.put_object(
bucket_name,
&key,
None,
fakecloud_persistence::BodySource::Bytes(data),
&meta,
) {
tracing::error!(
bucket = %bucket_name,
key = %key,
error = %err,
"SES inbound: failed to persist S3 object via store"
);
}
} else {
tracing::warn!(
bucket = %bucket_name,
"SES inbound: S3 bucket not found, skipping S3 action"
);
}
}
fakecloud_ses::state::ReceiptAction::Sns { topic_arn, .. } => {
let notification = serde_json::json!({
"notificationType": "Received",
"mail": {
"messageId": message_id,
"source": body.from,
"destination": body.to,
"commonHeaders": {
"from": [&body.from],
"to": &body.to,
"subject": &body.subject,
}
},
"content": &body.body,
});
tracing::info!(
topic_arn = %topic_arn,
"SES inbound: publishing to SNS"
);
delivery_for_inbound.publish_to_sns(
topic_arn,
¬ification.to_string(),
Some(&body.subject),
);
}
fakecloud_ses::state::ReceiptAction::Lambda {
function_arn,
invocation_type,
..
} => {
let ses_event = serde_json::json!({
"Records": [{
"eventSource": "aws:ses",
"eventVersion": "1.0",
"ses": {
"mail": {
"messageId": message_id,
"source": body.from,
"destination": body.to,
"commonHeaders": {
"from": [&body.from],
"to": &body.to,
"subject": &body.subject,
}
},
"receipt": {
"recipients": &body.to,
"action": {
"type": "Lambda",
"functionArn": function_arn,
"invocationType": invocation_type.as_deref().unwrap_or("Event"),
}
}
}
}]
});
let payload = ses_event.to_string();
let delivery = delivery_for_inbound.clone();
let function_arn = function_arn.clone();
tracing::info!(
function_arn = %function_arn,
"SES inbound: invoking Lambda"
);
tokio::spawn(async move {
match delivery.invoke_lambda(&function_arn, &payload).await {
Some(Ok(_)) => {
tracing::info!(
function_arn = %function_arn,
"SES inbound: Lambda invocation succeeded"
);
}
Some(Err(e)) => {
tracing::error!(
function_arn = %function_arn,
error = %e,
"SES inbound: Lambda invocation failed"
);
}
None => {
tracing::warn!(
"SES inbound: no container runtime available for Lambda invocation"
);
}
}
});
}
_ => {}
}
}
let actions_executed = actions
.iter()
.map(|(rule, action)| types::InboundActionExecuted {
rule: rule.clone(),
action_type: match action {
fakecloud_ses::state::ReceiptAction::S3 { .. } => "S3",
fakecloud_ses::state::ReceiptAction::Sns { .. } => "SNS",
fakecloud_ses::state::ReceiptAction::Lambda { .. } => "Lambda",
fakecloud_ses::state::ReceiptAction::Bounce { .. } => "Bounce",
fakecloud_ses::state::ReceiptAction::AddHeader { .. } => {
"AddHeader"
}
fakecloud_ses::state::ReceiptAction::Stop { .. } => "Stop",
}
.to_string(),
})
.collect();
axum::Json(types::InboundEmailResponse {
message_id,
matched_rules,
actions_executed,
})
}
}),
)
.route(
"/_fakecloud/sns/messages",
axum::routing::get({
let ss = sns_introspection_state;
move || async move {
let state = ss.read();
let messages = state
.published
.iter()
.map(|msg| types::SnsMessage {
message_id: msg.message_id.clone(),
topic_arn: msg.topic_arn.clone(),
message: msg.message.clone(),
subject: msg.subject.clone(),
timestamp: msg.timestamp.to_rfc3339(),
})
.collect();
axum::Json(types::SnsMessagesResponse { messages })
}
}),
)
.route(
"/_fakecloud/sqs/messages",
axum::routing::get({
let ss = sqs_introspection_state;
move || async move {
let state = ss.read();
let queues = state
.queues
.values()
.map(|queue| {
let mut messages: Vec<types::SqsMessageInfo> = queue
.messages
.iter()
.map(|msg| types::SqsMessageInfo {
message_id: msg.message_id.clone(),
body: msg.body.clone(),
receive_count: msg.receive_count as u64,
in_flight: false,
created_at: msg.created_at.to_rfc3339(),
})
.collect();
let inflight: Vec<types::SqsMessageInfo> = queue
.inflight
.iter()
.map(|msg| types::SqsMessageInfo {
message_id: msg.message_id.clone(),
body: msg.body.clone(),
receive_count: msg.receive_count as u64,
in_flight: true,
created_at: msg.created_at.to_rfc3339(),
})
.collect();
messages.extend(inflight);
types::SqsQueueMessages {
queue_url: queue.queue_url.clone(),
queue_name: queue.queue_name.clone(),
messages,
}
})
.collect();
axum::Json(types::SqsMessagesResponse { queues })
}
}),
)
.route(
"/_fakecloud/events/history",
axum::routing::get({
let es = eb_introspection_state;
move || async move {
let state = es.read();
let events = state
.events
.iter()
.map(|evt| types::EventBridgeEvent {
event_id: evt.event_id.clone(),
source: evt.source.clone(),
detail_type: evt.detail_type.clone(),
detail: evt.detail.clone(),
bus_name: evt.event_bus_name.clone(),
timestamp: evt.time.to_rfc3339(),
})
.collect();
let lambda = state
.lambda_invocations
.iter()
.map(|inv| types::EventBridgeLambdaDelivery {
function_arn: inv.function_arn.clone(),
payload: inv.payload.clone(),
timestamp: inv.timestamp.to_rfc3339(),
})
.collect();
let logs = state
.log_deliveries
.iter()
.map(|ld| types::EventBridgeLogDelivery {
log_group_arn: ld.log_group_arn.clone(),
payload: ld.payload.clone(),
timestamp: ld.timestamp.to_rfc3339(),
})
.collect();
axum::Json(types::EventHistoryResponse {
events,
deliveries: types::EventBridgeDeliveries { lambda, logs },
})
}
}),
)
.route(
"/_fakecloud/sqs/expiration-processor/tick",
axum::routing::post({
let ss = sqs_sim_expiration_state;
move || async move {
let expired = fakecloud_sqs::simulation::tick_expiration(&ss);
axum::Json(types::ExpirationTickResponse {
expired_messages: expired,
})
}
}),
)
.route(
"/_fakecloud/sqs/{queue_name}/force-dlq",
axum::routing::post({
let ss = sqs_sim_force_dlq_state;
move |axum::extract::Path(queue_name): axum::extract::Path<String>| async move {
let moved = fakecloud_sqs::simulation::force_dlq(&ss, &queue_name);
axum::Json(types::ForceDlqResponse {
moved_messages: moved,
})
}
}),
)
.route(
"/_fakecloud/events/fire-rule",
axum::routing::post({
let es = eb_sim_state;
let delivery = eb_sim_delivery;
let lambda_state = eb_sim_lambda_state;
let logs_state = eb_sim_logs_state;
let container_runtime = eb_sim_container_runtime;
move |axum::Json(body): axum::Json<types::FireRuleRequest>| async move {
let bus_name = body.bus_name.as_deref().unwrap_or("default");
let ctx = fakecloud_eventbridge::simulation::FireRuleContext {
state: &es,
delivery: &delivery,
lambda_state: &lambda_state,
logs_state: &logs_state,
container_runtime: &container_runtime,
};
match fakecloud_eventbridge::simulation::fire_rule(
&ctx,
bus_name,
&body.rule_name,
) {
Ok(targets) => {
let target_list = targets
.iter()
.map(|t| types::FireRuleTarget {
target_type: t.target_type.clone(),
arn: t.arn.clone(),
})
.collect();
(
axum::http::StatusCode::OK,
axum::Json(serde_json::json!(types::FireRuleResponse {
targets: target_list
})),
)
}
Err(msg) => (
axum::http::StatusCode::NOT_FOUND,
axum::Json(serde_json::json!({ "error": msg })),
),
}
}
}),
)
.route(
"/_fakecloud/s3/notifications",
axum::routing::get({
let ss = s3_introspection_state;
move || async move {
let state = ss.read();
let notifications = state
.notification_events
.iter()
.map(|evt| types::S3Notification {
bucket: evt.bucket.clone(),
key: evt.key.clone(),
event_type: evt.event_type.clone(),
timestamp: evt.timestamp.to_rfc3339(),
})
.collect();
axum::Json(types::S3NotificationsResponse { notifications })
}
}),
)
.route(
"/_fakecloud/dynamodb/ttl-processor/tick",
axum::routing::post({
let ds = dynamodb_ttl_state;
move || async move {
let count = fakecloud_dynamodb::ttl::process_ttl_expirations(&ds);
axum::Json(types::TtlTickResponse {
expired_items: count as u64,
})
}
}),
)
.route(
"/_fakecloud/secretsmanager/rotation-scheduler/tick",
axum::routing::post({
let ss = secretsmanager_rotation_state;
let bus = delivery_for_rotation_scheduler;
move || async move {
let rotated =
fakecloud_secretsmanager::rotation::check_and_rotate(&ss, Some(&bus)).await;
axum::Json(types::RotationTickResponse {
rotated_secrets: rotated,
})
}
}),
)
.route(
"/_fakecloud/cognito/confirmation-codes/{pool_id}/{username}",
axum::routing::get({
let cs = cognito_state.clone();
move |axum::extract::Path((pool_id, username)): axum::extract::Path<(
String,
String,
)>| {
let cs = cs.clone();
async move {
let state = cs.read();
let user = state
.users
.get(&pool_id)
.and_then(|users| users.get(&username));
let code = user.and_then(|u| u.confirmation_code.clone());
let attr_codes = user
.map(|u| serde_json::json!(u.attribute_verification_codes))
.unwrap_or(serde_json::json!({}));
axum::Json(types::UserConfirmationCodes {
confirmation_code: code,
attribute_verification_codes: attr_codes,
})
}
}
}),
)
.route(
"/_fakecloud/cognito/confirmation-codes",
axum::routing::get({
let cs = cognito_codes_state;
move || {
let cs = cs.clone();
async move {
let state = cs.read();
let mut codes = Vec::new();
for (pool_id, users) in &state.users {
for (username, user) in users {
if let Some(code) = &user.confirmation_code {
codes.push(types::ConfirmationCode {
pool_id: pool_id.clone(),
username: username.clone(),
code: code.clone(),
code_type: "signup".to_string(),
attribute: None,
});
}
for (attr, code) in &user.attribute_verification_codes {
codes.push(types::ConfirmationCode {
pool_id: pool_id.clone(),
username: username.clone(),
code: code.clone(),
code_type: "attribute_verification".to_string(),
attribute: Some(attr.clone()),
});
}
}
}
axum::Json(types::ConfirmationCodesResponse { codes })
}
}
}),
)
.route(
"/_fakecloud/cognito/confirm-user",
axum::routing::post({
let cs = cognito_confirm_state;
move |axum::Json(body): axum::Json<types::ConfirmUserRequest>| {
let cs = cs.clone();
async move {
let mut state = cs.write();
let user = state
.users
.get_mut(&body.user_pool_id)
.and_then(|users| users.get_mut(&body.username));
match user {
Some(user) => {
user.user_status = "CONFIRMED".to_string();
user.confirmation_code = None;
user.user_last_modified_date = chrono::Utc::now();
(
axum::http::StatusCode::OK,
axum::Json(serde_json::json!(types::ConfirmUserResponse {
confirmed: true,
error: None,
})),
)
}
None => (
axum::http::StatusCode::NOT_FOUND,
axum::Json(serde_json::json!(types::ConfirmUserResponse {
confirmed: false,
error: Some("User not found".to_string()),
})),
),
}
}
}
}),
)
.route(
"/_fakecloud/cognito/tokens",
axum::routing::get({
let cs = cognito_tokens_state;
move || {
let cs = cs.clone();
async move {
let state = cs.read();
let mut tokens = Vec::new();
for data in state.access_tokens.values() {
tokens.push(types::TokenInfo {
token_type: "access".to_string(),
username: data.username.clone(),
pool_id: data.user_pool_id.clone(),
client_id: data.client_id.clone(),
issued_at: data.issued_at.timestamp() as f64,
});
}
for data in state.refresh_tokens.values() {
tokens.push(types::TokenInfo {
token_type: "refresh".to_string(),
username: data.username.clone(),
pool_id: data.user_pool_id.clone(),
client_id: data.client_id.clone(),
issued_at: data.issued_at.timestamp() as f64,
});
}
axum::Json(types::TokensResponse { tokens })
}
}
}),
)
.route(
"/_fakecloud/cognito/expire-tokens",
axum::routing::post({
let cs = cognito_expire_state;
move |axum::Json(body): axum::Json<types::ExpireTokensRequest>| {
let cs = cs.clone();
async move {
let mut state = cs.write();
let mut expired = 0usize;
let matches = |p: &str, u: &str| -> bool {
body.user_pool_id.as_ref().is_none_or(|pid| pid == p)
&& body.username.as_ref().is_none_or(|un| un == u)
};
let before_access = state.access_tokens.len();
state
.access_tokens
.retain(|_, v| !matches(&v.user_pool_id, &v.username));
expired += before_access - state.access_tokens.len();
let before_refresh = state.refresh_tokens.len();
state
.refresh_tokens
.retain(|_, v| !matches(&v.user_pool_id, &v.username));
expired += before_refresh - state.refresh_tokens.len();
let before_sessions = state.sessions.len();
state
.sessions
.retain(|_, v| !matches(&v.user_pool_id, &v.username));
expired += before_sessions - state.sessions.len();
axum::Json(types::ExpireTokensResponse {
expired_tokens: expired as u64,
})
}
}
}),
)
.route(
"/_fakecloud/cognito/auth-events",
axum::routing::get({
let cs = cognito_events_state;
move || {
let cs = cs.clone();
async move {
let state = cs.read();
let events = state
.auth_events
.iter()
.map(|e| types::AuthEvent {
event_type: e.event_type.clone(),
username: e.username.clone(),
user_pool_id: e.user_pool_id.clone(),
client_id: e.client_id.clone(),
timestamp: e.timestamp.timestamp() as f64,
success: e.success,
})
.collect();
axum::Json(types::AuthEventsResponse { events })
}
}
}),
)
.route(
"/_fakecloud/s3/lifecycle-processor/tick",
axum::routing::post({
let ss = s3_sim_lifecycle_state;
move || async move {
let result = fakecloud_s3::simulation::tick_lifecycle(&ss);
axum::Json(types::LifecycleTickResponse {
processed_buckets: result.processed_buckets,
expired_objects: result.expired_objects,
transitioned_objects: result.transitioned_objects,
})
}
}),
)
.route(
"/_fakecloud/lambda/warm-containers",
axum::routing::get({
let ls = lambda_sim_warm_state;
let rt = lambda_sim_warm_runtime;
move || async move {
let containers: Vec<serde_json::Value> = if let Some(ref rt) = rt {
rt.list_warm_containers(&ls)
} else {
Vec::new()
};
let containers: Vec<types::WarmContainer> = containers
.into_iter()
.filter_map(|v| serde_json::from_value(v).ok())
.collect();
axum::Json(types::WarmContainersResponse { containers })
}
}),
)
.route(
"/_fakecloud/rds/instances",
axum::routing::get({
let rs = rds_introspection_state;
move || {
let rs = rs.clone();
async move {
let state = rs.read();
let mut instances: Vec<types::RdsInstance> = state
.instances
.values()
.map(rds_instance_response)
.collect();
instances.sort_by(|a, b| {
a.db_instance_identifier.cmp(&b.db_instance_identifier)
});
axum::Json(types::RdsInstancesResponse { instances })
}
}
}),
)
.route(
"/_fakecloud/elasticache/clusters",
axum::routing::get({
let ec = elasticache_introspection_state.clone();
move || {
let ec = ec.clone();
async move {
let state = ec.read();
let mut clusters: Vec<types::ElastiCacheCluster> = state
.cache_clusters
.values()
.map(elasticache_cluster_response)
.collect();
clusters.sort_by(|a, b| a.cache_cluster_id.cmp(&b.cache_cluster_id));
axum::Json(types::ElastiCacheClustersResponse { clusters })
}
}
}),
)
.route(
"/_fakecloud/elasticache/replication-groups",
axum::routing::get({
let ec = elasticache_introspection_state.clone();
move || {
let ec = ec.clone();
async move {
let state = ec.read();
let mut replication_groups: Vec<
types::ElastiCacheReplicationGroupIntrospection,
> = state
.replication_groups
.values()
.map(elasticache_replication_group_response)
.collect();
replication_groups
.sort_by(|a, b| a.replication_group_id.cmp(&b.replication_group_id));
axum::Json(types::ElastiCacheReplicationGroupsResponse {
replication_groups,
})
}
}
}),
)
.route(
"/_fakecloud/elasticache/serverless-caches",
axum::routing::get({
let ec = elasticache_introspection_state;
move || {
let ec = ec.clone();
async move {
let state = ec.read();
let mut serverless_caches: Vec<
types::ElastiCacheServerlessCacheIntrospection,
> = state
.serverless_caches
.values()
.map(elasticache_serverless_cache_response)
.collect();
serverless_caches
.sort_by(|a, b| a.serverless_cache_name.cmp(&b.serverless_cache_name));
axum::Json(types::ElastiCacheServerlessCachesResponse { serverless_caches })
}
}
}),
)
.route(
"/_fakecloud/stepfunctions/executions",
axum::routing::get({
let ss = stepfunctions_state.clone();
move || {
let ss = ss.clone();
async move {
let state = ss.read();
let mut executions: Vec<types::StepFunctionsExecution> = state
.executions
.values()
.map(|exec| types::StepFunctionsExecution {
execution_arn: exec.execution_arn.clone(),
state_machine_arn: exec.state_machine_arn.clone(),
name: exec.name.clone(),
status: exec.status.as_str().to_string(),
input: exec.input.clone(),
output: exec.output.clone(),
start_date: exec.start_date.to_rfc3339(),
stop_date: exec.stop_date.map(|d| d.to_rfc3339()),
})
.collect();
executions.sort_by(|a, b| b.start_date.cmp(&a.start_date));
axum::Json(types::StepFunctionsExecutionsResponse { executions })
}
}
}),
)
.route(
"/_fakecloud/apigatewayv2/requests",
axum::routing::get({
let apigw_state = apigatewayv2_state.clone();
move || {
let apigw_state = apigw_state.clone();
async move {
let state = apigw_state.read();
axum::Json(serde_json::json!({
"requests": state.request_history
}))
}
}
}),
)
.route(
"/_fakecloud/lambda/{function_name}/evict-container",
axum::routing::post({
let rt = lambda_sim_evict_runtime;
move |axum::extract::Path(function_name): axum::extract::Path<String>| async move {
let evicted = if let Some(ref rt) = rt {
rt.evict_container(&function_name).await
} else {
false
};
axum::Json(types::EvictContainerResponse { evicted })
}
}),
)
.route(
"/_fakecloud/sns/pending-confirmations",
axum::routing::get({
let ss = sns_sim_pending_state;
move || async move {
let pending = fakecloud_sns::simulation::list_pending_confirmations(&ss);
let pending_confirmations = pending
.into_iter()
.map(|p| types::PendingConfirmation {
subscription_arn: p.subscription_arn,
topic_arn: p.topic_arn,
protocol: p.protocol,
endpoint: p.endpoint,
token: p.token,
})
.collect();
axum::Json(types::PendingConfirmationsResponse {
pending_confirmations,
})
}
}),
)
.route(
"/_fakecloud/sns/confirm-subscription",
axum::routing::post({
let ss = sns_sim_confirm_state;
move |axum::Json(body): axum::Json<types::ConfirmSubscriptionRequest>| async move {
let confirmed = fakecloud_sns::simulation::confirm_subscription(
&ss,
&body.subscription_arn,
);
axum::Json(types::ConfirmSubscriptionResponse { confirmed })
}
}),
)
.route(
"/_fakecloud/reset/{service}",
axum::routing::post({
let s = reset_state.clone();
move |axum::extract::Path(service): axum::extract::Path<String>| async move {
match s.reset_service(&service) {
Ok(()) => (
axum::http::StatusCode::OK,
axum::Json(serde_json::json!(types::ResetServiceResponse {
reset: service
})),
),
Err(msg) => (
axum::http::StatusCode::NOT_FOUND,
axum::Json(serde_json::json!({ "error": msg })),
),
}
}
}),
)
.route(
"/_fakecloud/bedrock/invocations",
axum::routing::get({
let bs = bedrock_state.clone();
move || async move {
let state = bs.read();
let invocations: Vec<serde_json::Value> = state
.invocations
.iter()
.map(|inv| {
serde_json::json!({
"modelId": inv.model_id,
"input": inv.input,
"output": inv.output,
"timestamp": inv.timestamp.to_rfc3339(),
"error": inv.error,
})
})
.collect();
axum::Json(serde_json::json!({ "invocations": invocations }))
}
}),
)
.route(
"/_fakecloud/bedrock/models/{model_id}/response",
axum::routing::post({
let bs = bedrock_state.clone();
move |axum::extract::Path(model_id): axum::extract::Path<String>,
body: String| async move {
let mut state = bs.write();
state.custom_responses.insert(model_id.clone(), body);
axum::Json(
serde_json::json!({ "status": "ok", "modelId": model_id }),
)
}
}),
)
.route(
"/_fakecloud/bedrock/models/{model_id}/responses",
axum::routing::post({
let bs = bedrock_state.clone();
move |axum::extract::Path(model_id): axum::extract::Path<String>,
axum::Json(body): axum::Json<serde_json::Value>| async move {
let rules_json = body.get("rules").and_then(|r| r.as_array()).cloned();
let Some(rules_json) = rules_json else {
return (
axum::http::StatusCode::BAD_REQUEST,
axum::Json(serde_json::json!({
"error": "body must contain a `rules` array"
})),
);
};
let mut parsed = Vec::with_capacity(rules_json.len());
for rule in rules_json {
let prompt_contains = match rule.get("promptContains") {
None | Some(serde_json::Value::Null) => None,
Some(serde_json::Value::String(s)) => Some(s.clone()),
Some(_) => {
return (
axum::http::StatusCode::BAD_REQUEST,
axum::Json(serde_json::json!({
"error": "`promptContains` must be a string when provided"
})),
);
}
};
let response = match rule.get("response") {
Some(serde_json::Value::String(s)) => s.clone(),
Some(other) => other.to_string(),
None => {
return (
axum::http::StatusCode::BAD_REQUEST,
axum::Json(serde_json::json!({
"error": "each rule must include a `response` field"
})),
);
}
};
parsed.push(fakecloud_bedrock::state::ResponseRule {
prompt_contains,
response,
});
}
let mut state = bs.write();
state.response_rules.insert(model_id.clone(), parsed);
(
axum::http::StatusCode::OK,
axum::Json(serde_json::json!({
"status": "ok",
"modelId": model_id
})),
)
}
})
.delete({
let bs = bedrock_state.clone();
move |axum::extract::Path(model_id): axum::extract::Path<String>| async move {
let mut state = bs.write();
state.response_rules.remove(&model_id);
axum::Json(serde_json::json!({ "status": "ok", "modelId": model_id }))
}
}),
)
.route(
"/_fakecloud/bedrock/faults",
axum::routing::post({
let bs = bedrock_state.clone();
move |axum::Json(body): axum::Json<serde_json::Value>| async move {
let error_type = body
.get("errorType")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let message = body
.get("message")
.and_then(|v| v.as_str())
.unwrap_or("")
.to_string();
let http_status_raw =
body.get("httpStatus").and_then(|v| v.as_u64()).unwrap_or(500);
let Ok(http_status) = u16::try_from(http_status_raw) else {
return (
axum::http::StatusCode::BAD_REQUEST,
axum::Json(serde_json::json!({
"error": "`httpStatus` must fit in a u16"
})),
);
};
let count_raw = body.get("count").and_then(|v| v.as_u64()).unwrap_or(1);
let Ok(count) = u32::try_from(count_raw.max(1)) else {
return (
axum::http::StatusCode::BAD_REQUEST,
axum::Json(serde_json::json!({
"error": "`count` must fit in a u32"
})),
);
};
let model_id = body
.get("modelId")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let operation = body
.get("operation")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
if error_type.is_empty() {
return (
axum::http::StatusCode::BAD_REQUEST,
axum::Json(serde_json::json!({
"error": "`errorType` is required"
})),
);
}
let mut state = bs.write();
state
.fault_rules
.push(fakecloud_bedrock::state::FaultRule {
error_type,
message,
http_status,
remaining: count,
model_id,
operation,
});
(
axum::http::StatusCode::OK,
axum::Json(serde_json::json!({ "status": "ok" })),
)
}
})
.get({
let bs = bedrock_state.clone();
move || async move {
let state = bs.read();
let faults: Vec<serde_json::Value> = state
.fault_rules
.iter()
.map(|f| {
serde_json::json!({
"errorType": f.error_type,
"message": f.message,
"httpStatus": f.http_status,
"remaining": f.remaining,
"modelId": f.model_id,
"operation": f.operation,
})
})
.collect();
axum::Json(serde_json::json!({ "faults": faults }))
}
})
.delete({
let bs = bedrock_state.clone();
move || async move {
let mut state = bs.write();
state.fault_rules.clear();
axum::Json(serde_json::json!({ "status": "ok" }))
}
}),
)
.fallback(dispatch::dispatch)
.layer(Extension(Arc::new(registry)))
.layer(Extension(Arc::new(config)))
.layer(TraceLayer::new_for_http());
let listener = TcpListener::bind(&cli.addr).await.unwrap();
tracing::info!(addr = %cli.addr, "fakecloud is ready");
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.unwrap();
if let Some(rt) = container_runtime {
rt.stop_all().await;
}
if let Some(rt) = rds_runtime {
rt.stop_all().await;
}
if let Some(rt) = elasticache_runtime {
rt.stop_all().await;
}
}
/// Resolves once the process has been asked to stop.
///
/// `main` hands this future to `with_graceful_shutdown`, so the server keeps
/// running until it completes.
///
/// It completes on Ctrl+C (SIGINT) and, on Unix, also on SIGTERM, which is the
/// signal sent by `docker stop`, `kill`, and most process supervisors. Without
/// a SIGTERM handler the process would be terminated without taking the
/// graceful-shutdown path, and the code that follows `axum::serve` in `main`
/// would never run.
///
/// # Panics
///
/// Panics if a signal handler cannot be installed.
async fn shutdown_signal() {
    let ctrl_c = async {
        tokio::signal::ctrl_c()
            .await
            .expect("failed to install Ctrl+C handler");
    };

    #[cfg(unix)]
    let terminate = async {
        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
            .expect("failed to install SIGTERM handler")
            .recv()
            .await;
    };

    // Non-Unix targets have no SIGTERM; use a future that never resolves so
    // the `select!` below only ever completes through the Ctrl+C arm.
    #[cfg(not(unix))]
    let terminate = std::future::pending::<()>();

    tokio::select! {
        _ = ctrl_c => {},
        _ = terminate => {},
    }

    tracing::info!("shutting down");
}
fn fatal_exit(args: std::fmt::Arguments<'_>) -> ! {
use std::io::Write;
tracing::error!("{args}");
let _ = std::io::stderr().flush();
std::process::exit(1);
}