use std::net::SocketAddr;
use std::path::PathBuf;
use std::str::FromStr;
use std::sync::Arc;
use greentic_deploy_spec::{Environment, RevisionLifecycle};
use greentic_deployer::cli::{
OpError, OpFlags, OpOutcome,
revisions::{self, RevisionStagePayload, RevisionTransitionPayload},
traffic::{self, TrafficSetPayload, TrafficShowPayload},
};
use greentic_deployer::environment::{EnvironmentStore, LocalFsStore};
use http_body_util::{BodyExt, Limited};
use hyper::{
HeaderMap, Method, Request, StatusCode,
body::Incoming,
header::{HeaderValue, ORIGIN},
};
use serde::de::DeserializeOwned;
use serde_json::json;
use crate::http_util::{
HttpError as DeploymentError, HttpResponse as DeploymentResponse,
HttpResult as DeploymentResult, error_response, into_error, json_response,
};
const ENV_ROOT_VAR: &str = "GREENTIC_OPERATOR_ENV_ROOT";
const MAX_BODY_BYTES: usize = 512 * 1024;
pub struct DeploymentsState {
pub env_root: Option<PathBuf>,
pub loopback_only: bool,
}
impl DeploymentsState {
pub fn new() -> Self {
Self {
env_root: None,
loopback_only: true,
}
}
fn build_store(&self) -> Result<LocalFsStore, DeploymentError> {
if let Some(root) = &self.env_root {
return Ok(LocalFsStore::new(root.clone()));
}
if let Ok(override_root) = std::env::var(ENV_ROOT_VAR)
&& !override_root.is_empty()
{
return Ok(LocalFsStore::new(PathBuf::from(override_root)));
}
LocalFsStore::default_root()
.map(LocalFsStore::new)
.ok_or_else(|| {
into_error(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"no env-store root: HOME / USERPROFILE not set and \
GREENTIC_OPERATOR_ENV_ROOT unset",
))
})
}
}
impl Default for DeploymentsState {
fn default() -> Self {
Self::new()
}
}
pub async fn handle_deployments_request(
req: Request<Incoming>,
path: &str,
peer_addr: SocketAddr,
state: &Arc<DeploymentsState>,
) -> DeploymentResult {
check_trust_boundary(peer_addr, req.headers(), state)?;
let method = req.method().clone();
let body_bytes = Limited::new(req.into_body(), MAX_BODY_BYTES)
.collect()
.await
.map(|c| c.to_bytes())
.map_err(|err| {
let too_large = err
.downcast_ref::<http_body_util::LengthLimitError>()
.is_some();
let (status, msg) = if too_large {
(
StatusCode::PAYLOAD_TOO_LARGE,
format!("body exceeds {MAX_BODY_BYTES} bytes"),
)
} else {
(StatusCode::BAD_REQUEST, format!("read body: {err}"))
};
into_error(error_response(status, msg))
})?;
dispatch(method, path, &body_bytes, state).await
}
fn check_trust_boundary(
peer_addr: SocketAddr,
headers: &HeaderMap<HeaderValue>,
state: &Arc<DeploymentsState>,
) -> Result<(), DeploymentError> {
if state.loopback_only && !peer_addr.ip().is_loopback() {
return Err(into_error(error_response(
StatusCode::FORBIDDEN,
format!(
"/deployments/* refused: peer {peer_addr} is not loopback; \
mTLS / admin admission is Phase D (plan §B4b)"
),
)));
}
if let Some(origin) = headers.get(ORIGIN).and_then(|v| v.to_str().ok())
&& !is_loopback_origin(origin)
{
return Err(into_error(error_response(
StatusCode::FORBIDDEN,
format!(
"/deployments/* refused: Origin `{origin}` is not loopback; \
cross-origin browser callers are blocked until mTLS lands"
),
)));
}
Ok(())
}
fn is_loopback_origin(origin: &str) -> bool {
if origin.eq_ignore_ascii_case("null") {
return true;
}
let after_scheme = origin
.strip_prefix("http://")
.or_else(|| origin.strip_prefix("https://"))
.unwrap_or(origin);
let host = after_scheme.split('/').next().unwrap_or("");
let host_no_port = if let Some(stripped) = host.strip_prefix('[') {
stripped.split(']').next().unwrap_or("")
} else {
host.split(':').next().unwrap_or("")
};
matches!(
host_no_port,
"localhost" | "127.0.0.1" | "::1" | "0:0:0:0:0:0:0:1"
)
}
async fn dispatch(
method: Method,
path: &str,
body_bytes: &[u8],
state: &Arc<DeploymentsState>,
) -> DeploymentResult {
let sub_path = path
.strip_prefix("/deployments")
.unwrap_or("")
.trim_end_matches('/');
match (method, sub_path) {
(Method::POST, "/stage") => {
let payload = parse_json::<RevisionStagePayload>(body_bytes)?;
run_blocking(state, move |store, flags| {
revisions::stage(store, flags, Some(payload))
})
.await
}
(Method::POST, "/warm") => {
let payload = parse_json::<RevisionTransitionPayload>(body_bytes)?;
run_blocking(state, move |store, flags| {
revisions::warm(store, flags, Some(payload))
})
.await
}
(Method::POST, "/activate") => {
let payload = parse_json::<TrafficSetPayload>(body_bytes)?;
run_blocking(state, move |store, flags| {
traffic::set(store, flags, Some(payload))
})
.await
}
(Method::POST, "/rollback") => {
let payload = parse_json::<TrafficShowPayload>(body_bytes)?;
run_blocking(state, move |store, flags| {
traffic::rollback(store, flags, Some(payload))
})
.await
}
(Method::POST, "/complete-drain") => {
let payload = parse_json::<RevisionTransitionPayload>(body_bytes)?;
complete_drain(state, payload).await
}
(_, "/stage" | "/warm" | "/activate" | "/rollback" | "/complete-drain") => {
Err(into_error(error_response(
StatusCode::METHOD_NOT_ALLOWED,
format!("only POST allowed on /deployments{sub_path}"),
)))
}
_ => Err(into_error(error_response(
StatusCode::NOT_FOUND,
format!("unknown /deployments endpoint: {sub_path}"),
))),
}
}
async fn complete_drain(
state: &Arc<DeploymentsState>,
payload: RevisionTransitionPayload,
) -> DeploymentResult {
let store = state.build_store()?;
let env_id = match greentic_deploy_spec::EnvId::try_from(payload.environment_id.as_str()) {
Ok(id) => id,
Err(e) => {
return Err(into_error(op_error_response(&OpError::InvalidArgument(
format!("environment_id: {e}"),
))));
}
};
let revision_id_str = payload.revision_id.clone();
let env_for_check = store.clone();
let preflight = tokio::task::spawn_blocking(move || env_for_check.load(&env_id))
.await
.map_err(|err| {
into_error(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("worker join: {err}"),
))
})?;
let env = preflight.map_err(|e| into_error(op_error_response(&OpError::from(e))))?;
precheck_drain_completable(&env, &revision_id_str)
.map_err(|err| into_error(op_error_response(&err)))?;
run_blocking(state, move |store, flags| {
revisions::archive(store, flags, Some(payload))
})
.await
}
fn precheck_drain_completable(env: &Environment, revision_id: &str) -> Result<(), OpError> {
let target = ulid::Ulid::from_str(revision_id)
.map_err(|e| OpError::InvalidArgument(format!("revision_id: {e}")))?;
let rev = env
.revisions
.iter()
.find(|r| r.revision_id.0 == target)
.ok_or_else(|| OpError::NotFound(format!("revision `{revision_id}` not found in env")))?;
if rev.lifecycle != RevisionLifecycle::Draining {
return Err(OpError::Conflict(format!(
"/complete-drain requires revision in `Draining`; \
revision `{revision_id}` is `{:?}`. Initiate drain \
(`Ready → Draining`) before completing it.",
rev.lifecycle,
)));
}
Ok(())
}
async fn run_blocking<F>(state: &Arc<DeploymentsState>, op: F) -> DeploymentResult
where
F: FnOnce(&LocalFsStore, &OpFlags) -> Result<OpOutcome, OpError> + Send + 'static,
{
let store = state.build_store()?;
let result = tokio::task::spawn_blocking(move || op(&store, &OpFlags::default()))
.await
.map_err(|err| {
into_error(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("worker join: {err}"),
))
})?;
match result {
Ok(outcome) => Ok(json_response(
StatusCode::OK,
serde_json::to_value(outcome).unwrap_or_else(|_| json!({})),
)),
Err(err) => Err(into_error(op_error_response(&err))),
}
}
fn parse_json<T: DeserializeOwned>(body: &[u8]) -> Result<T, DeploymentError> {
serde_json::from_slice::<T>(body).map_err(|err| {
into_error(error_response(
StatusCode::BAD_REQUEST,
format!("invalid JSON: {err}"),
))
})
}
fn op_error_response(err: &OpError) -> DeploymentResponse {
let status = match err {
OpError::NotFound(_) => StatusCode::NOT_FOUND,
OpError::Conflict(_) => StatusCode::CONFLICT,
OpError::Unauthorized { .. } => StatusCode::FORBIDDEN,
OpError::InvalidArgument(_) | OpError::Spec(_) | OpError::AnswersParse { .. } => {
StatusCode::BAD_REQUEST
}
OpError::NotYetImplemented(_) => StatusCode::NOT_IMPLEMENTED,
OpError::Store(_)
| OpError::Io { .. }
| OpError::SchemaGeneration(_)
| OpError::Audit(_) => StatusCode::INTERNAL_SERVER_ERROR,
};
json_response(
status,
json!({
"error": {
"kind": err.kind(),
"message": err.to_string(),
}
}),
)
}
#[cfg(test)]
mod tests {
use super::*;
use chrono::Utc;
use greentic_deploy_spec::{
BundleId, DeploymentId, EnvId, Revision, RevisionId, RevisionLifecycle, SchemaVersion,
};
use http_body_util::BodyExt;
use std::net::{IpAddr, Ipv4Addr};
fn state_with_tempdir(tmp: &tempfile::TempDir) -> Arc<DeploymentsState> {
Arc::new(DeploymentsState {
env_root: Some(tmp.path().to_path_buf()),
loopback_only: true,
})
}
fn loopback_peer() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::LOCALHOST), 1234)
}
fn remote_peer() -> SocketAddr {
SocketAddr::new(IpAddr::V4(Ipv4Addr::new(203, 0, 113, 7)), 1234)
}
async fn read_body_json(resp: DeploymentResponse) -> serde_json::Value {
let bytes = resp.into_body().collect().await.unwrap().to_bytes();
if bytes.is_empty() {
return json!({});
}
serde_json::from_slice(&bytes).expect("valid JSON body")
}
#[test]
fn max_body_bytes_constant_is_reasonable() {
assert_eq!(MAX_BODY_BYTES, 512 * 1024);
}
#[tokio::test]
async fn unknown_subpath_returns_404() {
let tmp = tempfile::tempdir().unwrap();
let state = state_with_tempdir(&tmp);
let err = dispatch(Method::POST, "/deployments/bogus", b"{}", &state)
.await
.expect_err("bogus path must error");
assert_eq!(err.status(), StatusCode::NOT_FOUND);
}
#[tokio::test]
async fn wrong_method_returns_405() {
let tmp = tempfile::tempdir().unwrap();
let state = state_with_tempdir(&tmp);
for verb in [
"/stage",
"/warm",
"/activate",
"/rollback",
"/complete-drain",
] {
let path = format!("/deployments{verb}");
let err = dispatch(Method::GET, &path, b"", &state)
.await
.expect_err("GET must error");
assert_eq!(
err.status(),
StatusCode::METHOD_NOT_ALLOWED,
"GET {path} should 405"
);
}
}
#[tokio::test]
async fn malformed_json_returns_400() {
let tmp = tempfile::tempdir().unwrap();
let state = state_with_tempdir(&tmp);
let err = dispatch(Method::POST, "/deployments/stage", b"{not-json", &state)
.await
.expect_err("malformed JSON must error");
assert_eq!(err.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn missing_required_payload_field_returns_400() {
let tmp = tempfile::tempdir().unwrap();
let state = state_with_tempdir(&tmp);
let body = br#"{"environment_id": "demo-env"}"#;
let err = dispatch(Method::POST, "/deployments/warm", body, &state)
.await
.expect_err("missing field must error");
assert_eq!(err.status(), StatusCode::BAD_REQUEST);
}
#[tokio::test]
async fn end_to_end_through_deployer_lib_returns_typed_error_envelope() {
let tmp = tempfile::tempdir().unwrap();
let state = state_with_tempdir(&tmp);
let body = br#"{
"environment_id": "demo-env",
"deployment_id": "01HQXXVNAEPS9YYBFB3FBTQDR6",
"bundle_digest": "sha256:00"
}"#;
let err = dispatch(Method::POST, "/deployments/stage", body, &state)
.await
.expect_err("local-only auth denial must surface");
assert_eq!(err.status(), StatusCode::FORBIDDEN);
let body = read_body_json(*err).await;
assert_eq!(body["error"]["kind"], "unauthorized");
}
#[test]
fn op_error_to_status_covers_every_variant() {
let cases: Vec<(OpError, StatusCode)> = vec![
(
OpError::Store(greentic_deployer::environment::StoreError::NotFound(
demo_env_id(),
)),
StatusCode::INTERNAL_SERVER_ERROR,
),
(
OpError::Spec(greentic_deploy_spec::SpecError::BasisPointsSum { sum: 0 }),
StatusCode::BAD_REQUEST,
),
(
OpError::Io {
path: PathBuf::from("/dev/null"),
source: std::io::Error::other("boom"),
},
StatusCode::INTERNAL_SERVER_ERROR,
),
(
OpError::AnswersParse {
path: PathBuf::from("/dev/null"),
message: "x".into(),
},
StatusCode::BAD_REQUEST,
),
(
OpError::SchemaGeneration("x".into()),
StatusCode::INTERNAL_SERVER_ERROR,
),
(
OpError::InvalidArgument("x".into()),
StatusCode::BAD_REQUEST,
),
(OpError::NotFound("x".into()), StatusCode::NOT_FOUND),
(OpError::NotYetImplemented("x"), StatusCode::NOT_IMPLEMENTED),
(OpError::Conflict("x".into()), StatusCode::CONFLICT),
(
OpError::Unauthorized {
policy: "p".into(),
reason: "r".into(),
},
StatusCode::FORBIDDEN,
),
(
OpError::Audit("x".into()),
StatusCode::INTERNAL_SERVER_ERROR,
),
];
for (err, expected) in &cases {
assert_eq!(
op_error_response(err).status(),
*expected,
"{err} should map to {expected}"
);
}
}
#[test]
fn check_trust_boundary_blocks_non_loopback_peer() {
let tmp = tempfile::tempdir().unwrap();
let state = state_with_tempdir(&tmp);
let headers = HeaderMap::new();
let err = check_trust_boundary(remote_peer(), &headers, &state)
.expect_err("remote peer must be refused");
assert_eq!(err.status(), StatusCode::FORBIDDEN);
}
#[test]
fn check_trust_boundary_allows_loopback_peer_without_origin() {
let tmp = tempfile::tempdir().unwrap();
let state = state_with_tempdir(&tmp);
let headers = HeaderMap::new();
check_trust_boundary(loopback_peer(), &headers, &state).expect("loopback must pass");
}
#[test]
fn check_trust_boundary_allows_loopback_origin() {
let tmp = tempfile::tempdir().unwrap();
let state = state_with_tempdir(&tmp);
let mut headers = HeaderMap::new();
headers.insert(ORIGIN, HeaderValue::from_static("http://127.0.0.1:8080"));
check_trust_boundary(loopback_peer(), &headers, &state).expect("loopback Origin must pass");
headers.insert(ORIGIN, HeaderValue::from_static("http://localhost:8080"));
check_trust_boundary(loopback_peer(), &headers, &state)
.expect("localhost Origin must pass");
headers.insert(ORIGIN, HeaderValue::from_static("http://[::1]:8080"));
check_trust_boundary(loopback_peer(), &headers, &state).expect("[::1] Origin must pass");
headers.insert(ORIGIN, HeaderValue::from_static("null"));
check_trust_boundary(loopback_peer(), &headers, &state).expect("`null` Origin must pass");
}
#[test]
fn check_trust_boundary_blocks_cross_origin_csrf() {
let tmp = tempfile::tempdir().unwrap();
let state = state_with_tempdir(&tmp);
let mut headers = HeaderMap::new();
headers.insert(ORIGIN, HeaderValue::from_static("https://evil.example.com"));
let err = check_trust_boundary(loopback_peer(), &headers, &state)
.expect_err("remote Origin must be refused");
assert_eq!(err.status(), StatusCode::FORBIDDEN);
}
#[test]
fn loopback_only_false_disables_peer_check_but_keeps_origin_check() {
let tmp = tempfile::tempdir().unwrap();
let state = Arc::new(DeploymentsState {
env_root: Some(tmp.path().to_path_buf()),
loopback_only: false,
});
let headers = HeaderMap::new();
check_trust_boundary(remote_peer(), &headers, &state)
.expect("remote peer must pass when loopback_only=false");
let mut hostile = HeaderMap::new();
hostile.insert(ORIGIN, HeaderValue::from_static("https://evil.example.com"));
let err = check_trust_boundary(remote_peer(), &hostile, &state)
.expect_err("remote Origin must still be refused");
assert_eq!(err.status(), StatusCode::FORBIDDEN);
}
#[test]
fn is_loopback_origin_classifier() {
for ok in [
"http://localhost",
"http://localhost:3000",
"https://localhost",
"http://127.0.0.1",
"http://127.0.0.1:8080",
"http://[::1]",
"http://[::1]:8080",
"null",
"NULL",
] {
assert!(is_loopback_origin(ok), "{ok} should classify as loopback");
}
for not_ok in [
"http://example.com",
"https://attacker.example",
"http://127.0.0.1.evil.example",
"http://localhostess.example",
"http://10.0.0.1",
"http://[2001:db8::1]",
] {
assert!(
!is_loopback_origin(not_ok),
"{not_ok} must NOT classify as loopback"
);
}
}
fn demo_env_id() -> EnvId {
EnvId::try_from("demo-env").unwrap()
}
fn revision_with_lifecycle(id: &RevisionId, lifecycle: RevisionLifecycle) -> Revision {
Revision {
schema: SchemaVersion::new(SchemaVersion::REVISION_V1),
revision_id: *id,
env_id: demo_env_id(),
bundle_id: BundleId::new("demo-bundle"),
deployment_id: DeploymentId(ulid::Ulid::new()),
sequence: 1,
created_at: Utc::now(),
bundle_digest: "sha256:00".into(),
pack_list: Vec::new(),
pack_list_lock_ref: PathBuf::from("pack-list.lock"),
config_digest: "sha256:00".into(),
signature_sidecar_ref: PathBuf::from("rev.sig"),
lifecycle,
staged_at: None,
warmed_at: None,
drain_seconds: 30,
abort_metrics: Vec::new(),
}
}
fn env_with_one_revision(rev: Revision) -> Environment {
Environment {
schema: SchemaVersion::new(SchemaVersion::ENVIRONMENT_V1),
environment_id: demo_env_id(),
name: "demo".into(),
host_config: greentic_deploy_spec::EnvironmentHostConfig {
env_id: demo_env_id(),
region: None,
tenant_org_id: None,
},
packs: Vec::new(),
credentials_ref: None,
bundles: Vec::new(),
revisions: vec![rev],
traffic_splits: Vec::new(),
revocation: Default::default(),
retention: Default::default(),
health: Default::default(),
}
}
#[test]
fn precheck_drain_completable_accepts_draining_revision() {
let rev_id = RevisionId(ulid::Ulid::new());
let env = env_with_one_revision(revision_with_lifecycle(
&rev_id,
RevisionLifecycle::Draining,
));
precheck_drain_completable(&env, &rev_id.to_string())
.expect("Draining revision must be accepted");
}
#[test]
fn precheck_drain_completable_rejects_ready_revision_with_conflict() {
let rev_id = RevisionId(ulid::Ulid::new());
let env = env_with_one_revision(revision_with_lifecycle(&rev_id, RevisionLifecycle::Ready));
let err = precheck_drain_completable(&env, &rev_id.to_string())
.expect_err("Ready revision must be refused");
assert!(matches!(err, OpError::Conflict(_)), "got {err:?}");
assert_eq!(op_error_response(&err).status(), StatusCode::CONFLICT);
}
#[test]
fn precheck_drain_completable_rejects_other_lifecycle_states() {
for lifecycle in [
RevisionLifecycle::Inactive,
RevisionLifecycle::Staged,
RevisionLifecycle::Warming,
RevisionLifecycle::Failed,
RevisionLifecycle::Archived,
] {
let rev_id = RevisionId(ulid::Ulid::new());
let env = env_with_one_revision(revision_with_lifecycle(&rev_id, lifecycle));
let err = precheck_drain_completable(&env, &rev_id.to_string())
.expect_err("non-Draining must be refused");
assert!(matches!(err, OpError::Conflict(_)), "got {err:?}");
}
}
#[test]
fn precheck_drain_completable_accepts_lowercase_ulid_input() {
let rev_id = RevisionId(ulid::Ulid::new());
let env = env_with_one_revision(revision_with_lifecycle(
&rev_id,
RevisionLifecycle::Draining,
));
let lower = rev_id.to_string().to_ascii_lowercase();
precheck_drain_completable(&env, &lower)
.expect("lowercase ULID must match the same revision");
}
#[test]
fn precheck_drain_completable_invalid_ulid_is_400() {
let env = env_with_one_revision(revision_with_lifecycle(
&RevisionId(ulid::Ulid::new()),
RevisionLifecycle::Draining,
));
let err = precheck_drain_completable(&env, "not-a-ulid")
.expect_err("non-ULID input must be InvalidArgument");
assert!(matches!(err, OpError::InvalidArgument(_)), "got {err:?}");
assert_eq!(op_error_response(&err).status(), StatusCode::BAD_REQUEST);
}
#[test]
fn precheck_drain_completable_missing_revision_is_404() {
let env = env_with_one_revision(revision_with_lifecycle(
&RevisionId(ulid::Ulid::new()),
RevisionLifecycle::Draining,
));
let unknown = RevisionId(ulid::Ulid::new()).to_string();
let err = precheck_drain_completable(&env, &unknown)
.expect_err("unknown revision must be NotFound");
assert!(matches!(err, OpError::NotFound(_)), "got {err:?}");
assert_eq!(op_error_response(&err).status(), StatusCode::NOT_FOUND);
}
}