use std::sync::Arc;
use axum::extract::Extension;
use axum::Router;
use clap::Parser;
use tokio::net::TcpListener;
use tower_http::trace::TraceLayer;
use fakecloud_core::delivery::DeliveryBus;
use fakecloud_core::dispatch::{self, DispatchConfig};
use fakecloud_core::registry::ServiceRegistry;
use fakecloud_eventbridge::service::EventBridgeService;
use fakecloud_iam::iam_service::IamService;
use fakecloud_iam::sts_service::StsService;
use fakecloud_s3::service::S3Service;
use fakecloud_sns::service::SnsService;
use fakecloud_sqs::service::SqsService;
use fakecloud_ssm::service::SsmService;
#[derive(Parser)]
#[command(name = "fakecloud")]
#[command(about = "FakeCloud — local AWS cloud emulator")]
#[command(version)]
struct Cli {
#[arg(long, default_value = "0.0.0.0:4566", env = "FAKECLOUD_ADDR")]
addr: String,
#[arg(long, default_value = "us-east-1", env = "FAKECLOUD_REGION")]
region: String,
#[arg(long, default_value = "123456789012", env = "FAKECLOUD_ACCOUNT_ID")]
account_id: String,
#[arg(long, default_value = "info", env = "FAKECLOUD_LOG")]
log_level: String,
}
#[tokio::main]
async fn main() {
let cli = Cli::parse();
tracing_subscriber::fmt()
.with_env_filter(
tracing_subscriber::EnvFilter::try_new(&cli.log_level)
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info")),
)
.init();
let iam_state = Arc::new(parking_lot::RwLock::new(
fakecloud_iam::state::IamState::new(&cli.account_id),
));
let sqs_state = Arc::new(parking_lot::RwLock::new(
fakecloud_sqs::state::SqsState::new(&cli.account_id, &cli.region, "http://localhost:4566"),
));
let sns_state = Arc::new(parking_lot::RwLock::new({
let mut s = fakecloud_sns::state::SnsState::new(&cli.account_id, &cli.region);
s.seed_default_opted_out();
s
}));
let eb_state = Arc::new(parking_lot::RwLock::new(
fakecloud_eventbridge::state::EventBridgeState::new(&cli.account_id, &cli.region),
));
let ssm_state = Arc::new(parking_lot::RwLock::new(
fakecloud_ssm::state::SsmState::new(&cli.account_id, &cli.region),
));
let s3_state = Arc::new(parking_lot::RwLock::new(fakecloud_s3::state::S3State::new(
&cli.account_id,
&cli.region,
)));
let sqs_delivery = Arc::new(fakecloud_sqs::delivery::SqsDeliveryImpl::new(
sqs_state.clone(),
));
let delivery_for_sns = Arc::new(DeliveryBus::new().with_sqs(sqs_delivery.clone()));
let sns_delivery = Arc::new(fakecloud_sns::delivery::SnsDeliveryImpl::new(
sns_state.clone(),
delivery_for_sns.clone(),
));
let delivery_for_eb = Arc::new(
DeliveryBus::new()
.with_sqs(sqs_delivery.clone())
.with_sns(sns_delivery.clone()),
);
let delivery_for_s3 = Arc::new(
DeliveryBus::new()
.with_sqs(sqs_delivery)
.with_sns(sns_delivery),
);
let reset_state = ResetState {
iam: iam_state.clone(),
sqs: sqs_state.clone(),
sns: sns_state.clone(),
eb: eb_state.clone(),
ssm: ssm_state.clone(),
s3: s3_state.clone(),
};
let mut registry = ServiceRegistry::new();
registry.register(Arc::new(SqsService::new(sqs_state)));
registry.register(Arc::new(SnsService::new(sns_state, delivery_for_sns)));
registry.register(Arc::new(EventBridgeService::new(
eb_state.clone(),
delivery_for_eb.clone(),
)));
let scheduler = fakecloud_eventbridge::scheduler::Scheduler::new(eb_state, delivery_for_eb);
tokio::spawn(scheduler.run());
registry.register(Arc::new(IamService::new(iam_state.clone())));
registry.register(Arc::new(StsService::new(iam_state)));
registry.register(Arc::new(SsmService::new(ssm_state)));
registry.register(Arc::new(S3Service::new(s3_state.clone(), delivery_for_s3)));
let lifecycle_processor = fakecloud_s3::lifecycle::LifecycleProcessor::new(s3_state);
tokio::spawn(lifecycle_processor.run());
let services: Vec<&str> = registry.service_names();
tracing::info!(services = ?services, "registered services");
let config = DispatchConfig {
region: cli.region,
account_id: cli.account_id,
};
let service_names: Vec<String> = registry
.service_names()
.iter()
.map(|s| s.to_string())
.collect();
let app = Router::new()
.route(
"/_fakecloud/health",
axum::routing::get({
let services = service_names.clone();
move || async move {
axum::Json(serde_json::json!({
"status": "ok",
"version": env!("CARGO_PKG_VERSION"),
"services": services,
}))
}
}),
)
.route(
"/moto-api/reset",
axum::routing::post({
let s = reset_state.clone();
move || async move { s.reset() }
}),
)
.route(
"/_reset",
axum::routing::post({
let s = reset_state;
move || async move { s.reset() }
}),
)
.fallback(dispatch::dispatch)
.layer(Extension(Arc::new(registry)))
.layer(Extension(Arc::new(config)))
.layer(TraceLayer::new_for_http());
let listener = TcpListener::bind(&cli.addr).await.unwrap();
tracing::info!(addr = %cli.addr, "fakecloud is ready");
axum::serve(listener, app)
.with_graceful_shutdown(shutdown_signal())
.await
.unwrap();
}
#[derive(Clone)]
struct ResetState {
iam: fakecloud_iam::state::SharedIamState,
sqs: fakecloud_sqs::state::SharedSqsState,
sns: fakecloud_sns::state::SharedSnsState,
eb: fakecloud_eventbridge::state::SharedEventBridgeState,
ssm: fakecloud_ssm::state::SharedSsmState,
s3: fakecloud_s3::state::SharedS3State,
}
impl ResetState {
fn reset(&self) -> axum::Json<serde_json::Value> {
self.iam.write().reset();
self.sqs.write().queues.clear();
self.sqs.write().name_to_url.clear();
{
let mut sns = self.sns.write();
sns.reset();
sns.seed_default_opted_out();
}
{
let mut eb = self.eb.write();
eb.rules.clear();
eb.events.clear();
eb.archives.clear();
eb.connections.clear();
eb.api_destinations.clear();
eb.replays.clear();
eb.buses.retain(|name, _| name == "default");
eb.lambda_invocations.clear();
eb.log_deliveries.clear();
eb.step_function_executions.clear();
}
self.ssm.write().reset();
self.s3.write().reset();
tracing::info!("state reset via reset API");
axum::Json(serde_json::json!({"status": "ok"}))
}
}
async fn shutdown_signal() {
tokio::signal::ctrl_c()
.await
.expect("failed to install Ctrl+C handler");
tracing::info!("shutting down");
}