pub mod firewall;
mod k8s;
pub mod netpolicy;
use std::collections::{BTreeMap, HashMap};
use std::sync::Arc;
use parking_lot::RwLock;
use firewall::{
render_ruleset, resolve_enforcement_mode, EnforcementMode, InstanceRules, SubnetFirewall,
};
/// Environment variable that overrides the container image every instance boots from.
const DEFAULT_IMAGE_ENV: &str = "FAKECLOUD_EC2_DEFAULT_IMAGE";
/// Image used when `FAKECLOUD_EC2_DEFAULT_IMAGE` is not set.
const DEFAULT_IMAGE: &str = "amazonlinux:2023";
/// Errors raised while launching an instance's backing container or pod.
#[derive(Debug, thiserror::Error)]
pub enum RuntimeError {
    /// The backend could not start the workload. On the Docker/Podman backend the payload is
    /// either the spawn error of the CLI process or the trimmed stderr of a failed `run`.
    #[error("container failed to start: {0}")]
    ContainerStartFailed(String),
}
/// Errors returned by [`Ec2Runtime::new_k8s`] while setting up the Kubernetes backend.
#[derive(Debug, thiserror::Error)]
pub enum BackendInitError {
    /// Kubernetes environment configuration error, converted from `fakecloud_k8s::K8sEnvError`.
    #[error(transparent)]
    Env(#[from] fakecloud_k8s::K8sEnvError),
    /// Pod configuration error, converted from `fakecloud_k8s::K8sPodConfigError`.
    #[error(transparent)]
    PodConfig(#[from] fakecloud_k8s::K8sPodConfigError),
    /// The cluster could not be reached; the payload describes the failure
    /// (presumably built by the `k8s` module — confirm there).
    #[error("failed to connect to the Kubernetes cluster: {0}")]
    Connect(String),
}
/// Result of launching or (re)starting an instance.
#[derive(Debug, Clone)]
pub struct RunningInstance {
    /// Backend handle: the ID printed by `run -d` on Docker/Podman, or whatever the
    /// Kubernetes backend's `spawn_pod` returns (presumably the pod name — confirm).
    pub container_id: String,
    /// Private IP reported for the instance. On Docker/Podman this comes from `inspect`, and
    /// a fresh launch falls back to `10.0.0.1` when no address can be read.
    pub private_ip: String,
    /// Name of the subnet network the container joined, if any.
    pub network: Option<String>,
}
/// Subnet placement requested for an instance (only the Docker/Podman backend uses it).
#[derive(Debug, Clone)]
pub struct InstanceNetwork {
    /// EC2 subnet ID; the container network is named via [`subnet_network_name`].
    pub subnet_id: String,
    /// When `true`, the subnet network is created with `--internal`.
    pub internal: bool,
}
/// Returns the container-network name used for an EC2 subnet: `fakecloud-subnet-<subnet_id>`.
pub fn subnet_network_name(subnet_id: &str) -> String {
    let mut name = String::from("fakecloud-subnet-");
    name.push_str(subnet_id);
    name
}
/// Human-readable description of how instance network isolation is provided.
#[derive(Debug, Clone)]
pub struct NetworkIsolationSummary {
    /// `"docker"`, `"podman"` or `"kubernetes"`.
    pub backend: &'static str,
    /// `"nftables"` / `"disabled"` on Docker/Podman, `"networkpolicy"` on Kubernetes.
    pub sg_enforcement: &'static str,
    /// Whether security-group rules are actually enforced by the chosen mechanism.
    pub enforced: bool,
}
/// Per-instance state kept so an instance can be stopped, started, rebooted and terminated.
#[derive(Debug, Clone)]
struct InstanceRecord {
    /// Current backend handle (container ID or pod handle); refreshed when a pod is respawned.
    handle: String,
    /// Image the instance was launched from; reused when a Kubernetes pod is respawned.
    image: String,
    /// Raw user data as passed at launch; reused on respawn.
    user_data: Option<String>,
    /// Instance tags; passed to the Kubernetes backend on (re)spawn.
    tags: BTreeMap<String, String>,
    /// Subnet placement; read by the Docker backend's start path to report the network name.
    network: Option<InstanceNetwork>,
}
/// Backend that actually hosts instances.
#[derive(Debug, Clone)]
enum InstanceBackend {
    /// Local Docker or Podman CLI.
    Docker(DockerInstances),
    /// Kubernetes pods.
    K8s(k8s::K8sInstances),
}
/// Applies EC2 security-group rules on the host, when the host supports it.
#[derive(Debug, Clone)]
pub struct FirewallEnforcer {
    /// Resolved enforcement mode; `Disabled` makes reconciliation a no-op.
    mode: EnforcementMode,
}
impl FirewallEnforcer {
    /// Resolves the enforcement mode from `FAKECLOUD_EC2_SG_ENFORCEMENT` and the host's
    /// capabilities (`firewall::host_shares_daemon_netns`, `firewall::nft_available`).
    ///
    /// Logs a warning when the variable is set but the resolved mode is
    /// [`EnforcementMode::Disabled`], and an info line when nftables enforcement is active.
    fn detect() -> Self {
        let requested = std::env::var("FAKECLOUD_EC2_SG_ENFORCEMENT").ok();
        let mode = resolve_enforcement_mode(
            requested.as_deref(),
            firewall::host_shares_daemon_netns(),
            firewall::nft_available,
        );
        // NOTE(review): this also warns when the variable's value itself asks for
        // enforcement to be off — confirm against resolve_enforcement_mode's accepted values.
        if requested.is_some() && mode == EnforcementMode::Disabled {
            tracing::warn!(
                "EC2 security-group enforcement was requested but it can't take effect here \
                 (needs nftables + CAP_NET_ADMIN on a native-Linux host whose daemon shares this \
                 network namespace — Docker Desktop / podman-machine run the daemon in a VM); \
                 falling back to metadata-only (phase-2 L3 isolation stays active, security-group \
                 rules are tracked but not enforced)"
            );
        } else if mode == EnforcementMode::Nftables {
            tracing::info!("EC2 security-group enforcement active via nftables");
        }
        Self { mode }
    }

    /// An enforcer that never touches the host firewall.
    fn disabled() -> Self {
        Self {
            mode: EnforcementMode::Disabled,
        }
    }

    /// The resolved enforcement mode.
    pub fn mode(&self) -> EnforcementMode {
        self.mode
    }

    /// `true` unless the mode is [`EnforcementMode::Disabled`].
    pub fn enabled(&self) -> bool {
        self.mode != EnforcementMode::Disabled
    }

    /// Renders the ruleset for `subnets` and loads it with `nft -f -`.
    ///
    /// No-op when disabled. All failures are logged rather than returned; on any failure the
    /// previously loaded ruleset is left in place.
    async fn reconcile(&self, subnets: &[SubnetFirewall]) {
        if self.mode == EnforcementMode::Disabled {
            return;
        }
        // Best-effort host preparation (bridge netfilter); results are deliberately ignored.
        let _ = tokio::process::Command::new("modprobe")
            .arg("br_netfilter")
            .output()
            .await;
        let _ = tokio::process::Command::new("sysctl")
            .args(["-w", "net.bridge.bridge-nf-call-iptables=1"])
            .output()
            .await;
        let ruleset = render_ruleset(subnets);
        use tokio::io::AsyncWriteExt;
        let mut child = match tokio::process::Command::new("nft")
            .args(["-f", "-"])
            .stdin(std::process::Stdio::piped())
            .stdout(std::process::Stdio::null())
            .stderr(std::process::Stdio::piped())
            .spawn()
        {
            Ok(c) => c,
            Err(e) => {
                tracing::warn!(error = %e, "failed to spawn nft; security-group ruleset not applied");
                return;
            }
        };
        if let Some(mut stdin) = child.stdin.take() {
            let streamed = match stdin.write_all(ruleset.as_bytes()).await {
                Ok(()) => stdin.shutdown().await,
                Err(e) => Err(e),
            };
            if let Err(e) = streamed {
                // nft only saw part of the ruleset. A truncated file may still parse, so kill
                // nft rather than let it commit a partial ruleset over the live one.
                let _ = child.kill().await;
                tracing::warn!(
                    error = %e,
                    "failed to stream the security-group ruleset to nft; leaving the previous ruleset in place"
                );
                return;
            }
        }
        match child.wait_with_output().await {
            Ok(out) if out.status.success() => {
                tracing::debug!(
                    subnets = subnets.len(),
                    "applied EC2 security-group nft ruleset"
                );
            }
            Ok(out) => {
                tracing::warn!(
                    stderr = %String::from_utf8_lossy(&out.stderr).trim(),
                    "nft rejected the security-group ruleset; leaving the previous ruleset in place"
                );
            }
            Err(e) => tracing::warn!(error = %e, "nft apply failed"),
        }
    }
}
/// Runs EC2 instances as containers (Docker/Podman) or pods (Kubernetes) and tracks them.
///
/// Cloning shares the instance map and reconcile lock (both are behind `Arc`).
#[derive(Debug, Clone)]
pub struct Ec2Runtime {
    /// Backend hosting the instances.
    backend: InstanceBackend,
    /// Instance ID -> launch record.
    instances: Arc<RwLock<HashMap<String, InstanceRecord>>>,
    /// Host security-group enforcement (always disabled on Kubernetes).
    firewall: FirewallEnforcer,
    /// Serializes `reconcile_firewall` and `reconcile_network_policies`.
    reconcile_lock: Arc<tokio::sync::Mutex<()>>,
}
impl Ec2Runtime {
pub fn new() -> Option<Self> {
let cli = fakecloud_core::container_net::detect_container_cli()?;
Some(Self {
backend: InstanceBackend::Docker(DockerInstances {
cli,
instance_id: format!("fakecloud-{}", std::process::id()),
}),
instances: Arc::new(RwLock::new(HashMap::new())),
firewall: FirewallEnforcer::detect(),
reconcile_lock: Arc::new(tokio::sync::Mutex::new(())),
})
}
pub async fn new_k8s(server_port: u16) -> Result<Self, BackendInitError> {
let backend = k8s::K8sInstances::from_env(server_port).await?;
Ok(Self {
backend: InstanceBackend::K8s(backend),
instances: Arc::new(RwLock::new(HashMap::new())),
firewall: FirewallEnforcer::disabled(),
reconcile_lock: Arc::new(tokio::sync::Mutex::new(())),
})
}
pub fn firewall(&self) -> &FirewallEnforcer {
&self.firewall
}
pub async fn reconcile_firewall(&self, subnets: Vec<SubnetFirewall>) {
let _guard = self.reconcile_lock.lock().await;
self.firewall.reconcile(&subnets).await;
}
pub fn network_isolation_enforced(&self) -> bool {
self.firewall.enabled() || self.is_k8s()
}
pub fn is_k8s(&self) -> bool {
matches!(self.backend, InstanceBackend::K8s(_))
}
pub async fn reconcile_network_policies(&self, rules: Vec<InstanceRules>) {
if let InstanceBackend::K8s(k) = &self.backend {
let _guard = self.reconcile_lock.lock().await;
k.reconcile_network_policies(&rules).await;
}
}
pub fn network_isolation_summary(&self) -> NetworkIsolationSummary {
match &self.backend {
InstanceBackend::Docker(d) => NetworkIsolationSummary {
backend: if fakecloud_core::container_net::is_podman_binary(&d.cli) {
"podman"
} else {
"docker"
},
sg_enforcement: match self.firewall.mode() {
EnforcementMode::Nftables => "nftables",
EnforcementMode::Disabled => "disabled",
},
enforced: self.firewall.enabled(),
},
InstanceBackend::K8s(k) => NetworkIsolationSummary {
backend: "kubernetes",
sg_enforcement: "networkpolicy",
enforced: k.cni_enforces(),
},
}
}
pub fn cli_name(&self) -> &str {
match &self.backend {
InstanceBackend::Docker(d) => &d.cli,
InstanceBackend::K8s(_) => "kubernetes",
}
}
pub async fn run_instance(
&self,
instance_id: &str,
user_data: Option<&str>,
tags: &BTreeMap<String, String>,
network: Option<&InstanceNetwork>,
) -> Result<RunningInstance, RuntimeError> {
let image = default_image();
let running = match &self.backend {
InstanceBackend::Docker(d) => {
d.run_instance(instance_id, &image, user_data, network)
.await?
}
InstanceBackend::K8s(k) => k.spawn_pod(instance_id, &image, user_data, tags).await?,
};
self.instances.write().insert(
instance_id.to_string(),
InstanceRecord {
handle: running.container_id.clone(),
image,
user_data: user_data.map(str::to_string),
tags: tags.clone(),
network: network.cloned(),
},
);
Ok(running)
}
pub async fn stop_instance(&self, instance_id: &str) {
let Some(handle) = self.handle_of(instance_id) else {
return;
};
match &self.backend {
InstanceBackend::Docker(d) => d.stop(&handle).await,
InstanceBackend::K8s(k) => k.delete_pod(&handle).await,
}
}
pub async fn start_instance(&self, instance_id: &str) -> Option<RunningInstance> {
let record = self.instances.read().get(instance_id)?.clone();
match &self.backend {
InstanceBackend::Docker(d) => {
let private_ip = d.start(&record.handle).await?;
Some(RunningInstance {
container_id: record.handle,
private_ip,
network: record
.network
.as_ref()
.map(|n| subnet_network_name(&n.subnet_id)),
})
}
InstanceBackend::K8s(k) => {
let running = k
.spawn_pod(
instance_id,
&record.image,
record.user_data.as_deref(),
&record.tags,
)
.await
.ok()?;
self.update_handle(instance_id, &running.container_id);
Some(running)
}
}
}
pub async fn reboot_instance(&self, instance_id: &str) -> Option<RunningInstance> {
let record = self.instances.read().get(instance_id).cloned()?;
match &self.backend {
InstanceBackend::Docker(d) => {
d.reboot(&record.handle).await;
None
}
InstanceBackend::K8s(k) => {
k.delete_pod(&record.handle).await;
let running = k
.spawn_pod(
instance_id,
&record.image,
record.user_data.as_deref(),
&record.tags,
)
.await
.ok()?;
self.update_handle(instance_id, &running.container_id);
Some(running)
}
}
}
pub async fn terminate_instance(&self, instance_id: &str) {
let record = self.instances.write().remove(instance_id);
if let Some(record) = record {
match &self.backend {
InstanceBackend::Docker(d) => d.remove(&record.handle).await,
InstanceBackend::K8s(k) => k.delete_pod(&record.handle).await,
}
}
}
pub async fn stop_all(&self) {
let records: Vec<InstanceRecord> = {
let mut instances = self.instances.write();
instances.drain().map(|(_, r)| r).collect()
};
for record in records {
match &self.backend {
InstanceBackend::Docker(d) => d.remove(&record.handle).await,
InstanceBackend::K8s(k) => k.delete_pod(&record.handle).await,
}
}
}
pub async fn reap_stale(&self) {
if let InstanceBackend::K8s(k) = &self.backend {
k.reap_stale().await;
}
}
pub async fn console_output(&self, instance_id: &str) -> Option<Vec<u8>> {
let handle = self.handle_of(instance_id)?;
match &self.backend {
InstanceBackend::Docker(d) => d.logs(&handle).await,
InstanceBackend::K8s(k) => k.logs(&handle).await,
}
}
fn handle_of(&self, instance_id: &str) -> Option<String> {
self.instances
.read()
.get(instance_id)
.map(|r| r.handle.clone())
}
fn update_handle(&self, instance_id: &str, handle: &str) {
if let Some(record) = self.instances.write().get_mut(instance_id) {
record.handle = handle.to_string();
}
}
}
/// Image new instances boot from: `$FAKECLOUD_EC2_DEFAULT_IMAGE` if readable, else `amazonlinux:2023`.
fn default_image() -> String {
    match std::env::var(DEFAULT_IMAGE_ENV) {
        Ok(image) => image,
        Err(_) => DEFAULT_IMAGE.to_owned(),
    }
}
/// Builds the container command for an instance.
///
/// With non-empty user data, it decodes the (expected base64) payload, runs it with `sh` in
/// the background, then `exec`s `tail -f /dev/null` to keep the container alive. Without user
/// data it only runs `tail -f /dev/null`.
///
/// The payload is embedded in a single-quoted shell word. Any `'` is escaped as `'\''`, so a
/// malformed (non-base64) payload cannot break out of the quotes and inject shell commands.
/// Valid base64 never contains `'` and is therefore embedded unchanged.
fn boot_command(user_data: Option<&str>) -> Vec<String> {
    match user_data.filter(|s| !s.is_empty()) {
        Some(b64) => {
            // Close the quote, emit a backslash-escaped quote, reopen: it'\''s
            let quoted = b64.replace('\'', r"'\''");
            let script =
                format!("printf %s '{quoted}' | base64 -d | sh & exec tail -f /dev/null");
            vec!["sh".to_string(), "-c".to_string(), script]
        }
        None => vec![
            "tail".to_string(),
            "-f".to_string(),
            "/dev/null".to_string(),
        ],
    }
}
/// Docker/Podman backend driven through the container CLI.
#[derive(Debug, Clone)]
struct DockerInstances {
    /// Container CLI binary to invoke (Docker or Podman).
    cli: String,
    /// Value of the `fakecloud-instance` label put on every container and subnet network
    /// created by this process (`fakecloud-<pid>` when built by `Ec2Runtime::new`).
    instance_id: String,
}
impl DockerInstances {
    /// Starts a detached container for `instance_id` running [`boot_command`].
    ///
    /// The container is labelled `fakecloud-ec2=<instance_id>` and
    /// `fakecloud-instance=<self.instance_id>`. With `network`, it joins the subnet network
    /// (created on demand). If that network can't be ensured, no `--network` is passed and
    /// `RunningInstance::network` is `None`. The private IP comes from `inspect`, falling back
    /// to `10.0.0.1`.
    ///
    /// # Errors
    /// [`RuntimeError::ContainerStartFailed`] when the CLI can't be spawned, or when `run`
    /// exits non-zero (with its trimmed stderr).
    async fn run_instance(
        &self,
        instance_id: &str,
        image: &str,
        user_data: Option<&str>,
        network: Option<&InstanceNetwork>,
    ) -> Result<RunningInstance, RuntimeError> {
        let attached_network = match network {
            Some(net) => self.ensure_subnet_network(net).await,
            None => None,
        };
        let mut args: Vec<String> = vec![
            "run".to_string(),
            "-d".to_string(),
            "--label".to_string(),
            format!("fakecloud-ec2={instance_id}"),
            "--label".to_string(),
            format!("fakecloud-instance={}", self.instance_id),
        ];
        if let Some(name) = &attached_network {
            args.push("--network".to_string());
            args.push(name.clone());
        }
        args.push(image.to_string());
        args.extend(boot_command(user_data));
        let output = tokio::process::Command::new(&self.cli)
            .args(&args)
            .output()
            .await
            .map_err(|e| RuntimeError::ContainerStartFailed(e.to_string()))?;
        if !output.status.success() {
            return Err(RuntimeError::ContainerStartFailed(
                String::from_utf8_lossy(&output.stderr).trim().to_string(),
            ));
        }
        let container_id = String::from_utf8_lossy(&output.stdout).trim().to_string();
        let private_ip = self
            .inspect_ip(&container_id)
            .await
            .unwrap_or_else(|| "10.0.0.1".to_string());
        Ok(RunningInstance {
            container_id,
            private_ip,
            network: attached_network,
        })
    }

    /// Creates (or reuses) the network for `net`'s subnet and returns its name.
    ///
    /// Returns `None`, with a warning, when the network neither could be created nor already
    /// exists; the caller then uses the default network.
    async fn ensure_subnet_network(&self, net: &InstanceNetwork) -> Option<String> {
        let name = subnet_network_name(&net.subnet_id);
        let mut args = vec!["network".to_string(), "create".to_string()];
        if net.internal {
            args.push("--internal".to_string());
        }
        args.push("--label".to_string());
        args.push(format!("fakecloud-subnet={}", net.subnet_id));
        args.push("--label".to_string());
        args.push(format!("fakecloud-instance={}", self.instance_id));
        args.push(name.clone());
        let output = tokio::process::Command::new(&self.cli)
            .args(&args)
            .output()
            .await;
        match output {
            Ok(out) if out.status.success() => Some(name),
            Ok(out) => {
                let err = String::from_utf8_lossy(&out.stderr);
                // A previous launch (or a concurrent one) may have created it already. Only
                // "already exists" is trusted from the message. Anything else (e.g. a
                // kernel-level "file exists") is confirmed by inspecting the network.
                if err.contains("already exists") || self.network_exists(&name).await {
                    Some(name)
                } else {
                    tracing::warn!(
                        subnet = %net.subnet_id,
                        network = %name,
                        error = %err.trim(),
                        "subnet network creation failed; falling back to default bridge"
                    );
                    None
                }
            }
            Err(e) => {
                tracing::warn!(
                    subnet = %net.subnet_id,
                    network = %name,
                    error = %e,
                    "subnet network creation failed; falling back to default bridge"
                );
                None
            }
        }
    }

    /// `true` when `<cli> network inspect <name>` succeeds, i.e. the network exists.
    async fn network_exists(&self, name: &str) -> bool {
        tokio::process::Command::new(&self.cli)
            .args(["network", "inspect", name])
            .output()
            .await
            .map(|o| o.status.success())
            .unwrap_or(false)
    }

    /// First non-empty IP address of the container across its networks, if any.
    ///
    /// Addresses are whitespace-separated in the template so a container on several networks
    /// doesn't yield a concatenated, invalid address.
    async fn inspect_ip(&self, container_id: &str) -> Option<String> {
        let output = tokio::process::Command::new(&self.cli)
            .args([
                "inspect",
                "-f",
                "{{range .NetworkSettings.Networks}}{{.IPAddress}} {{end}}",
                container_id,
            ])
            .output()
            .await
            .ok()?;
        if !output.status.success() {
            return None;
        }
        String::from_utf8_lossy(&output.stdout)
            .split_whitespace()
            .next()
            .map(str::to_string)
    }

    /// Best-effort `stop`; the outcome is ignored.
    async fn stop(&self, container_id: &str) {
        let _ = tokio::process::Command::new(&self.cli)
            .args(["stop", container_id])
            .output()
            .await;
    }

    /// Starts a stopped container and returns its IP, or `None` if start or inspect fails.
    async fn start(&self, container_id: &str) -> Option<String> {
        let started = tokio::process::Command::new(&self.cli)
            .args(["start", container_id])
            .output()
            .await
            .map(|o| o.status.success())
            .unwrap_or(false);
        if !started {
            return None;
        }
        self.inspect_ip(container_id).await
    }

    /// Best-effort `restart`; the outcome is ignored.
    async fn reboot(&self, container_id: &str) {
        let _ = tokio::process::Command::new(&self.cli)
            .args(["restart", container_id])
            .output()
            .await;
    }

    /// Best-effort `rm -f`; the outcome is ignored.
    async fn remove(&self, container_id: &str) {
        let _ = tokio::process::Command::new(&self.cli)
            .args(["rm", "-f", container_id])
            .output()
            .await;
    }

    /// Container logs: stdout followed by stderr (not interleaved), or `None` on failure.
    async fn logs(&self, container_id: &str) -> Option<Vec<u8>> {
        let output = tokio::process::Command::new(&self.cli)
            .args(["logs", container_id])
            .output()
            .await
            .ok()?;
        if !output.status.success() {
            return None;
        }
        let mut buf = output.stdout;
        buf.extend_from_slice(&output.stderr);
        Some(buf)
    }
}