use std::{
collections::BTreeMap,
iter::FromIterator,
net::{IpAddr, Ipv4Addr, SocketAddr},
process::exit,
};
use anyhow::Context;
use http_body_util::Empty;
use hyper::{body::Bytes, server::conn::http1, service::service_fn, Response};
use hyper_util::rt::TokioIo;
use shuttle_api_client::ShuttleApiClient;
use shuttle_common::{
models::resource::{ResourceInput, ResourceState, ResourceType},
secrets::Secret,
};
use shuttle_service::{Environment, ResourceFactory, Service};
use tokio::net::TcpListener;
use tracing::{debug, error, info, trace};
use crate::__internals::{Loader, Runner};
struct RuntimeEnvVars {
shuttle: bool,
project_id: String,
project_name: String,
env: Environment,
ip: IpAddr,
port: u16,
healthz_port: Option<u16>,
api_url: String,
api_key: Option<String>,
}
impl RuntimeEnvVars {
fn parse() -> Self {
Self {
shuttle: std::env::var("SHUTTLE").is_ok(),
project_id: std::env::var("SHUTTLE_PROJECT_ID").expect("project id env var"),
project_name: std::env::var("SHUTTLE_PROJECT_NAME").expect("project name env var"),
env: std::env::var("SHUTTLE_ENV")
.expect("shuttle environment env var")
.parse()
.expect("invalid shuttle environment"),
ip: std::env::var("SHUTTLE_RUNTIME_IP")
.expect("runtime ip env var")
.parse()
.expect("invalid ip"),
port: std::env::var("SHUTTLE_RUNTIME_PORT")
.expect("runtime port env var")
.parse()
.expect("invalid port"),
healthz_port: std::env::var("SHUTTLE_HEALTHZ_PORT")
.map(|s| s.parse().expect("invalid healthz port"))
.ok(),
api_url: std::env::var("SHUTTLE_API").expect("api url env var"),
api_key: std::env::var("SHUTTLE_API_KEY").ok(),
}
}
}
pub async fn start(
loader: impl Loader + Send + 'static,
runner: impl Runner + Send + 'static,
) -> i32 {
debug!("Parsing environment variables");
let RuntimeEnvVars {
shuttle,
project_id,
project_name,
env,
ip,
port,
healthz_port,
api_url,
api_key,
} = RuntimeEnvVars::parse();
let service_addr = SocketAddr::new(ip, port);
let client = ShuttleApiClient::new(api_url, api_key, None, None);
if let Some(healthz_port) = healthz_port {
trace!("Starting health check server on port {healthz_port}");
let addr = SocketAddr::new(Ipv4Addr::UNSPECIFIED.into(), healthz_port);
tokio::spawn(async move {
let Ok(listener) = TcpListener::bind(&addr).await else {
eprintln!("ERROR: Failed to bind to health check port");
exit(201);
};
loop {
let Ok((stream, _)) = listener.accept().await else {
eprintln!("ERROR: Health check listener error");
exit(202);
};
let io = TokioIo::new(stream);
tokio::spawn(async move {
if let Err(err) = http1::Builder::new()
.serve_connection(
io,
service_fn(|_req| async move {
trace!("Received health check");
trace!("Responding to health check");
Result::<Response<Empty<Bytes>>, hyper::Error>::Ok(Response::new(
Empty::new(),
))
}),
)
.await
{
error!("Health check error: {err}");
}
});
}
});
}
info!("Loading resources");
trace!("Getting secrets");
let secrets: BTreeMap<String, String> =
match client.get_secrets(&project_id).await.and_then(|r| {
serde_json::from_value(r.into_inner().output).context("failed to deserialize secrets")
}) {
Ok(s) => s,
Err(e) => {
eprintln!("ERROR: Runtime Secret Loading phase failed: {e}");
return 101;
}
};
let secrets = BTreeMap::from_iter(secrets.into_iter().map(|(k, v)| (k, Secret::new(v))));
let factory = ResourceFactory::new(project_name, secrets.clone(), env);
let mut resources = match loader.load(factory).await {
Ok(r) => r,
Err(e) => {
eprintln!("ERROR: Runtime Loader phase failed: {e}");
return 111;
}
};
let values = match resources
.iter()
.map(|bytes| {
serde_json::from_slice::<ResourceInput>(bytes).context("deserializing resource input")
})
.collect::<anyhow::Result<Vec<_>>>()
{
Ok(v) => v,
Err(e) => {
eprintln!("ERROR: Runtime Provisioning phase failed: {e}");
return 121;
}
};
for (bytes, shuttle_resource) in resources
.iter_mut()
.zip(values)
.filter_map(|(bytes, value)| match value {
ResourceInput::Shuttle(shuttle_resource) => Some((bytes, shuttle_resource)),
ResourceInput::Custom(_) => None,
})
{
if shuttle_resource.r#type == ResourceType::Secrets {
*bytes = serde_json::to_vec(&secrets).expect("to serialize struct");
continue;
}
info!("Provisioning {:?}", shuttle_resource.r#type);
loop {
trace!("Checking state of {:?}", shuttle_resource.r#type);
match client
.provision_resource(&project_id, shuttle_resource.clone())
.await
.map(|r| r.into_inner())
{
Ok(res) => {
trace!("Got response {:?}", res);
match res.state {
ResourceState::Provisioning | ResourceState::Authorizing => {
tokio::time::sleep(tokio::time::Duration::from_millis(2000)).await;
}
ResourceState::Ready => {
*bytes = serde_json::to_vec(&res.output).expect("to serialize struct");
break;
}
bad_state => {
eprintln!(
"ERROR: Runtime Provisioning phase failed: Received {:?} resource with state '{}'.",
shuttle_resource.r#type,
bad_state
);
return 132;
}
}
}
Err(e) => {
eprintln!("ERROR: Runtime Provisioning phase failed: {e}");
return 131;
}
};
}
}
if shuttle {
trace!("Sending sidecar shutdown request");
let _ = client.client.get("/__shuttle/shutdown").send().await;
}
let service = match runner.run(resources).await {
Ok(s) => s,
Err(e) => {
eprintln!("ERROR: Runtime Resource Initialization phase failed: {e}");
return 151;
}
};
info!("Starting service");
let service_bind = service.bind(service_addr);
#[cfg(target_family = "unix")]
let interrupted = {
let mut sigterm_notif =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
.expect("Can not get the SIGTERM signal receptor");
let mut sigint_notif =
tokio::signal::unix::signal(tokio::signal::unix::SignalKind::interrupt())
.expect("Can not get the SIGINT signal receptor");
tokio::select! {
res = service_bind => {
if let Err(e) = res {
tracing::error!("Service encountered an error in `bind`: {e}");
return 1;
}
tracing::warn!("Service terminated on its own. Shutting down the runtime...");
false
}
_ = sigterm_notif.recv() => {
tracing::warn!("Received SIGTERM. Shutting down the runtime...");
true
},
_ = sigint_notif.recv() => {
tracing::warn!("Received SIGINT. Shutting down the runtime...");
true
}
}
};
#[cfg(target_family = "windows")]
let interrupted = {
let mut ctrl_break_notif = tokio::signal::windows::ctrl_break()
.expect("Can not get the CtrlBreak signal receptor");
let mut ctrl_c_notif =
tokio::signal::windows::ctrl_c().expect("Can not get the CtrlC signal receptor");
let mut ctrl_close_notif = tokio::signal::windows::ctrl_close()
.expect("Can not get the CtrlClose signal receptor");
let mut ctrl_logoff_notif = tokio::signal::windows::ctrl_logoff()
.expect("Can not get the CtrlLogoff signal receptor");
let mut ctrl_shutdown_notif = tokio::signal::windows::ctrl_shutdown()
.expect("Can not get the CtrlShutdown signal receptor");
tokio::select! {
res = service_bind => {
if let Err(e) = res {
tracing::error!("Service encountered an error in `bind`: {e}");
exit(1);
}
tracing::warn!("Service terminated on its own. Shutting down the runtime...");
false
}
_ = ctrl_break_notif.recv() => {
tracing::warn!("Received ctrl-break. Shutting down the runtime...");
true
},
_ = ctrl_c_notif.recv() => {
tracing::warn!("Received ctrl-c. Shutting down the runtime...");
true
},
_ = ctrl_close_notif.recv() => {
tracing::warn!("Received ctrl-close. Shutting down the runtime...");
true
},
_ = ctrl_logoff_notif.recv() => {
tracing::warn!("Received ctrl-logoff. Shutting down the runtime...");
true
},
_ = ctrl_shutdown_notif.recv() => {
tracing::warn!("Received ctrl-shutdown. Shutting down the runtime...");
true
}
}
};
if interrupted {
return 10;
}
0
}