use std::collections::HashMap;
use std::convert::Infallible;
use std::net::{IpAddr, SocketAddr};
use std::sync::Arc;
use std::sync::mpsc;
use std::thread::{self, JoinHandle};
use std::time::Duration;
use anyhow::{Context, Result};
use arc_swap::ArcSwap;
use base64::Engine as _;
use base64::engine::general_purpose::STANDARD as BASE64;
use greentic_deploy_spec::ids::{BundleId, DeploymentId, RevisionId};
use greentic_types::ChannelMessageEnvelope;
use greentic_types::messaging::extensions::ext_keys;
use http_body_util::{BodyExt, Full, Limited};
use hyper::body::{Bytes, Incoming};
use hyper::server::conn::http1::Builder as Http1Builder;
use hyper::service::service_fn;
use hyper::{HeaderMap, Request, Response, StatusCode, header};
use hyper_util::rt::tokio::TokioIo;
use serde_json::Value;
use tokio::net::TcpListener;
use tokio::runtime::{Handle, Runtime};
use tokio::sync::oneshot;
use greentic_runner_host::{Activity, RunnerHost, WelcomeFlowHint};
use greentic_deploy_spec::{DEFAULT_LISTEN_ADDR, EnvironmentHostConfig};
use crate::deployment_routes::RevisionIngressRouting;
use crate::endpoint_resolver;
use crate::http_routes::{HttpRouteTable, RevisionScope};
use crate::identify_payload;
use crate::ingress_dispatch::parse_dispatch_result;
use crate::ingress_types::IngressHttpResponse;
use crate::messaging_dto::HttpInV1;
use crate::operator_log;
use crate::provider_auth;
use crate::revision_dispatcher::{
DispatchRequest, RevisionDispatcher, RevisionKey, SetCookieDirective, cookie_name,
};
use crate::revision_drain::{
DrainRequest, NoopRevisionTeardown, RevisionDrainCoordinator, RevisionLivenessProbe,
RevisionTeardown,
};
const MAX_BODY_BYTES: usize = 1 << 20;
#[derive(Clone)]
pub(crate) struct Activation {
pub host: Arc<RunnerHost>,
pub routing: Arc<RevisionIngressRouting>,
}
pub(crate) struct RevisionServeConfig {
pub bind_addr: SocketAddr,
pub activation: Arc<Activation>,
}
struct ServeState {
slot: ArcSwap<Activation>,
bound_addr: SocketAddr,
}
impl ServeState {
fn current(&self) -> Arc<Activation> {
self.slot.load_full()
}
}
fn removed_revisions(prev: &RevisionDispatcher, next: &RevisionDispatcher) -> Vec<RevisionKey> {
prev.revision_keys()
.into_iter()
.filter(|(deployment_id, _bundle_id, revision_id)| {
!next.contains_revision(*deployment_id, *revision_id)
})
.collect()
}
struct SlotLivenessProbe {
state: Arc<ServeState>,
draining_dispatcher: Arc<RevisionDispatcher>,
}
impl RevisionLivenessProbe for SlotLivenessProbe {
fn is_live_elsewhere(&self, deployment_id: DeploymentId, revision_id: RevisionId) -> bool {
let live = self.state.current();
if Arc::ptr_eq(&live.routing.dispatcher, &self.draining_dispatcher) {
return false;
}
live.routing
.dispatcher
.contains_revision(deployment_id, revision_id)
}
}
fn spawn_revision_drains(
runtime_handle: &Handle,
state: Arc<ServeState>,
prev: Arc<Activation>,
removed: Vec<RevisionKey>,
drain_window: Duration,
) {
let drain_seconds: u32 = drain_window.as_secs().try_into().unwrap_or(u32::MAX);
let teardown: Arc<dyn RevisionTeardown> = Arc::new(NoopRevisionTeardown);
for (deployment_id, bundle_id, revision_id) in removed {
let Some(tenant) = prev
.routing
.deployment_routes
.tenant_for(deployment_id)
.map(str::to_string)
else {
operator_log::warn(
module_path!(),
format!(
"skipping drain for revision {revision_id} of deployment \
{deployment_id}: no tenant binding found in OLD activation \
route table (deployment likely removed before reload diff)"
),
);
continue;
};
let dispatcher = Arc::clone(&prev.routing.dispatcher);
let teardown = Arc::clone(&teardown);
let liveness: Arc<dyn RevisionLivenessProbe> = Arc::new(SlotLivenessProbe {
state: Arc::clone(&state),
draining_dispatcher: Arc::clone(&dispatcher),
});
runtime_handle.spawn(async move {
let coord = RevisionDrainCoordinator::with_noop_ws(dispatcher, teardown)
.with_liveness_probe(liveness);
let req = DrainRequest {
tenant: tenant.as_str(),
deployment_id,
bundle_id,
revision_id,
drain_seconds,
};
if let Err(err) = coord.run(req).await {
operator_log::warn(
module_path!(),
format!(
"drain coordinator for revision {revision_id} of \
deployment {deployment_id} returned an error: {err}"
),
);
}
});
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) struct ReloadReport {
pub prev_deployments: usize,
pub prev_revisions: usize,
pub new_deployments: usize,
pub new_revisions: usize,
}
pub(crate) struct RevisionServer {
shutdown: Option<oneshot::Sender<()>>,
handle: Option<JoinHandle<Result<()>>>,
actual_port: u16,
state: Arc<ServeState>,
runtime_handle: Handle,
reload_lock: std::sync::Mutex<()>,
generation_watermark: std::sync::Mutex<HashMap<DeploymentId, u64>>,
}
impl RevisionServer {
pub(crate) fn start(config: RevisionServeConfig) -> Result<Self> {
let requested_port = config.bind_addr.port();
let listen_ip = config.bind_addr.ip();
let actual_port =
crate::port_utils::find_available_port(&listen_ip.to_string(), requested_port, 10)
.context("failed to find available port for revision ingress")?;
if actual_port != requested_port {
operator_log::warn(
module_path!(),
format!(
"requested port {requested_port} is in use; using port {actual_port} instead"
),
);
}
let addr = SocketAddr::new(listen_ip, actual_port);
let state = Arc::new(ServeState {
slot: ArcSwap::new(config.activation),
bound_addr: addr,
});
let listener_state = Arc::clone(&state);
let (tx, rx) = oneshot::channel();
let (startup_tx, startup_rx) = mpsc::channel::<Result<Handle>>();
let handle = thread::Builder::new()
.name("revision-ingress".to_string())
.spawn(move || -> Result<()> {
let runtime =
match Runtime::new().context("failed to create revision ingress runtime") {
Ok(runtime) => runtime,
Err(err) => {
let _ = startup_tx.send(Err(anyhow::anyhow!("{err:#}")));
return Err(err);
}
};
let runtime_handle = runtime.handle().clone();
runtime.block_on(async move {
let listener = match TcpListener::bind(addr)
.await
.context("failed to bind revision ingress listener")
{
Ok(listener) => listener,
Err(err) => {
let _ = startup_tx.send(Err(anyhow::anyhow!("{err:#}")));
return Err(err);
}
};
let _ = startup_tx.send(Ok(runtime_handle));
operator_log::info(
module_path!(),
format!("revision ingress listening on http://{addr}"),
);
let mut shutdown = rx;
loop {
tokio::select! {
_ = &mut shutdown => break,
accept = listener.accept() => match accept {
Ok((stream, peer)) => {
let connection_state = listener_state.clone();
let peer_is_loopback = peer.ip().to_canonical().is_loopback();
tokio::spawn(async move {
let service = service_fn(move |req| {
handle_connection(
req,
connection_state.clone(),
peer_is_loopback,
)
});
let io = TokioIo::new(stream);
if let Err(err) =
Http1Builder::new().serve_connection(io, service).await
{
operator_log::error(
module_path!(),
format!("revision ingress connection error: {err}"),
);
}
});
}
Err(err) => operator_log::error(
module_path!(),
format!("revision ingress accept error: {err}"),
),
},
}
}
Ok(())
})
})?;
let runtime_handle = startup_rx
.recv()
.context("failed to receive revision ingress startup result")??;
let mut initial_watermark: HashMap<DeploymentId, u64> = HashMap::new();
state
.slot
.load()
.routing
.dispatcher
.absorb_into_watermark(&mut initial_watermark);
Ok(Self {
shutdown: Some(tx),
handle: Some(handle),
actual_port,
state,
runtime_handle,
reload_lock: std::sync::Mutex::new(()),
generation_watermark: std::sync::Mutex::new(initial_watermark),
})
}
pub(crate) fn actual_port(&self) -> u16 {
self.actual_port
}
pub(crate) fn counts(&self) -> (usize, usize) {
self.state.slot.load().routing.dispatcher.counts()
}
pub(crate) fn reload(&self, new: Activation, drain_window: Duration) -> ReloadReport {
let _reload_guard = self.reload_lock.lock().expect("reload lock poisoned");
let new_arc = Arc::new(new);
let prev = self.state.slot.load_full();
debug_assert!(
!Arc::ptr_eq(&prev.routing.dispatcher, &new_arc.routing.dispatcher),
"reload must build a fresh dispatcher Arc (SlotLivenessProbe ptr_eq guard depends on it)"
);
{
let mut watermark = self
.generation_watermark
.lock()
.expect("generation watermark lock poisoned");
prev.routing
.dispatcher
.absorb_into_watermark(&mut watermark);
new_arc
.routing
.dispatcher
.bump_generations_from_watermark(&watermark);
new_arc
.routing
.dispatcher
.absorb_into_watermark(&mut watermark);
}
let removed = removed_revisions(&prev.routing.dispatcher, &new_arc.routing.dispatcher);
let (new_deployments, new_revisions) = new_arc.routing.dispatcher.counts();
let prev = self.state.slot.swap(new_arc);
let (prev_deployments, prev_revisions) = prev.routing.dispatcher.counts();
if !removed.is_empty() && !drain_window.is_zero() {
spawn_revision_drains(
&self.runtime_handle,
Arc::clone(&self.state),
Arc::clone(&prev),
removed,
drain_window,
);
}
if drain_window.is_zero() {
drop(prev);
} else {
self.runtime_handle.spawn(async move {
tokio::time::sleep(drain_window).await;
drop(prev);
});
}
ReloadReport {
prev_deployments,
prev_revisions,
new_deployments,
new_revisions,
}
}
pub(crate) fn stop(mut self) -> Result<()> {
if let Some(tx) = self.shutdown.take() {
let _ = tx.send(());
}
if let Some(handle) = self.handle.take() {
handle
.join()
.map_err(|err| anyhow::anyhow!("revision ingress server panicked: {err:?}"))??;
}
Ok(())
}
}
async fn handle_connection(
req: Request<Incoming>,
state: Arc<ServeState>,
peer_is_loopback: bool,
) -> Result<Response<Full<Bytes>>, Infallible> {
Ok(match serve(req, state, peer_is_loopback).await {
Ok(response) => response,
Err(response) => response,
})
}
async fn serve(
req: Request<Incoming>,
state: Arc<ServeState>,
peer_is_loopback: bool,
) -> Result<Response<Full<Bytes>>, Response<Full<Bytes>>> {
let method = req.method().clone();
let path = req.uri().path().to_string();
if let Some(response) = try_probe_response(&path, &state) {
return Ok(response);
}
if path == "/workers/invoke" {
if method != hyper::Method::POST {
return Err(error_response(
StatusCode::METHOD_NOT_ALLOWED,
"worker invoke requires POST",
));
}
return handle_worker_invoke(req, Arc::clone(&state), peer_is_loopback).await;
}
let activation = state.current();
let host_header = header_str(req.headers(), header::HOST.as_str());
let cookie_header = header_str(req.headers(), header::COOKIE.as_str());
let user_header = header_str(req.headers(), "x-greentic-user");
let session_header = header_str(req.headers(), "x-greentic-session");
let endpoint_header = header_str(req.headers(), "x-greentic-messaging-endpoint-id");
let identify_headers = identify_payload::collect_identify_headers(req.headers());
let request_headers = collect_forwarded_request_headers(req.headers());
let query_string = req.uri().query().map(str::to_string);
let (deployment_id, tenant) = activation
.routing
.deployment_routes
.resolve(host_header.as_deref(), &path)
.map(|(deployment_id, tenant)| (deployment_id, tenant.to_string()))
.ok_or_else(|| {
error_response(
StatusCode::NOT_FOUND,
"no deployment is bound to this host and path",
)
})?;
let body_bytes = read_body_limited(req).await.map_err(|_| {
error_response(
StatusCode::PAYLOAD_TOO_LARGE,
"request body exceeds the size limit",
)
})?;
let identity_payload: Value = if peer_is_loopback && !body_bytes.is_empty() {
serde_json::from_slice(&body_bytes).unwrap_or(Value::Null)
} else {
Value::Null
};
let (user, session_hint, header_endpoint_id) = caller_identity(
peer_is_loopback,
user_header,
session_header,
endpoint_header,
&identity_payload,
);
resolve_endpoint_admission(
header_endpoint_id.as_deref(),
activation.routing.endpoint_admit.as_ref(),
)
.map_err(|boxed| *boxed)?;
let cookie_value = cookie_header
.as_deref()
.and_then(|jar| read_cookie(jar, &cookie_name(deployment_id)));
let dispatch_req = DispatchRequest {
env_id: activation.routing.dispatcher.env_id(),
tenant: &tenant,
deployment_id,
session_hint: session_hint.as_deref(),
trusted: false,
header_revision: None,
cookie: cookie_value.as_deref(),
};
let mut rng: rand::rngs::SmallRng = rand::make_rng();
let outcome = activation
.routing
.dispatcher
.dispatch(&dispatch_req, &mut rng)
.await
.map_err(|err| {
operator_log::warn(
module_path!(),
format!("revision dispatch for deployment {deployment_id} failed: {err:#}"),
);
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"revision dispatch failed",
)
})?;
let scope = RevisionScope {
deployment_id,
bundle_id: outcome.bundle_id.clone(),
revision_id: outcome.revision_id,
};
match admit_request(&activation.routing.http_routes, &scope, &path, &method) {
Admission::ProviderRoute => {
return dispatch_provider_route(
Arc::clone(&activation),
&tenant,
&scope,
&path,
method.as_str(),
query_string.as_deref(),
&request_headers,
&body_bytes,
peer_is_loopback,
&identify_headers,
header_endpoint_id.as_deref(),
)
.await;
}
Admission::MethodNotAllowed => {
return Err(error_response(
StatusCode::METHOD_NOT_ALLOWED,
"only POST is supported for the generic revision ingress",
));
}
Admission::Serve => {}
}
let payload: Value = if body_bytes.is_empty() {
Value::Null
} else {
serde_json::from_slice(&body_bytes)
.map_err(|_| error_response(StatusCode::BAD_REQUEST, "request body must be JSON"))?
};
let (endpoint_id, welcome_hint) = resolve_endpoint_for_scope(
&activation,
&tenant,
&scope,
header_endpoint_id.as_deref(),
peer_is_loopback,
|| (identify_headers.clone(), payload.clone()),
)
.await?;
let activity = build_activity(
&payload,
&tenant,
user.as_deref(),
session_hint.as_deref(),
endpoint_id.as_deref(),
welcome_hint,
);
let replies = activation
.host
.handle_activity_for_revision(
&tenant,
deployment_id,
outcome.bundle_id.clone(),
outcome.revision_id,
activity,
)
.await
.map_err(|err| {
operator_log::error(
module_path!(),
format!(
"revision execution failed for deployment {deployment_id} revision {}: {err:#}",
outcome.revision_id
),
);
error_response(StatusCode::INTERNAL_SERVER_ERROR, "flow execution failed")
})?;
let body = serde_json::to_vec(&replies)
.map_err(|err| error_response(StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
let mut response = json_response(StatusCode::OK, body);
if let Some(directive) = outcome.set_cookie {
apply_set_cookie(&mut response, &directive);
}
Ok(response)
}
#[derive(serde::Deserialize)]
struct WorkerInvokeRequest {
#[serde(default)]
version: String,
tenant: greentic_types::TenantCtx,
#[serde(default)]
worker_id: String,
#[serde(default)]
payload: Value,
#[serde(default)]
correlation_id: Option<String>,
#[serde(default)]
session_id: Option<String>,
#[serde(default)]
thread_id: Option<String>,
}
#[derive(serde::Serialize)]
struct WorkerInvokeMessage {
kind: String,
payload: Value,
}
#[derive(serde::Serialize)]
struct WorkerInvokeResponse {
version: String,
tenant: greentic_types::TenantCtx,
worker_id: String,
timestamp_utc: String,
messages: Vec<WorkerInvokeMessage>,
correlation_id: Option<String>,
session_id: Option<String>,
thread_id: Option<String>,
}
async fn handle_worker_invoke(
req: Request<Incoming>,
state: Arc<ServeState>,
peer_is_loopback: bool,
) -> Result<Response<Full<Bytes>>, Response<Full<Bytes>>> {
if !peer_is_loopback {
return Err(error_response(
StatusCode::FORBIDDEN,
"worker invoke is restricted to loopback callers",
));
}
let activation = state.current();
let body_bytes = read_body_limited(req).await.map_err(|_| {
error_response(
StatusCode::PAYLOAD_TOO_LARGE,
"request body exceeds the size limit",
)
})?;
let worker_req: WorkerInvokeRequest = serde_json::from_slice(&body_bytes).map_err(|err| {
error_response(
StatusCode::BAD_REQUEST,
format!("invalid HostWorkerRequest body: {err}"),
)
})?;
let (deployment_id, tenant) = activation
.routing
.deployment_routes
.resolve_worker(worker_req.tenant.tenant_id.as_str())
.map(|(id, tenant)| (id, tenant.to_string()))
.ok_or_else(|| {
error_response(
StatusCode::NOT_FOUND,
"no active deployment resolves for the requested tenant",
)
})?;
let session_hint = worker_req.session_id.clone();
let dispatch_req = DispatchRequest {
env_id: activation.routing.dispatcher.env_id(),
tenant: &tenant,
deployment_id,
session_hint: session_hint.as_deref(),
trusted: false,
header_revision: None,
cookie: None,
};
let mut rng: rand::rngs::SmallRng = rand::make_rng();
let outcome = activation
.routing
.dispatcher
.dispatch(&dispatch_req, &mut rng)
.await
.map_err(|err| {
operator_log::warn(
module_path!(),
format!("worker-invoke dispatch for deployment {deployment_id} failed: {err:#}"),
);
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"revision dispatch failed",
)
})?;
let user = worker_req
.tenant
.user_id
.as_ref()
.map(|u| u.as_str().to_string());
let flow_payload = normalize_worker_payload(&worker_req.payload);
let activity = build_activity(
&flow_payload,
&tenant,
user.as_deref(),
session_hint.as_deref(),
None,
None,
);
let replies = activation
.host
.handle_activity_for_revision(
&tenant,
deployment_id,
outcome.bundle_id.clone(),
outcome.revision_id,
activity,
)
.await
.map_err(|err| {
operator_log::error(
module_path!(),
format!(
"worker-invoke execution failed for deployment {deployment_id} revision {}: {err:#}",
outcome.revision_id
),
);
error_response(StatusCode::INTERNAL_SERVER_ERROR, "flow execution failed")
})?;
let messages = replies.iter().map(activity_to_worker_message).collect();
let response = WorkerInvokeResponse {
version: if worker_req.version.is_empty() {
"1.0.0".to_string()
} else {
worker_req.version
},
tenant: worker_req.tenant,
worker_id: worker_req.worker_id,
timestamp_utc: chrono::Utc::now().to_rfc3339(),
messages,
correlation_id: worker_req.correlation_id,
session_id: worker_req.session_id,
thread_id: worker_req.thread_id,
};
let body = serde_json::to_vec(&response)
.map_err(|err| error_response(StatusCode::INTERNAL_SERVER_ERROR, err.to_string()))?;
Ok(json_response(StatusCode::OK, body))
}
fn activity_to_worker_message(activity: &Activity) -> WorkerInvokeMessage {
let raw = activity.payload();
let payload = match raw.get("status").and_then(Value::as_str) {
Some("pending") => raw.get("response").unwrap_or(raw),
_ => raw,
};
if let Some(card) = payload.get("renderedCard") {
WorkerInvokeMessage {
kind: "adaptive-card".to_string(),
payload: card.clone(),
}
} else if payload.get("text").is_some() {
WorkerInvokeMessage {
kind: "text".to_string(),
payload: payload.clone(),
}
} else {
WorkerInvokeMessage {
kind: "activity".to_string(),
payload: payload.clone(),
}
}
}
fn normalize_worker_payload(payload: &Value) -> Value {
match payload {
Value::Object(map) if !map.is_empty() && !map.get("text").is_some_and(Value::is_string) => {
serde_json::json!({ "metadata": payload.clone() })
}
_ => payload.clone(),
}
}
fn build_activity(
payload: &Value,
tenant: &str,
user: Option<&str>,
session: Option<&str>,
endpoint: Option<&str>,
welcome_hint: Option<WelcomeFlowHint>,
) -> Activity {
let mut activity = match payload.get("text").and_then(Value::as_str) {
Some(text) => Activity::text(text),
None => Activity::custom("http.request", payload.clone()),
};
activity = activity.with_tenant(tenant);
if let Some(user) = user {
activity = activity.from_user(user);
}
if let Some(session) = session {
activity = activity.with_session(session);
}
if let Some(endpoint) = endpoint {
activity = activity.with_messaging_endpoint(endpoint);
}
if let Some(hint) = welcome_hint {
activity = activity.with_welcome_flow_hint(hint);
}
activity
}
fn resolve_welcome_flow_hint(
endpoint_id: Option<&str>,
dispatched_bundle: &BundleId,
admit: &crate::endpoint_admit::EndpointAdmit,
) -> Option<WelcomeFlowHint> {
endpoint_id
.and_then(|eid| admit.welcome_flow_for_bundle(eid, dispatched_bundle))
.map(|ref_| WelcomeFlowHint {
pack_id: ref_.pack_id.as_str().to_string(),
flow_id: ref_.flow_id.clone(),
})
}
fn caller_identity(
peer_is_loopback: bool,
user_header: Option<String>,
session_header: Option<String>,
endpoint_header: Option<String>,
payload: &Value,
) -> (Option<String>, Option<String>, Option<String>) {
if !peer_is_loopback {
return (None, None, None);
}
let user = user_header.or_else(|| str_field(payload, "user"));
let session = session_header.or_else(|| str_field(payload, "session"));
let endpoint = endpoint_header.and_then(validate_endpoint_id);
(user, session, endpoint)
}
fn validate_endpoint_id(raw: String) -> Option<String> {
if raw.is_empty() || raw.len() > 128 {
return None;
}
if !raw
.bytes()
.all(|b| b.is_ascii_alphanumeric() || matches!(b, b'-' | b'_' | b'.'))
{
return None;
}
Some(raw)
}
#[derive(Debug)]
enum EndpointAdmission<'a> {
NotAsserted,
Resolved(&'a std::collections::HashSet<String>),
}
fn resolve_endpoint_admission<'a>(
endpoint_id: Option<&str>,
admit: &'a crate::endpoint_admit::EndpointAdmit,
) -> Result<EndpointAdmission<'a>, Box<Response<Full<Bytes>>>> {
let Some(eid) = endpoint_id else {
return Ok(EndpointAdmission::NotAsserted);
};
match admit.linked_bundles(eid) {
Some(acl) => Ok(EndpointAdmission::Resolved(acl)),
None => Err(Box::new(error_response(
StatusCode::UNAUTHORIZED,
"messaging endpoint not recognized in this environment",
))),
}
}
fn check_bundle_admission(
admission: &EndpointAdmission<'_>,
bundle_id: &str,
) -> Result<(), Box<Response<Full<Bytes>>>> {
let EndpointAdmission::Resolved(acl) = admission else {
return Ok(());
};
if acl.contains(bundle_id) {
Ok(())
} else {
Err(Box::new(error_response(
StatusCode::FORBIDDEN,
"this messaging endpoint is not authorized to route to the resolved bundle",
)))
}
}
#[derive(Debug, PartialEq, Eq)]
enum Admission {
Serve,
ProviderRoute,
MethodNotAllowed,
}
fn admit_request(
routes: &HttpRouteTable,
scope: &RevisionScope,
path: &str,
method: &hyper::Method,
) -> Admission {
if routes
.match_request_for_revision(path, method.as_str(), scope)
.is_some()
{
return Admission::ProviderRoute;
}
if method != hyper::Method::POST {
return Admission::MethodNotAllowed;
}
Admission::Serve
}
async fn read_body_limited(req: Request<Incoming>) -> Result<Bytes, ()> {
Limited::new(req.into_body(), MAX_BODY_BYTES)
.collect()
.await
.map(|collected| collected.to_bytes())
.map_err(|_| ())
}
fn header_str(headers: &header::HeaderMap, name: &str) -> Option<String> {
headers
.get(name)
.and_then(|value| value.to_str().ok())
.map(|value| value.to_string())
}
fn str_field(payload: &Value, key: &str) -> Option<String> {
payload
.get(key)
.and_then(Value::as_str)
.map(|value| value.to_string())
}
fn read_cookie(jar: &str, name: &str) -> Option<String> {
jar.split(';').find_map(|pair| {
let (key, value) = pair.split_once('=')?;
(key.trim() == name).then(|| value.trim().to_string())
})
}
fn apply_set_cookie(response: &mut Response<Full<Bytes>>, directive: &SetCookieDirective) {
let header_value = directive.to_header_value();
match header::HeaderValue::from_str(&header_value) {
Ok(value) => {
response.headers_mut().append(header::SET_COOKIE, value);
}
Err(err) => operator_log::warn(
module_path!(),
format!(
"failed to encode revision Set-Cookie `{}`: {err}",
directive.name
),
),
}
}
fn json_response(status: StatusCode, body: Vec<u8>) -> Response<Full<Bytes>> {
Response::builder()
.status(status)
.header(header::CONTENT_TYPE, "application/json")
.body(Full::new(Bytes::from(body)))
.expect("static response builder inputs are valid")
}
fn text_response(status: StatusCode, body: &str) -> Response<Full<Bytes>> {
Response::builder()
.status(status)
.header(header::CONTENT_TYPE, "text/plain; charset=utf-8")
.body(Full::new(Bytes::from(body.to_string())))
.expect("static response builder inputs are valid")
}
pub(crate) fn error_response(
status: StatusCode,
message: impl AsRef<str>,
) -> Response<Full<Bytes>> {
text_response(status, message.as_ref())
}
fn try_probe_response(path: &str, state: &ServeState) -> Option<Response<Full<Bytes>>> {
if matches!(path, "/livez" | "/readyz" | "/healthz" | "/health") {
return Some(text_response(StatusCode::OK, "ok"));
}
if path == "/status" {
let activation = state.current();
let (deployments_routed, revisions_active) = activation.routing.dispatcher.counts();
let body = serde_json::json!({
"schema": "greentic.status.v1",
"env_id": activation.routing.dispatcher.env_id(),
"listen_addr": state.bound_addr.to_string(),
"bundles_active": activation.routing.deployment_routes.len(),
"deployments_routed": deployments_routed,
"revisions_active": revisions_active,
});
return Some(json_response(StatusCode::OK, body.to_string().into_bytes()));
}
None
}
pub(crate) fn resolve_bind_addr(host_config: Option<&EnvironmentHostConfig>) -> SocketAddr {
let mut addr = host_config
.map(EnvironmentHostConfig::resolved_listen_addr)
.unwrap_or(DEFAULT_LISTEN_ADDR);
if let Ok(raw) = std::env::var("GREENTIC_GATEWAY_LISTEN_ADDR") {
let trimmed = raw.trim();
if !trimmed.is_empty() {
if let Ok(sa) = trimmed.parse::<SocketAddr>() {
addr = sa;
} else if let Ok(ip) = trimmed.parse::<IpAddr>() {
addr = SocketAddr::new(ip, addr.port());
} else {
operator_log::warn(
module_path!(),
format!(
"GREENTIC_GATEWAY_LISTEN_ADDR={trimmed:?} is not a valid SocketAddr or IP; \
falling back to {addr}"
),
);
}
}
}
if let Ok(raw) = std::env::var("PORT") {
let trimmed = raw.trim();
if !trimmed.is_empty() {
if let Ok(port) = trimmed.parse::<u16>() {
addr.set_port(port);
} else {
operator_log::warn(
module_path!(),
format!(
"PORT={trimmed:?} is not a valid u16; keeping port {}",
addr.port()
),
);
}
}
}
addr
}
async fn resolve_endpoint_for_scope<F>(
activation: &Activation,
tenant: &str,
scope: &RevisionScope,
header_endpoint_id: Option<&str>,
peer_is_loopback: bool,
build_headers_body: F,
) -> Result<(Option<String>, Option<WelcomeFlowHint>), Response<Full<Bytes>>>
where
F: FnOnce() -> (Vec<(String, String)>, Value),
{
let resolution = endpoint_resolver::resolve(
&activation.host,
tenant,
scope,
activation.routing.endpoint_admit.as_ref(),
header_endpoint_id,
peer_is_loopback,
build_headers_body,
)
.await
.map_err(|err| {
operator_log::warn(
module_path!(),
format!(
"messaging-endpoint resolver failed for deployment {} revision {}: {err:#}",
scope.deployment_id, scope.revision_id,
),
);
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"messaging-endpoint resolution failed",
)
})?;
tracing::info!(
target: "greentic_start::endpoint_resolver",
endpoint_resolution = resolution.origin(),
messaging_endpoint_id = resolution.endpoint_id().unwrap_or(""),
"messaging-endpoint resolution outcome",
);
if matches!(resolution, endpoint_resolver::ResolverOutcome::Ambiguous) {
return Err(error_response(
StatusCode::UNPROCESSABLE_ENTITY,
"messaging endpoint resolution is ambiguous; assert the endpoint via \
x-greentic-messaging-endpoint-id",
));
}
let endpoint_id: Option<String> = resolution.endpoint_id().map(str::to_string);
let admission = resolve_endpoint_admission(
endpoint_id.as_deref(),
activation.routing.endpoint_admit.as_ref(),
)
.map_err(|boxed| *boxed)?;
check_bundle_admission(&admission, scope.bundle_id.as_str()).map_err(|boxed| *boxed)?;
let welcome_hint = resolve_welcome_flow_hint(
endpoint_id.as_deref(),
&scope.bundle_id,
&activation.routing.endpoint_admit,
);
Ok((endpoint_id, welcome_hint))
}
#[allow(clippy::too_many_arguments)]
async fn dispatch_provider_route(
activation: Arc<Activation>,
tenant: &str,
scope: &RevisionScope,
path: &str,
method: &str,
query: Option<&str>,
request_headers: &[(String, String)],
body: &[u8],
peer_is_loopback: bool,
identify_headers: &[(String, String)],
header_endpoint_id: Option<&str>,
) -> Result<Response<Full<Bytes>>, Response<Full<Bytes>>> {
let Some(route_match) = activation
.routing
.http_routes
.match_request_for_revision(path, method, scope)
else {
return Err(error_response(
StatusCode::INTERNAL_SERVER_ERROR,
"provider route disappeared between admit and dispatch",
));
};
let Some(provider_type) = route_match.descriptor.provider_type.clone() else {
return Err(error_response(
StatusCode::NOT_IMPLEMENTED,
"this path is a legacy http-routes.v1 route; Phase D.3 only \
handles greentic.provider-extension.v1 synthesized routes",
));
};
let descriptor_pack_id = route_match.descriptor.pack_id.clone();
let provider_op = route_match.descriptor.provider_op.clone();
let deployment_id = scope.deployment_id;
let bundle_id = scope.bundle_id.clone();
let revision_id = scope.revision_id;
let secrets = activation.host.secrets_manager();
let header_endpoint_id_authenticated = match provider_auth::authenticate_provider_webhook(
activation.routing.endpoint_admit.as_ref(),
&secrets,
scope,
&provider_type,
request_headers,
)
.await
{
Ok(provider_auth::AuthOutcome::Authenticated(eid)) => Some(eid),
Ok(provider_auth::AuthOutcome::Skipped) => None,
Err(response) => return Err(response),
};
let header_endpoint_id_for_resolver = header_endpoint_id_authenticated
.as_deref()
.or(header_endpoint_id);
let (endpoint_id, welcome_hint) = resolve_endpoint_for_scope(
&activation,
tenant,
scope,
header_endpoint_id_for_resolver,
peer_is_loopback,
|| {
let body_value: Value = serde_json::from_slice(body).unwrap_or(Value::Null);
(identify_headers.to_vec(), body_value)
},
)
.await?;
let http_in = build_provider_http_in(
&provider_type,
tenant,
method,
path,
query,
request_headers,
body,
);
let input_json = serde_json::to_vec(&http_in).map_err(|err| {
error_response(
StatusCode::INTERNAL_SERVER_ERROR,
format!("encode HttpInV1 for provider {provider_type}: {err}"),
)
})?;
let output = activation
.host
.invoke_provider_for_revision(
tenant,
deployment_id,
bundle_id.clone(),
revision_id,
&provider_type,
&provider_op,
input_json,
None,
None,
)
.await
.map_err(|err| {
operator_log::error(
module_path!(),
format!(
"provider {provider_type} op {provider_op} failed for \
deployment {deployment_id} revision {revision_id}: {err:#}"
),
);
error_response(StatusCode::BAD_GATEWAY, "provider invocation failed")
})?;
let result = parse_dispatch_result(&output).map_err(|err| {
operator_log::warn(
module_path!(),
format!(
"provider {provider_type} op {provider_op} returned an undecodable \
envelope (deployment {deployment_id} revision {revision_id}): {err:#}"
),
);
error_response(
StatusCode::BAD_GATEWAY,
"could not decode provider response envelope",
)
})?;
if !result.events.is_empty() {
operator_log::warn(
module_path!(),
format!(
"provider {provider_type} emitted {} event(s) on op {provider_op} \
(deployment {deployment_id} revision {revision_id}); revision-aware \
event routing is Phase D.4 work — events dropped for now",
result.events.len()
),
);
}
if !result.messaging_envelopes.is_empty() {
let ingress_envelopes = result.messaging_envelopes.clone();
let pipeline_activation = Arc::clone(&activation);
let pipeline_tenant = tenant.to_string();
let pipeline_provider = provider_type.clone();
let pipeline_bundle = bundle_id.clone();
tokio::spawn(async move {
run_provider_inbound_pipeline(
pipeline_activation,
pipeline_tenant,
deployment_id,
pipeline_bundle,
revision_id,
descriptor_pack_id,
pipeline_provider,
ingress_envelopes,
endpoint_id,
welcome_hint,
)
.await;
});
}
Ok(synthesize_provider_response(&result.response))
}
#[allow(clippy::too_many_arguments)]
async fn run_provider_inbound_pipeline(
activation: Arc<Activation>,
tenant: String,
deployment_id: DeploymentId,
bundle_id: BundleId,
revision_id: RevisionId,
pack_id: String,
provider_type: String,
envelopes: Vec<ChannelMessageEnvelope>,
endpoint_id: Option<String>,
welcome_hint: Option<WelcomeFlowHint>,
) {
for ingress in &envelopes {
let activity = envelope_to_activity(
ingress,
&tenant,
endpoint_id.as_deref(),
welcome_hint.clone(),
);
let replies = match activation
.host
.handle_activity_for_revision(
&tenant,
deployment_id,
bundle_id.clone(),
revision_id,
activity,
)
.await
{
Ok(replies) => replies,
Err(err) => {
operator_log::error(
module_path!(),
format!(
"forwarding provider event to flow runtime failed for \
deployment {deployment_id} revision {revision_id}: {err:#}"
),
);
continue;
}
};
for reply in replies {
let reply_envelope = build_reply_envelope(ingress, &reply);
if let Err(err) = run_reply_egress(
&activation,
&tenant,
deployment_id,
bundle_id.clone(),
revision_id,
&pack_id,
&provider_type,
&reply_envelope,
)
.await
{
operator_log::error(
module_path!(),
format!(
"provider {provider_type} egress failed for deployment \
{deployment_id} revision {revision_id} (reply id={}): {err:#}",
reply_envelope.id
),
);
}
}
}
}
#[allow(clippy::too_many_arguments)]
async fn run_reply_egress(
activation: &Activation,
tenant: &str,
deployment_id: DeploymentId,
bundle_id: BundleId,
revision_id: RevisionId,
pack_id: &str,
provider_type: &str,
envelope: &ChannelMessageEnvelope,
) -> Result<()> {
use crate::messaging_dto::{EncodeInV1, ProviderPayloadV1, RenderPlanInV1};
let message_value = serde_json::to_value(envelope).context("serialize reply envelope")?;
let invoke = async |op: &'static str, input: Value| -> Result<Value> {
let input_bytes =
serde_json::to_vec(&input).with_context(|| format!("encode {op} input"))?;
activation
.host
.invoke_provider_for_revision(
tenant,
deployment_id,
bundle_id.clone(),
revision_id,
provider_type,
op,
input_bytes,
None,
None,
)
.await
.with_context(|| format!("{op} invocation"))
};
let render_input = serde_json::to_value(RenderPlanInV1 {
v: 1,
message: message_value.clone(),
})
.context("encode RenderPlanInV1")?;
let plan_value = invoke("render_plan", render_input).await?;
let plan = plan_value
.get("plan_json")
.and_then(|v| v.as_str())
.and_then(|s| serde_json::from_str::<Value>(s).ok())
.or_else(|| plan_value.get("plan").cloned())
.unwrap_or(plan_value);
let encode_input = serde_json::to_value(EncodeInV1 {
v: 1,
message: message_value,
plan,
})
.context("encode EncodeInV1")?;
let encode_value = invoke("encode", encode_input).await?;
let payload_value = encode_value.get("payload").cloned().unwrap_or(encode_value);
let payload: ProviderPayloadV1 =
serde_json::from_value(payload_value).context("decode ProviderPayloadV1")?;
let pack_overrides = crate::messaging_egress::pack_config_overrides_as_json(
&activation.routing.deployment_config_overrides,
deployment_id,
pack_id,
);
let send = crate::messaging_egress::build_send_payload(
payload,
provider_type.to_string(),
tenant.to_string(),
None,
pack_overrides,
);
let send_input = serde_json::to_value(&send).context("encode SendPayloadInV1")?;
let send_outcome = invoke("send_payload", send_input).await?;
if send_outcome
.get("ok")
.and_then(|v| v.as_bool())
.is_some_and(|ok| !ok)
{
let error_msg = send_outcome
.get("error")
.and_then(|v| v.as_str())
.unwrap_or("send_payload reported ok=false");
anyhow::bail!("{error_msg}");
}
Ok(())
}
fn build_reply_envelope(
ingress: &ChannelMessageEnvelope,
reply: &Activity,
) -> ChannelMessageEnvelope {
const REPLY_METADATA_KEYS: &[&str] = &[
"env",
"tenant",
"team",
"route",
"locale",
"universal",
"autoStart",
];
if let Ok(mut envelope) =
serde_json::from_value::<ChannelMessageEnvelope>(reply.payload().clone())
{
if envelope.session_id.trim().is_empty() {
envelope.session_id = ingress.session_id.clone();
}
if envelope.channel.is_empty() {
envelope.channel = ingress.channel.clone();
}
if envelope.to.is_empty() {
envelope.to = ingress.to.clone();
}
if envelope.id.is_empty() {
envelope.id = uuid::Uuid::new_v4().to_string();
}
return envelope;
}
let mut envelope = ingress.clone();
envelope.id = uuid::Uuid::new_v4().to_string();
envelope.from = None;
envelope.correlation_id = None;
envelope.reply_scope = None;
envelope.text = None;
envelope.attachments.clear();
let mut clean = std::collections::BTreeMap::new();
for key in REPLY_METADATA_KEYS {
if let Some(value) = envelope.metadata.remove(*key) {
clean.insert((*key).to_string(), value);
}
}
envelope.metadata = clean;
let message = activity_to_worker_message(reply);
match message.kind.as_str() {
"adaptive-card" => {
if let Ok(ac_json) = serde_json::to_string(&message.payload) {
envelope
.metadata
.insert("adaptive_card".to_string(), ac_json);
}
envelope
.extensions
.insert(ext_keys::ADAPTIVE_CARD.to_string(), message.payload);
}
"text" => {
if let Some(text) = message
.payload
.get("text")
.and_then(|v| v.as_str())
.filter(|s| !s.is_empty())
{
envelope.text = Some(text.to_string());
}
}
_ => {}
}
envelope
}
fn collect_forwarded_request_headers(headers: &HeaderMap) -> Vec<(String, String)> {
headers
.iter()
.filter_map(|(name, value)| {
Some((
name.as_str().to_ascii_lowercase(),
value.to_str().ok()?.to_string(),
))
})
.collect()
}
fn build_provider_http_in(
provider: &str,
tenant: &str,
method: &str,
path: &str,
query: Option<&str>,
headers: &[(String, String)],
body: &[u8],
) -> HttpInV1 {
HttpInV1 {
v: 1,
provider: provider.to_string(),
route: None,
binding_id: None,
tenant_hint: Some(tenant.to_string()),
team_hint: None,
method: method.to_string(),
path: path.to_string(),
query: parse_query_pairs(query),
headers: headers.to_vec(),
body_b64: BASE64.encode(body),
config: None,
}
}
fn parse_query_pairs(query: Option<&str>) -> Vec<(String, String)> {
query
.unwrap_or_default()
.split('&')
.filter(|s| !s.is_empty())
.map(|pair| match pair.split_once('=') {
Some((k, v)) => (k.to_string(), v.to_string()),
None => (pair.to_string(), String::new()),
})
.collect()
}
fn envelope_to_activity(
envelope: &ChannelMessageEnvelope,
fallback_tenant: &str,
endpoint_id: Option<&str>,
welcome_hint: Option<WelcomeFlowHint>,
) -> Activity {
let payload = serde_json::to_value(envelope).unwrap_or(Value::Null);
let mut activity = Activity::custom("provider.event", payload).with_flow_type("messaging");
let envelope_tenant = envelope.tenant.tenant_id.as_str();
let tenant = if envelope_tenant.is_empty() {
fallback_tenant
} else {
envelope_tenant
};
activity = activity.with_tenant(tenant);
if !envelope.session_id.is_empty() {
activity = activity.with_session(&envelope.session_id);
}
if let Some(from) = envelope.from.as_ref()
&& !from.id.is_empty()
{
activity = activity.from_user(&from.id);
}
if let Some(eid) = endpoint_id {
activity = activity.with_messaging_endpoint(eid);
}
if let Some(hint) = welcome_hint {
activity = activity.with_welcome_flow_hint(hint);
}
activity
}
fn synthesize_provider_response(response: &IngressHttpResponse) -> Response<Full<Bytes>> {
let status = StatusCode::from_u16(response.status).unwrap_or(StatusCode::OK);
let body = response.body.clone().map(Bytes::from).unwrap_or_default();
let mut builder = Response::builder().status(status);
for (name, value) in &response.headers {
if let Ok(header_name) = hyper::header::HeaderName::try_from(name.as_str())
&& let Ok(header_value) = hyper::header::HeaderValue::try_from(value.as_str())
{
builder = builder.header(header_name, header_value);
}
}
builder
.body(Full::new(body))
.unwrap_or_else(|_| error_response(StatusCode::BAD_GATEWAY, "invalid provider response"))
}
#[cfg(test)]
mod tests {
use super::*;
use greentic_deploy_spec::WelcomeFlowRef;
use greentic_deploy_spec::ids::BundleId;
use greentic_runner_host::engine::runtime::{FlowResumeStore, IngressEnvelope};
use greentic_runner_host::runner::engine::{ExecutionState, FlowSnapshot, FlowWait};
use greentic_runner_host::storage::new_session_store;
use greentic_types::ReplyScope;
use serde_json::json;
#[test]
fn build_activity_text_field_becomes_messaging_activity() {
let payload = json!({ "text": "hello there" });
let activity = build_activity(&payload, "acme", Some("u1"), Some("s1"), None, None);
assert_eq!(activity.tenant(), Some("acme"));
assert_eq!(activity.user(), Some("u1"));
assert_eq!(activity.session_id(), Some("s1"));
assert_eq!(activity.flow_type(), Some("messaging"));
assert_eq!(
activity.payload().get("text").and_then(Value::as_str),
Some("hello there")
);
}
#[test]
fn build_activity_without_text_wraps_generic_payload() {
let payload = json!({ "kind": "ping", "n": 7 });
let activity = build_activity(&payload, "acme", None, None, None, None);
assert_eq!(activity.tenant(), Some("acme"));
assert_eq!(activity.user(), None);
assert_eq!(activity.session_id(), None);
assert_eq!(activity.payload(), &payload);
}
#[test]
fn build_activity_empty_body_is_a_null_custom_activity() {
let activity = build_activity(&Value::Null, "acme", None, None, None, None);
assert_eq!(activity.tenant(), Some("acme"));
assert_eq!(activity.payload(), &Value::Null);
}
#[test]
fn normalize_worker_payload_lifts_card_submit_into_metadata() {
let submit = json!({ "action": "about_card" });
let shaped = normalize_worker_payload(&submit);
assert_eq!(shaped, json!({ "metadata": { "action": "about_card" } }));
let activity = build_activity(&shaped, "acme", None, Some("s1"), None, None);
assert_eq!(
activity
.payload()
.pointer("/metadata/action")
.and_then(Value::as_str),
Some("about_card")
);
}
#[test]
fn normalize_worker_payload_passes_text_through() {
let typed = json!({ "text": "capabilities" });
assert_eq!(normalize_worker_payload(&typed), typed);
}
#[test]
fn normalize_worker_payload_passes_empty_and_non_object_through() {
assert_eq!(normalize_worker_payload(&json!({})), json!({}));
assert_eq!(normalize_worker_payload(&Value::Null), Value::Null);
assert_eq!(normalize_worker_payload(&json!("hi")), json!("hi"));
}
#[test]
fn activity_to_worker_message_maps_card_text_and_generic() {
let card = json!({ "type": "AdaptiveCard", "version": "1.6" });
let card_activity = Activity::custom("response", json!({ "renderedCard": card.clone() }));
let msg = activity_to_worker_message(&card_activity);
assert_eq!(msg.kind, "adaptive-card");
assert_eq!(msg.payload, card);
let text_activity = Activity::text("hello there");
let msg = activity_to_worker_message(&text_activity);
assert_eq!(msg.kind, "text");
assert_eq!(
msg.payload.get("text").and_then(Value::as_str),
Some("hello there")
);
let other = Activity::custom("response", json!({ "n": 7 }));
let msg = activity_to_worker_message(&other);
assert_eq!(msg.kind, "activity");
assert_eq!(msg.payload, json!({ "n": 7 }));
}
#[test]
fn activity_to_worker_message_unwraps_pending_card() {
let card = json!({ "type": "AdaptiveCard", "version": "1.6" });
let pending = Activity::custom(
"response",
json!({
"status": "pending",
"reason": "awaiting user submit",
"response": { "renderedCard": card.clone() }
}),
);
let msg = activity_to_worker_message(&pending);
assert_eq!(msg.kind, "adaptive-card");
assert_eq!(msg.payload, card);
}
#[test]
fn build_activity_plumbs_messaging_endpoint_id() {
let payload = json!({ "text": "hello" });
let activity = build_activity(
&payload,
"acme",
Some("u1"),
Some("s1"),
Some("teams-legal"),
None,
);
let wire = serde_json::to_value(&activity).expect("serialize");
assert_eq!(
wire.get("messaging_endpoint_id").and_then(Value::as_str),
Some("teams-legal")
);
}
#[test]
fn build_activity_plumbs_welcome_flow_hint() {
let payload = json!({ "text": "hello" });
let activity = build_activity(
&payload,
"acme",
Some("u1"),
Some("s1"),
Some("teams-legal"),
Some(WelcomeFlowHint {
pack_id: "legal-pack".to_string(),
flow_id: "welcome".to_string(),
}),
);
assert_eq!(
activity.welcome_flow_hint(),
Some(&WelcomeFlowHint {
pack_id: "legal-pack".to_string(),
flow_id: "welcome".to_string(),
})
);
}
fn admit_with_endpoint(
linked_bundles: Vec<BundleId>,
welcome_flow: Option<WelcomeFlowRef>,
) -> (String, crate::endpoint_admit::EndpointAdmit) {
use greentic_deploy_spec::{
Environment, EnvironmentHostConfig, MessagingEndpoint, MessagingEndpointId,
SchemaVersion,
};
use greentic_types::EnvId;
let env_id = EnvId::try_from("local").unwrap();
let endpoint_id = MessagingEndpointId::new();
let wire_id = endpoint_id.to_string();
let now = chrono::Utc::now();
let endpoint = MessagingEndpoint {
schema: SchemaVersion::new(SchemaVersion::MESSAGING_ENDPOINT_V1),
env_id: env_id.clone(),
endpoint_id,
provider_id: "teams-legal".to_string(),
provider_type: "teams".to_string(),
display_name: "Legal".to_string(),
secret_refs: Vec::new(),
webhook_secret_ref: None,
linked_bundles,
welcome_flow,
generation: 1,
created_at: now,
updated_at: now,
updated_by: "test".to_string(),
};
let env = Environment {
schema: SchemaVersion::new(SchemaVersion::ENVIRONMENT_V1),
environment_id: env_id.clone(),
name: "local".to_string(),
host_config: EnvironmentHostConfig {
env_id,
region: None,
tenant_org_id: None,
listen_addr: None,
public_base_url: None,
},
packs: Vec::new(),
messaging_endpoints: vec![endpoint],
extensions: Vec::new(),
credentials_ref: None,
bundles: Vec::new(),
revisions: Vec::new(),
traffic_splits: Vec::new(),
revocation: Default::default(),
retention: Default::default(),
health: Default::default(),
};
(
wire_id,
crate::endpoint_admit::EndpointAdmit::from_environment(&env),
)
}
#[test]
fn resolve_welcome_flow_hint_returns_hint_when_endpoint_declares_it() {
use greentic_deploy_spec::{PackId, WelcomeFlowRef};
let (wire_id, admit) = admit_with_endpoint(
vec![BundleId::new("legal-bundle")],
Some(WelcomeFlowRef {
bundle_id: BundleId::new("legal-bundle"),
pack_id: PackId::new("legal-pack"),
flow_id: "welcome".to_string(),
}),
);
assert_eq!(
resolve_welcome_flow_hint(Some(&wire_id), &BundleId::new("legal-bundle"), &admit),
Some(WelcomeFlowHint {
pack_id: "legal-pack".to_string(),
flow_id: "welcome".to_string(),
})
);
}
#[test]
fn resolve_welcome_flow_hint_returns_none_without_endpoint() {
let admit = crate::endpoint_admit::EndpointAdmit::default();
assert_eq!(
resolve_welcome_flow_hint(None, &BundleId::new("any-bundle"), &admit),
None
);
}
#[test]
fn resolve_welcome_flow_hint_returns_none_when_endpoint_has_no_welcome() {
let admit = crate::endpoint_admit::EndpointAdmit::default();
assert_eq!(
resolve_welcome_flow_hint(Some("teams-legal"), &BundleId::new("any-bundle"), &admit),
None
);
}
#[test]
fn resolve_welcome_flow_hint_returns_none_when_dispatched_bundle_differs() {
use greentic_deploy_spec::{PackId, WelcomeFlowRef};
let (wire_id, admit) = admit_with_endpoint(
vec![BundleId::new("bundle-a"), BundleId::new("bundle-b")],
Some(WelcomeFlowRef {
bundle_id: BundleId::new("bundle-b"),
pack_id: PackId::new("legal-pack"),
flow_id: "welcome".to_string(),
}),
);
assert_eq!(
resolve_welcome_flow_hint(Some(&wire_id), &BundleId::new("bundle-a"), &admit),
None
);
assert_eq!(
resolve_welcome_flow_hint(Some(&wire_id), &BundleId::new("bundle-b"), &admit),
Some(WelcomeFlowHint {
pack_id: "legal-pack".to_string(),
flow_id: "welcome".to_string(),
})
);
}
#[test]
fn read_cookie_picks_the_named_pair() {
let jar = "foo=1; _gt_rev_abc=xyz ; bar=2";
assert_eq!(read_cookie(jar, "_gt_rev_abc"), Some("xyz".to_string()));
assert_eq!(read_cookie(jar, "missing"), None);
}
#[test]
fn caller_identity_is_honoured_only_from_loopback() {
let payload = json!({ "user": "body-user", "session": "body-session" });
let (user, session, endpoint) =
caller_identity(true, Some("hdr-user".into()), None, None, &payload);
assert_eq!(user.as_deref(), Some("hdr-user"));
assert_eq!(session.as_deref(), Some("body-session"));
assert!(endpoint.is_none());
let (user, session, endpoint) = caller_identity(
false,
Some("hdr-user".into()),
Some("hdr-session".into()),
Some("teams-legal".into()),
&payload,
);
assert_eq!(user, None);
assert_eq!(session, None);
assert!(endpoint.is_none());
}
#[test]
fn caller_identity_returns_messaging_endpoint_from_loopback_header() {
let payload = json!({});
let (_, _, endpoint) =
caller_identity(true, None, None, Some("teams-legal".into()), &payload);
assert_eq!(endpoint.as_deref(), Some("teams-legal"));
}
#[test]
fn caller_identity_never_reads_messaging_endpoint_from_body() {
let payload = json!({ "messaging_endpoint_id": "teams-attacker" });
let (_, _, endpoint) = caller_identity(true, None, None, None, &payload);
assert!(endpoint.is_none());
}
#[test]
fn caller_identity_drops_messaging_endpoint_on_non_loopback() {
let payload = json!({});
let (_, _, endpoint) =
caller_identity(false, None, None, Some("teams-legal".into()), &payload);
assert!(endpoint.is_none());
}
#[test]
fn caller_identity_silently_drops_malformed_endpoint_header() {
let payload = json!({});
for raw in ["", " ", "legal::accounting", "teams legal", "teams\n"] {
let (_, _, endpoint) = caller_identity(true, None, None, Some(raw.into()), &payload);
assert!(
endpoint.is_none(),
"header value {raw:?} should be rejected"
);
}
}
#[test]
fn validate_endpoint_id_accepts_slug_and_ulid_forms() {
assert_eq!(
validate_endpoint_id("teams-legal".into()),
Some("teams-legal".into())
);
assert_eq!(
validate_endpoint_id("teams_legal.v2".into()),
Some("teams_legal.v2".into())
);
let ulid = "01HV3ZQXW8K0YBN8FXZ7P4M2R5";
assert_eq!(validate_endpoint_id(ulid.into()), Some(ulid.into()));
}
#[test]
fn validate_endpoint_id_rejects_empty_and_surrounding_whitespace() {
for raw in ["", " ", "\t\n", " teams-legal "] {
assert!(
validate_endpoint_id(raw.into()).is_none(),
"{raw:?} should reject"
);
}
}
#[test]
fn validate_endpoint_id_rejects_prefix_delimiter() {
for raw in ["legal::accounting", "foo:bar"] {
assert!(
validate_endpoint_id(raw.into()).is_none(),
"{raw:?} should reject"
);
}
}
#[test]
fn validate_endpoint_id_rejects_control_chars_and_non_ascii() {
for raw in [
"teams\nlegal",
"teams\0legal",
"teams legal", "teams/legal",
"команда", ] {
assert!(
validate_endpoint_id(raw.into()).is_none(),
"{raw:?} should reject"
);
}
}
#[test]
fn validate_endpoint_id_rejects_over_length() {
let too_long = "a".repeat(129);
assert!(validate_endpoint_id(too_long).is_none());
let max_ok = "a".repeat(128);
assert_eq!(validate_endpoint_id(max_ok.clone()), Some(max_ok));
}
#[test]
fn resolve_admission_returns_not_asserted_when_no_endpoint_header() {
let admit = crate::endpoint_admit::EndpointAdmit::default();
let outcome =
resolve_endpoint_admission(None, &admit).expect("no header is a clean pass-through");
assert!(matches!(outcome, EndpointAdmission::NotAsserted));
}
#[test]
fn resolve_admission_resolves_known_endpoint_to_its_acl() {
use greentic_deploy_spec::{
BundleId, Environment, EnvironmentHostConfig, MessagingEndpoint, MessagingEndpointId,
SchemaVersion,
};
use greentic_types::EnvId;
let env_id = EnvId::try_from("local").unwrap();
let endpoint_id = MessagingEndpointId::new();
let wire_id = endpoint_id.to_string();
let now = chrono::Utc::now();
let endpoint = MessagingEndpoint {
schema: SchemaVersion::new(SchemaVersion::MESSAGING_ENDPOINT_V1),
env_id: env_id.clone(),
endpoint_id,
provider_id: "teams-legal".to_string(),
provider_type: "teams".to_string(),
display_name: "Legal".to_string(),
secret_refs: Vec::new(),
webhook_secret_ref: None,
linked_bundles: vec![BundleId::new("legal-bundle")],
welcome_flow: None,
generation: 1,
created_at: now,
updated_at: now,
updated_by: "test".to_string(),
};
let env = Environment {
schema: SchemaVersion::new(SchemaVersion::ENVIRONMENT_V1),
environment_id: env_id.clone(),
name: "local".to_string(),
host_config: EnvironmentHostConfig {
env_id,
region: None,
tenant_org_id: None,
listen_addr: None,
public_base_url: None,
},
packs: Vec::new(),
messaging_endpoints: vec![endpoint],
extensions: Vec::new(),
credentials_ref: None,
bundles: Vec::new(),
revisions: Vec::new(),
traffic_splits: Vec::new(),
revocation: Default::default(),
retention: Default::default(),
health: Default::default(),
};
let admit = crate::endpoint_admit::EndpointAdmit::from_environment(&env);
let outcome = resolve_endpoint_admission(Some(&wire_id), &admit).expect("known endpoint");
match outcome {
EndpointAdmission::Resolved(acl) => assert!(acl.contains("legal-bundle")),
EndpointAdmission::NotAsserted => panic!("known endpoint must resolve to ACL"),
}
}
#[test]
fn resolve_admission_refuses_unknown_endpoint_with_401() {
let admit = crate::endpoint_admit::EndpointAdmit::default();
let err = resolve_endpoint_admission(Some("bogus"), &admit)
.expect_err("unknown endpoint must refuse");
assert_eq!(err.status(), StatusCode::UNAUTHORIZED);
}
#[test]
fn check_bundle_admission_skips_when_endpoint_not_asserted() {
check_bundle_admission(&EndpointAdmission::NotAsserted, "any-bundle")
.expect("no-header path must pass");
}
#[test]
fn check_bundle_admission_allows_bundle_in_acl() {
let mut acl = std::collections::HashSet::new();
acl.insert("legal-bundle".to_string());
acl.insert("shared-utils".to_string());
check_bundle_admission(&EndpointAdmission::Resolved(&acl), "legal-bundle")
.expect("bundle in ACL is admitted");
}
#[test]
fn check_bundle_admission_rejects_bundle_outside_acl_with_403() {
let mut acl = std::collections::HashSet::new();
acl.insert("finance-bundle".to_string());
let err = check_bundle_admission(&EndpointAdmission::Resolved(&acl), "legal-bundle")
.expect_err("out-of-ACL bundle must refuse");
assert_eq!(err.status(), StatusCode::FORBIDDEN);
}
#[test]
fn check_bundle_admission_empty_acl_rejects_every_bundle() {
let acl = std::collections::HashSet::new();
let err = check_bundle_admission(&EndpointAdmission::Resolved(&acl), "any-bundle")
.expect_err("empty ACL must refuse every bundle");
assert_eq!(err.status(), StatusCode::FORBIDDEN);
}
fn provider_route_table(scope: &RevisionScope) -> HttpRouteTable {
use crate::domains::Domain;
HttpRouteTable::from_descriptors(vec![crate::http_routes::descriptor_for_test(
"/slack/events",
&["POST"],
Domain::Messaging,
Some(scope.clone()),
)])
}
fn test_scope() -> RevisionScope {
RevisionScope {
deployment_id: greentic_deploy_spec::DeploymentId::new(),
bundle_id: greentic_deploy_spec::BundleId::new("fast2flow"),
revision_id: greentic_deploy_spec::RevisionId::new(),
}
}
#[test]
fn admit_refuses_declared_provider_route() {
let scope = test_scope();
let routes = provider_route_table(&scope);
assert_eq!(
admit_request(&routes, &scope, "/slack/events", &hyper::Method::POST),
Admission::ProviderRoute
);
}
#[test]
fn admit_rejects_non_post_on_generic_path() {
let scope = test_scope();
let routes = provider_route_table(&scope);
assert_eq!(
admit_request(&routes, &scope, "/favicon.ico", &hyper::Method::GET),
Admission::MethodNotAllowed
);
}
#[test]
fn admit_serves_generic_post() {
let scope = test_scope();
let routes = provider_route_table(&scope);
assert_eq!(
admit_request(&routes, &scope, "/api/chat", &hyper::Method::POST),
Admission::Serve
);
}
#[test]
fn admit_does_not_match_provider_route_of_a_different_revision() {
let scope = test_scope();
let routes = provider_route_table(&scope);
let other = test_scope();
assert_eq!(
admit_request(&routes, &other, "/slack/events", &hyper::Method::POST),
Admission::Serve
);
}
#[test]
fn parse_query_pairs_splits_amp_separated_pairs() {
let pairs = parse_query_pairs(Some("a=1&b=&c"));
assert_eq!(
pairs,
vec![
("a".to_string(), "1".to_string()),
("b".to_string(), String::new()),
("c".to_string(), String::new()),
]
);
}
#[test]
fn parse_query_pairs_handles_none_and_empty() {
assert!(parse_query_pairs(None).is_empty());
assert!(parse_query_pairs(Some("")).is_empty());
assert_eq!(
parse_query_pairs(Some("&a=1")),
vec![("a".to_string(), "1".to_string())]
);
}
#[test]
fn collect_forwarded_request_headers_lowercases_keys_and_keeps_multi_value() {
use hyper::http::header::{HeaderName, HeaderValue};
let mut map = HeaderMap::new();
map.insert(
HeaderName::from_static("content-type"),
HeaderValue::from_static("application/json"),
);
map.append(
HeaderName::from_static("x-trace"),
HeaderValue::from_static("a"),
);
map.append(
HeaderName::from_static("x-trace"),
HeaderValue::from_static("b"),
);
map.insert(
HeaderName::from_static("x-bad"),
HeaderValue::from_bytes(&[0xff, 0xfe]).unwrap(),
);
let mut headers = collect_forwarded_request_headers(&map);
headers.sort();
assert_eq!(
headers,
vec![
("content-type".to_string(), "application/json".to_string()),
("x-trace".to_string(), "a".to_string()),
("x-trace".to_string(), "b".to_string()),
]
);
}
#[test]
fn build_provider_http_in_wires_fields_and_b64_body() {
let headers = vec![("content-type".to_string(), "application/json".to_string())];
let http_in = build_provider_http_in(
"messaging.telegram.bot",
"acme",
"POST",
"/webhook/telegram",
Some("token=abc&n=1"),
&headers,
br#"{"update_id":42}"#,
);
assert_eq!(http_in.provider, "messaging.telegram.bot");
assert_eq!(http_in.tenant_hint.as_deref(), Some("acme"));
assert_eq!(http_in.method, "POST");
assert_eq!(http_in.path, "/webhook/telegram");
assert_eq!(http_in.headers, headers);
assert_eq!(http_in.query.len(), 2);
assert_eq!(http_in.body_b64, BASE64.encode(br#"{"update_id":42}"#));
}
#[test]
fn envelope_to_activity_maps_text_and_identity() {
let envelope: ChannelMessageEnvelope = serde_json::from_value(json!({
"id": "msg-1",
"tenant": {
"env": "dev",
"tenant": "acme",
"tenant_id": "acme",
"attempt": 0,
},
"channel": "telegram",
"session_id": "sess-1",
"from": { "id": "u1", "kind": "user" },
"text": "hello",
"metadata": {},
}))
.expect("envelope");
let activity = envelope_to_activity(&envelope, "fallback", None, None);
assert_eq!(activity.tenant(), Some("acme"));
assert_eq!(activity.session_id(), Some("sess-1"));
assert_eq!(activity.user(), Some("u1"));
assert_eq!(activity.flow_type(), Some("messaging"));
assert_eq!(
activity.payload().get("text").and_then(Value::as_str),
Some("hello"),
);
}
#[test]
fn envelope_to_activity_skips_empty_session_pin() {
let envelope: ChannelMessageEnvelope = serde_json::from_value(json!({
"id": "msg-1",
"tenant": {
"env": "dev",
"tenant": "acme",
"tenant_id": "acme",
"attempt": 0,
},
"channel": "telegram",
"session_id": "",
"metadata": {},
}))
.expect("envelope");
let activity = envelope_to_activity(&envelope, "fallback", None, None);
assert_eq!(activity.tenant(), Some("acme"));
assert_eq!(activity.session_id(), None);
}
#[test]
fn envelope_to_activity_carries_metadata_for_button_routing() {
let envelope: ChannelMessageEnvelope = serde_json::from_value(json!({
"id": "cb-1",
"tenant": {
"env": "dev",
"tenant": "acme",
"tenant_id": "acme",
"attempt": 0,
},
"channel": "telegram",
"session_id": "sess-1",
"from": { "id": "u1", "kind": "user" },
"text": "[callback:{\"action\":\"about_card\"}]",
"metadata": {
"action": "about_card",
"callback_data": "{\"action\":\"about_card\"}",
},
}))
.expect("envelope");
let activity = envelope_to_activity(&envelope, "fallback", None, None);
let entry = activity.payload();
assert_eq!(
entry.pointer("/metadata/action").and_then(Value::as_str),
Some("about_card"),
);
assert_eq!(
entry.pointer("/text").and_then(Value::as_str),
Some("[callback:{\"action\":\"about_card\"}]"),
);
assert_eq!(activity.flow_type(), Some("messaging"));
}
#[test]
fn envelope_to_activity_attaches_endpoint_and_welcome_hint() {
let envelope: ChannelMessageEnvelope = serde_json::from_value(json!({
"id": "msg-1",
"tenant": {
"env": "dev",
"tenant": "acme",
"tenant_id": "acme",
"attempt": 0,
},
"channel": "telegram",
"session_id": "sess-1",
"from": { "id": "u1", "kind": "user" },
"text": "hello",
"metadata": {},
}))
.expect("envelope");
let hint = WelcomeFlowHint {
pack_id: "welcome-pack".to_string(),
flow_id: "welcome-flow".to_string(),
};
let activity =
envelope_to_activity(&envelope, "fallback", Some("ep-legal"), Some(hint.clone()));
assert_eq!(activity.messaging_endpoint_id(), Some("ep-legal"));
assert_eq!(activity.welcome_flow_hint(), Some(&hint));
}
#[test]
fn build_reply_envelope_text_activity_clones_ingress_route() {
let ingress: ChannelMessageEnvelope = serde_json::from_value(json!({
"id": "msg-in-1",
"tenant": {
"env": "dev",
"tenant": "acme",
"tenant_id": "acme",
"attempt": 0,
},
"channel": "telegram",
"session_id": "chat-42",
"to": [{ "id": "room-1", "kind": "room" }],
"from": { "id": "user-1", "kind": "user" },
"text": "hi",
"metadata": { "route": "/webhook/telegram", "leftover": "stripped" },
}))
.expect("ingress envelope");
let reply = Activity::text("hello back");
let envelope = build_reply_envelope(&ingress, &reply);
assert_eq!(envelope.session_id, "chat-42");
assert_eq!(envelope.channel, "telegram");
assert_eq!(envelope.to.len(), 1);
assert_eq!(envelope.text.as_deref(), Some("hello back"));
assert!(envelope.from.is_none());
assert!(envelope.correlation_id.is_none());
assert_ne!(envelope.id, "msg-in-1");
assert!(envelope.metadata.contains_key("route"));
assert!(!envelope.metadata.contains_key("leftover"));
}
#[test]
fn build_reply_envelope_lifts_rendered_card_into_metadata() {
let ingress: ChannelMessageEnvelope = serde_json::from_value(json!({
"id": "msg-in-card",
"tenant": { "env": "dev", "tenant": "acme", "tenant_id": "acme", "attempt": 0 },
"channel": "telegram",
"session_id": "chat-7",
"to": [{ "id": "room-1", "kind": "room" }],
"text": "hi",
}))
.expect("ingress envelope");
let card = json!({ "type": "AdaptiveCard", "version": "1.6" });
let reply = Activity::custom("response", json!({ "renderedCard": card.clone() }));
let envelope = build_reply_envelope(&ingress, &reply);
let stored: Value = serde_json::from_str(
envelope
.metadata
.get("adaptive_card")
.expect("adaptive_card in metadata"),
)
.expect("card json");
assert_eq!(stored, card);
assert_eq!(
envelope.extensions.get(ext_keys::ADAPTIVE_CARD),
Some(&card)
);
assert!(envelope.text.is_none());
assert_eq!(envelope.channel, "telegram");
assert_eq!(envelope.session_id, "chat-7");
let pending = Activity::custom(
"response",
json!({ "status": "pending", "response": { "renderedCard": card.clone() } }),
);
let envelope = build_reply_envelope(&ingress, &pending);
let stored: Value = serde_json::from_str(
envelope
.metadata
.get("adaptive_card")
.expect("adaptive_card from pending wrapper"),
)
.expect("card json");
assert_eq!(stored, card);
}
#[test]
fn build_reply_envelope_full_payload_envelope_uses_it_verbatim() {
let ingress: ChannelMessageEnvelope = serde_json::from_value(json!({
"id": "msg-in-2",
"tenant": {
"env": "dev",
"tenant": "acme",
"tenant_id": "acme",
"attempt": 0,
},
"channel": "telegram",
"session_id": "chat-7",
"to": [{ "id": "room-7", "kind": "room" }],
"metadata": {},
}))
.expect("ingress envelope");
let reply_payload = json!({
"id": "",
"tenant": {
"env": "dev",
"tenant": "acme",
"tenant_id": "acme",
"attempt": 0,
},
"channel": "",
"session_id": "",
"text": "scripted-reply",
"metadata": {},
});
let reply = Activity::custom("messaging", reply_payload);
let envelope = build_reply_envelope(&ingress, &reply);
assert_eq!(envelope.session_id, "chat-7");
assert_eq!(envelope.channel, "telegram");
assert_eq!(envelope.to.len(), 1);
assert_eq!(envelope.text.as_deref(), Some("scripted-reply"));
assert!(!envelope.id.is_empty(), "id backfilled from uuid");
}
#[test]
fn synthesize_provider_response_defaults_to_200_and_preserves_body() {
let response = IngressHttpResponse {
status: 0, headers: vec![("content-type".to_string(), "text/plain".to_string())],
body: Some(b"ok".to_vec()),
};
let out = synthesize_provider_response(&response);
assert_eq!(out.status(), StatusCode::OK);
assert_eq!(
out.headers()
.get("content-type")
.and_then(|v| v.to_str().ok()),
Some("text/plain"),
);
}
#[test]
fn synthesize_provider_response_drops_malformed_headers() {
let response = IngressHttpResponse {
status: 202,
headers: vec![
("\n bad".to_string(), "x".to_string()),
("x-good".to_string(), "ok".to_string()),
],
body: None,
};
let out = synthesize_provider_response(&response);
assert_eq!(out.status(), StatusCode::ACCEPTED);
assert!(out.headers().get("\n bad").is_none());
assert_eq!(
out.headers().get("x-good").and_then(|v| v.to_str().ok()),
Some("ok"),
);
}
#[test]
fn admit_classifies_synthesized_webhook_for_root_bound_deployment() {
let scope = test_scope();
let dir = tempfile::tempdir().expect("tempdir");
let pack_path = dir.path().join("telegram.gtpack");
crate::http_routes::tests::write_provider_pack(
&pack_path,
"telegram-pack",
"messaging.telegram.bot",
&["ingest_http"],
);
let descriptors = crate::http_routes::synthesize_provider_ingest_routes(
&[pack_path],
&scope,
&[], );
assert_eq!(descriptors.len(), 1, "root-bound synthesis emits one route");
let table = HttpRouteTable::from_descriptors(descriptors);
assert_eq!(
admit_request(&table, &scope, "/webhook/telegram", &hyper::Method::POST),
Admission::ProviderRoute,
);
}
fn envelope_for(user: &str, conversation: &str) -> IngressEnvelope {
IngressEnvelope {
tenant: "acme".into(),
env: Some("local".into()),
pack_id: Some("pack.demo".into()),
flow_id: "flow.main".into(),
flow_type: Some("messaging".into()),
action: Some("messaging".into()),
session_hint: Some(format!("acme:provider:{conversation}:{user}")),
provider: Some("provider".into()),
messaging_endpoint_id: None,
channel: Some(conversation.into()),
conversation: Some(conversation.into()),
user: Some(user.into()),
activity_id: Some(format!("activity-{conversation}")),
timestamp: None,
payload: json!({ "text": "hi" }),
metadata: None,
reply_scope: Some(ReplyScope {
conversation: conversation.into(),
thread: None,
reply_to: None,
correlation: None,
}),
}
.canonicalize()
}
fn wait_for(next_node: &str) -> FlowWait {
let state: ExecutionState = serde_json::from_value(json!({
"input": { "text": "hi" },
"nodes": {},
"egress": []
}))
.expect("state");
FlowWait {
reason: Some("await-user".into()),
snapshot: FlowSnapshot {
pack_id: "pack.demo".into(),
flow_id: "flow.main".into(),
next_flow: None,
next_node: next_node.into(),
state,
},
}
}
#[test]
fn isolated_revision_stores_do_not_cross_resume() {
let store_a = FlowResumeStore::new(new_session_store());
let store_b = FlowResumeStore::new(new_session_store());
let envelope = envelope_for("user-1", "conv-1");
store_a
.save(&envelope, &wait_for("node-a"))
.expect("save A");
assert!(
store_b.fetch(&envelope).expect("fetch B").is_none(),
"revision B must not observe revision A's suspended snapshot"
);
let resumed = store_a
.fetch(&envelope)
.expect("fetch A")
.expect("A snapshot present");
assert_eq!(resumed.next_node, "node-a");
store_a.clear(&envelope).expect("clear A");
}
#[test]
fn shared_revision_store_leaks_across_revisions() {
let shared = new_session_store();
let store_a = FlowResumeStore::new(Arc::clone(&shared));
let store_b = FlowResumeStore::new(shared);
let envelope = envelope_for("user-1", "conv-1");
store_a
.save(&envelope, &wait_for("node-a"))
.expect("save A");
let leaked = store_b
.fetch(&envelope)
.expect("fetch B")
.expect("shared store leaks the snapshot to revision B");
assert_eq!(
leaked.next_node, "node-a",
"shared store hands revision A's snapshot to revision B (the bug)"
);
store_a.clear(&envelope).expect("clear");
}
fn host_cfg_with(addr: Option<SocketAddr>) -> EnvironmentHostConfig {
EnvironmentHostConfig {
env_id: greentic_types::EnvId::new("local").unwrap(),
region: None,
tenant_org_id: None,
listen_addr: addr,
public_base_url: None,
}
}
struct EnvVarGuard {
gateway_prev: Option<std::ffi::OsString>,
port_prev: Option<std::ffi::OsString>,
}
impl EnvVarGuard {
fn clean() -> Self {
let gateway_prev = std::env::var_os("GREENTIC_GATEWAY_LISTEN_ADDR");
let port_prev = std::env::var_os("PORT");
unsafe {
std::env::remove_var("GREENTIC_GATEWAY_LISTEN_ADDR");
std::env::remove_var("PORT");
}
Self {
gateway_prev,
port_prev,
}
}
}
impl Drop for EnvVarGuard {
fn drop(&mut self) {
unsafe {
match &self.gateway_prev {
Some(v) => std::env::set_var("GREENTIC_GATEWAY_LISTEN_ADDR", v),
None => std::env::remove_var("GREENTIC_GATEWAY_LISTEN_ADDR"),
}
match &self.port_prev {
Some(v) => std::env::set_var("PORT", v),
None => std::env::remove_var("PORT"),
}
}
}
}
fn activation_for_test(
host: std::sync::Arc<greentic_runner_host::RunnerHost>,
dispatcher: crate::revision_dispatcher::RevisionDispatcher,
) -> Activation {
Activation {
host,
routing: std::sync::Arc::new(RevisionIngressRouting {
dispatcher: std::sync::Arc::new(dispatcher),
http_routes: HttpRouteTable::from_descriptors(Vec::new()),
deployment_routes: crate::deployment_routes::DeploymentRouteTable::default(),
endpoint_admit: std::sync::Arc::new(crate::endpoint_admit::EndpointAdmit::default()),
deployment_config_overrides: std::sync::Arc::default(),
}),
}
}
fn empty_activation(env_id: &str) -> Activation {
use crate::revision_dispatcher::{RevisionDispatcher, RevisionDispatcherConfig};
let host = std::sync::Arc::new(
greentic_runner_host::HostBuilder::new()
.with_config(greentic_runner_host::HostConfig::from_gtbind(
greentic_runner_host::TenantBindings {
tenant: env_id.to_string(),
packs: Vec::new(),
env_passthrough: Vec::new(),
},
))
.build()
.expect("build placeholder host"),
);
let dispatcher = RevisionDispatcher::new(RevisionDispatcherConfig::new(env_id, [0u8; 32]));
activation_for_test(host, dispatcher)
}
fn empty_state(env_id: &str, bound: SocketAddr) -> ServeState {
ServeState {
slot: ArcSwap::new(std::sync::Arc::new(empty_activation(env_id))),
bound_addr: bound,
}
}
fn body_string(resp: Response<Full<Bytes>>) -> String {
let body = resp.into_body();
let runtime = tokio::runtime::Builder::new_current_thread()
.build()
.expect("current-thread runtime for test body collection");
let collected = runtime.block_on(body.collect()).expect("collect Full body");
let bytes = collected.to_bytes();
String::from_utf8_lossy(&bytes).into_owned()
}
#[test]
fn try_probe_response_returns_ok_for_each_probe_alias() {
let bound: SocketAddr = "127.0.0.1:8080".parse().unwrap();
let state = empty_state("local", bound);
for path in ["/livez", "/readyz", "/healthz", "/health"] {
let resp = try_probe_response(path, &state)
.unwrap_or_else(|| panic!("expected probe response for {path}"));
assert_eq!(resp.status(), StatusCode::OK, "{path} status");
assert_eq!(body_string(resp), "ok", "{path} body");
}
}
#[test]
fn try_probe_response_status_reports_empty_runtime_diagnostics() {
let bound: SocketAddr = "0.0.0.0:9090".parse().unwrap();
let state = empty_state("prod-eu", bound);
let resp = try_probe_response("/status", &state).expect("status response");
assert_eq!(resp.status(), StatusCode::OK);
let body: serde_json::Value = serde_json::from_str(&body_string(resp)).unwrap();
assert_eq!(body["schema"], "greentic.status.v1");
assert_eq!(body["env_id"], "prod-eu");
assert_eq!(body["listen_addr"], "0.0.0.0:9090");
assert_eq!(body["bundles_active"], 0);
assert_eq!(body["deployments_routed"], 0);
assert_eq!(body["revisions_active"], 0);
}
#[test]
fn try_probe_response_returns_none_for_non_probe_paths() {
let bound: SocketAddr = "127.0.0.1:8080".parse().unwrap();
let state = empty_state("local", bound);
assert!(try_probe_response("/api/chat", &state).is_none());
assert!(try_probe_response("/livez/sub", &state).is_none());
assert!(try_probe_response("/", &state).is_none());
}
#[test]
fn resolve_bind_addr_falls_back_to_spec_default_when_nothing_is_set() {
let _lock = crate::test_env_lock()
.lock()
.unwrap_or_else(|e| e.into_inner());
let _vars = EnvVarGuard::clean();
assert_eq!(resolve_bind_addr(None), DEFAULT_LISTEN_ADDR);
}
#[test]
fn resolve_bind_addr_uses_host_config_when_set() {
let _lock = crate::test_env_lock()
.lock()
.unwrap_or_else(|e| e.into_inner());
let _vars = EnvVarGuard::clean();
let configured: SocketAddr = "192.168.1.10:9000".parse().unwrap();
let host = host_cfg_with(Some(configured));
assert_eq!(resolve_bind_addr(Some(&host)), configured);
}
#[test]
fn resolve_bind_addr_gateway_env_full_socketaddr_overrides_host_config() {
let _lock = crate::test_env_lock()
.lock()
.unwrap_or_else(|e| e.into_inner());
let _vars = EnvVarGuard::clean();
let host = host_cfg_with(Some("192.168.1.10:9000".parse().unwrap()));
unsafe { std::env::set_var("GREENTIC_GATEWAY_LISTEN_ADDR", "0.0.0.0:7000") };
assert_eq!(
resolve_bind_addr(Some(&host)),
"0.0.0.0:7000".parse::<SocketAddr>().unwrap()
);
}
#[test]
fn resolve_bind_addr_gateway_env_bare_ip_keeps_port_from_host_config() {
let _lock = crate::test_env_lock()
.lock()
.unwrap_or_else(|e| e.into_inner());
let _vars = EnvVarGuard::clean();
let host = host_cfg_with(Some("127.0.0.1:9090".parse().unwrap()));
unsafe { std::env::set_var("GREENTIC_GATEWAY_LISTEN_ADDR", "0.0.0.0") };
assert_eq!(
resolve_bind_addr(Some(&host)),
"0.0.0.0:9090".parse::<SocketAddr>().unwrap()
);
}
#[test]
fn resolve_bind_addr_port_env_overrides_only_the_port() {
let _lock = crate::test_env_lock()
.lock()
.unwrap_or_else(|e| e.into_inner());
let _vars = EnvVarGuard::clean();
let host = host_cfg_with(Some("192.168.1.10:9000".parse().unwrap()));
unsafe { std::env::set_var("PORT", "5555") };
assert_eq!(
resolve_bind_addr(Some(&host)),
"192.168.1.10:5555".parse::<SocketAddr>().unwrap()
);
}
#[test]
fn resolve_bind_addr_port_env_layers_on_top_of_gateway_env() {
let _lock = crate::test_env_lock()
.lock()
.unwrap_or_else(|e| e.into_inner());
let _vars = EnvVarGuard::clean();
unsafe {
std::env::set_var("GREENTIC_GATEWAY_LISTEN_ADDR", "10.0.0.5:8000");
std::env::set_var("PORT", "9999");
}
assert_eq!(
resolve_bind_addr(None),
"10.0.0.5:9999".parse::<SocketAddr>().unwrap()
);
}
#[test]
fn resolve_bind_addr_invalid_gateway_env_falls_through() {
let _lock = crate::test_env_lock()
.lock()
.unwrap_or_else(|e| e.into_inner());
let _vars = EnvVarGuard::clean();
let host = host_cfg_with(Some("127.0.0.1:9090".parse().unwrap()));
unsafe { std::env::set_var("GREENTIC_GATEWAY_LISTEN_ADDR", "not-an-address") };
assert_eq!(
resolve_bind_addr(Some(&host)),
"127.0.0.1:9090".parse::<SocketAddr>().unwrap()
);
}
#[test]
fn resolve_bind_addr_invalid_port_env_falls_through() {
let _lock = crate::test_env_lock()
.lock()
.unwrap_or_else(|e| e.into_inner());
let _vars = EnvVarGuard::clean();
let host = host_cfg_with(Some("127.0.0.1:9090".parse().unwrap()));
unsafe { std::env::set_var("PORT", "not-a-number") };
assert_eq!(
resolve_bind_addr(Some(&host)),
"127.0.0.1:9090".parse::<SocketAddr>().unwrap()
);
}
fn populated_activation(env_id: &str, revision_count: u32) -> Activation {
use crate::revision_dispatcher::{
RevisionDispatcher, RevisionDispatcherConfig, RevisionEntry,
};
use greentic_deploy_spec::ids::{BundleId, DeploymentId, RevisionId};
let base = empty_activation(env_id);
let dispatcher = RevisionDispatcher::new(RevisionDispatcherConfig::new(env_id, [0u8; 32]));
let deployment_id = DeploymentId::new();
let bundle_id = BundleId::new("customer.support");
let total: u32 = 10_000;
let per_revision = total / revision_count;
let mut remainder = total - per_revision * revision_count;
let revisions: Vec<RevisionEntry> = (0..revision_count)
.map(|_| {
let weight_bps = per_revision + if remainder > 0 { 1 } else { 0 };
remainder = remainder.saturating_sub(1);
RevisionEntry {
revision_id: RevisionId::new(),
bundle_id: bundle_id.clone(),
weight_bps,
}
})
.collect();
dispatcher
.apply_traffic_split(deployment_id, revisions, bundle_id, 0)
.expect("apply_traffic_split for test activation");
activation_for_test(base.host, dispatcher)
}
fn server_for_test(state: std::sync::Arc<ServeState>) -> RevisionServer {
let mut watermark: HashMap<DeploymentId, u64> = HashMap::new();
state
.slot
.load()
.routing
.dispatcher
.absorb_into_watermark(&mut watermark);
RevisionServer {
shutdown: None,
handle: None,
actual_port: 0,
state,
runtime_handle: Handle::current(),
reload_lock: std::sync::Mutex::new(()),
generation_watermark: std::sync::Mutex::new(watermark),
}
}
#[tokio::test]
async fn reload_swaps_activation_visible_to_next_counts() {
let bound: SocketAddr = "127.0.0.1:0".parse().unwrap();
let state = std::sync::Arc::new(empty_state("env-1", bound));
let server = server_for_test(state);
assert_eq!(server.counts(), (0, 0));
let report = server.reload(populated_activation("env-1", 2), Duration::ZERO);
assert_eq!(report.prev_deployments, 0);
assert_eq!(report.prev_revisions, 0);
assert_eq!(report.new_deployments, 1);
assert_eq!(report.new_revisions, 2);
assert_eq!(server.counts(), (1, 2));
}
#[tokio::test]
async fn reload_inflight_arc_outlives_swap() {
let bound: SocketAddr = "127.0.0.1:0".parse().unwrap();
let state = std::sync::Arc::new(empty_state("env-1", bound));
let server = server_for_test(std::sync::Arc::clone(&state));
let inflight = state.current();
let inflight_ptr = std::sync::Arc::as_ptr(&inflight) as usize;
server.reload(populated_activation("env-1", 1), Duration::from_secs(60));
let post_swap = state.current();
assert_ne!(
std::sync::Arc::as_ptr(&post_swap) as usize,
inflight_ptr,
"post-swap snapshot must point at the new activation"
);
let (old_deps, old_revs) = inflight.routing.dispatcher.counts();
assert_eq!((old_deps, old_revs), (0, 0));
let (new_deps, new_revs) = post_swap.routing.dispatcher.counts();
assert_eq!((new_deps, new_revs), (1, 1));
}
#[tokio::test]
async fn reload_drops_old_activation_after_drain_window() {
let bound: SocketAddr = "127.0.0.1:0".parse().unwrap();
let state = std::sync::Arc::new(empty_state("env-1", bound));
let server = server_for_test(std::sync::Arc::clone(&state));
let weak_old = std::sync::Arc::downgrade(&state.current());
let drain_window = Duration::from_millis(50);
server.reload(populated_activation("env-1", 1), drain_window);
assert!(
weak_old.upgrade().is_some(),
"old activation must outlive the drain window"
);
tokio::time::sleep(drain_window + Duration::from_millis(200)).await;
assert!(
weak_old.upgrade().is_none(),
"old activation must be freed once the drain window elapses"
);
}
fn activation_with_ids(
env_id: &str,
deployment_id: greentic_deploy_spec::ids::DeploymentId,
revision_id: greentic_deploy_spec::ids::RevisionId,
bundle_id: greentic_deploy_spec::ids::BundleId,
) -> Activation {
use crate::revision_dispatcher::{
RevisionDispatcher, RevisionDispatcherConfig, RevisionEntry,
};
let base = empty_activation(env_id);
let dispatcher = RevisionDispatcher::new(RevisionDispatcherConfig::new(env_id, [0u8; 32]));
let revisions = vec![RevisionEntry {
revision_id,
bundle_id: bundle_id.clone(),
weight_bps: 10_000,
}];
dispatcher
.apply_traffic_split(deployment_id, revisions, bundle_id, 0)
.expect("apply_traffic_split for shared-deployment activation");
activation_for_test(base.host, dispatcher)
}
#[tokio::test]
async fn reload_invalidates_pre_reload_cookie_for_persisted_deployment() {
let env_id = "env-1";
let tenant = "tenant-a";
let dep_id = greentic_deploy_spec::ids::DeploymentId::new();
let rev_id = greentic_deploy_spec::ids::RevisionId::new();
let bundle_id = greentic_deploy_spec::ids::BundleId::new("customer.support");
let act1 = activation_with_ids(env_id, dep_id, rev_id, bundle_id.clone());
let bound: SocketAddr = "127.0.0.1:0".parse().unwrap();
let state = std::sync::Arc::new(ServeState {
slot: ArcSwap::new(std::sync::Arc::new(act1)),
bound_addr: bound,
});
let server = server_for_test(std::sync::Arc::clone(&state));
let act1_snap = state.current();
assert_eq!(
act1_snap.routing.dispatcher.counts(),
(1, 1),
"pre-reload activation must hold the test deployment + revision"
);
let cookie = act1_snap.routing.dispatcher.seal_cookie(
env_id,
tenant,
dep_id,
rev_id,
1,
9_999_999_999,
);
assert_eq!(
act1_snap
.routing
.dispatcher
.verify_cookie(&cookie, env_id, tenant, dep_id, 1, 0),
Some(rev_id),
"pre-reload dispatcher must verify its own cookie"
);
let act2 = activation_with_ids(env_id, dep_id, rev_id, bundle_id);
server.reload(act2, Duration::ZERO);
let act2_snap = state.current();
assert_eq!(
act2_snap
.routing
.dispatcher
.verify_cookie(&cookie, env_id, tenant, dep_id, 2, 0),
None,
"post-reload dispatcher must reject the pre-reload cookie"
);
let post_cookie = act2_snap.routing.dispatcher.seal_cookie(
env_id,
tenant,
dep_id,
rev_id,
2,
9_999_999_999,
);
assert_eq!(
act2_snap
.routing
.dispatcher
.verify_cookie(&post_cookie, env_id, tenant, dep_id, 2, 0),
Some(rev_id),
"post-reload dispatcher must verify a cookie minted at the new generation"
);
}
#[tokio::test]
async fn reload_invalidates_cookie_after_remove_and_readd_within_ttl() {
let env_id = "env-1";
let tenant = "tenant-a";
let dep_id = greentic_deploy_spec::ids::DeploymentId::new();
let rev_id = greentic_deploy_spec::ids::RevisionId::new();
let bundle_id = greentic_deploy_spec::ids::BundleId::new("customer.support");
let act1 = activation_with_ids(env_id, dep_id, rev_id, bundle_id.clone());
let bound: SocketAddr = "127.0.0.1:0".parse().unwrap();
let state = std::sync::Arc::new(ServeState {
slot: ArcSwap::new(std::sync::Arc::new(act1)),
bound_addr: bound,
});
let server = server_for_test(std::sync::Arc::clone(&state));
let act1_snap = state.current();
let cookie = act1_snap.routing.dispatcher.seal_cookie(
env_id,
tenant,
dep_id,
rev_id,
1,
9_999_999_999,
);
let empty = empty_activation(env_id);
server.reload(empty, Duration::ZERO);
let act3 = activation_with_ids(env_id, dep_id, rev_id, bundle_id);
server.reload(act3, Duration::ZERO);
let act3_snap = state.current();
assert_eq!(
act3_snap.routing.dispatcher.counts(),
(1, 1),
"re-added deployment must be present in the post-reload dispatcher"
);
assert_eq!(
act3_snap
.routing
.dispatcher
.verify_cookie(&cookie, env_id, tenant, dep_id, 2, 0),
None,
"cookie sealed before remove must NOT verify under the bumped generation"
);
let post_cookie = act3_snap.routing.dispatcher.seal_cookie(
env_id,
tenant,
dep_id,
rev_id,
2,
9_999_999_999,
);
assert_eq!(
act3_snap
.routing
.dispatcher
.verify_cookie(&post_cookie, env_id, tenant, dep_id, 2, 0),
Some(rev_id),
"cookie minted at the bumped generation (2) must verify"
);
}
fn activation_with_two_revisions(
env_id: &str,
tenant: &str,
deployment_id: DeploymentId,
rev_a: RevisionId,
rev_b: RevisionId,
bundle_id: BundleId,
) -> (Activation, std::sync::Arc<RevisionDispatcher>) {
use crate::revision_dispatcher::{RevisionDispatcherConfig, RevisionEntry};
let base = empty_activation(env_id);
let dispatcher = RevisionDispatcher::new(RevisionDispatcherConfig::new(env_id, [0u8; 32]));
let revisions = vec![
RevisionEntry {
revision_id: rev_a,
bundle_id: bundle_id.clone(),
weight_bps: 5_000,
},
RevisionEntry {
revision_id: rev_b,
bundle_id: bundle_id.clone(),
weight_bps: 5_000,
},
];
dispatcher
.apply_traffic_split(deployment_id, revisions, bundle_id, 0)
.expect("apply_traffic_split");
let dispatcher = std::sync::Arc::new(dispatcher);
let routing = std::sync::Arc::new(RevisionIngressRouting {
dispatcher: std::sync::Arc::clone(&dispatcher),
http_routes: HttpRouteTable::from_descriptors(Vec::new()),
deployment_routes: crate::deployment_routes::DeploymentRouteTable::from_parts(vec![(
deployment_id,
tenant.to_string(),
Vec::new(),
Vec::new(),
)]),
endpoint_admit: std::sync::Arc::new(crate::endpoint_admit::EndpointAdmit::default()),
deployment_config_overrides: std::sync::Arc::default(),
});
let activation = Activation {
host: base.host,
routing,
};
(activation, dispatcher)
}
#[tokio::test]
async fn reload_drain_marks_then_evicts_removed_revision() {
let env_id = "env-1";
let tenant = "tenant-a";
let dep_id = DeploymentId::new();
let rev_kept = RevisionId::new();
let rev_removed = RevisionId::new();
let bundle_id = BundleId::new("customer.support");
let (act_old, old_dispatcher) = activation_with_two_revisions(
env_id,
tenant,
dep_id,
rev_kept,
rev_removed,
bundle_id.clone(),
);
let state = serve_state_with(act_old);
let server = server_for_test(std::sync::Arc::clone(&state));
let act_new = activation_with_ids(env_id, dep_id, rev_kept, bundle_id);
server.reload(act_new, Duration::from_secs(1));
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(
old_dispatcher.is_draining(dep_id, rev_removed),
"removed revision must be marked draining on OLD dispatcher"
);
assert!(
!old_dispatcher.is_draining(dep_id, rev_kept),
"kept revision must NOT be marked draining"
);
tokio::time::sleep(Duration::from_millis(1_200)).await;
let revision_ids: std::collections::HashSet<_> = old_dispatcher
.revision_keys()
.into_iter()
.filter(|(d, _, _)| *d == dep_id)
.map(|(_, _, r)| r)
.collect();
assert!(
!revision_ids.contains(&rev_removed),
"removed revision must be evicted from OLD dispatcher after drain"
);
assert!(
revision_ids.contains(&rev_kept),
"kept revision must remain on OLD dispatcher"
);
}
#[tokio::test]
async fn reload_does_not_drain_when_no_revisions_removed() {
let env_id = "env-1";
let tenant = "tenant-a";
let dep_id = DeploymentId::new();
let rev_a = RevisionId::new();
let rev_b = RevisionId::new();
let bundle_id = BundleId::new("customer.support");
let (act_old, old_dispatcher) =
activation_with_two_revisions(env_id, tenant, dep_id, rev_a, rev_b, bundle_id.clone());
let state = serve_state_with(act_old);
let server = server_for_test(std::sync::Arc::clone(&state));
let (act_new, _) =
activation_with_two_revisions(env_id, tenant, dep_id, rev_a, rev_b, bundle_id);
server.reload(act_new, Duration::from_millis(100));
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(
old_dispatcher.draining_revisions(dep_id).is_empty(),
"no revisions removed → nothing marked draining (got {:?})",
old_dispatcher.draining_revisions(dep_id)
);
}
#[tokio::test]
async fn reload_zero_drain_window_skips_drain_spawn() {
let env_id = "env-1";
let tenant = "tenant-a";
let dep_id = DeploymentId::new();
let rev_a = RevisionId::new();
let rev_b = RevisionId::new();
let bundle_id = BundleId::new("customer.support");
let (act_old, old_dispatcher) =
activation_with_two_revisions(env_id, tenant, dep_id, rev_a, rev_b, bundle_id.clone());
let state = serve_state_with(act_old);
let server = server_for_test(std::sync::Arc::clone(&state));
let act_new = activation_with_ids(env_id, dep_id, rev_a, bundle_id);
server.reload(act_new, Duration::ZERO);
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(
old_dispatcher.draining_revisions(dep_id).is_empty(),
"drain_window == 0 must bypass drain spawn (got {:?})",
old_dispatcher.draining_revisions(dep_id)
);
}
fn serve_state_with(activation: Activation) -> std::sync::Arc<ServeState> {
let bound: SocketAddr = "127.0.0.1:0".parse().unwrap();
std::sync::Arc::new(ServeState {
slot: ArcSwap::new(std::sync::Arc::new(activation)),
bound_addr: bound,
})
}
#[test]
fn liveness_probe_reports_live_when_revision_present_in_newer_activation() {
let env_id = "env-1";
let dep_id = DeploymentId::new();
let rev_id = RevisionId::new();
let bundle_id = BundleId::new("customer.support");
let draining = activation_with_ids(env_id, dep_id, rev_id, bundle_id.clone());
let draining_dispatcher = std::sync::Arc::clone(&draining.routing.dispatcher);
let live = activation_with_ids(env_id, dep_id, rev_id, bundle_id);
let state = serve_state_with(live);
let probe = SlotLivenessProbe {
state,
draining_dispatcher,
};
assert!(
probe.is_live_elsewhere(dep_id, rev_id),
"revision present in a newer activation must read as live elsewhere"
);
}
#[test]
fn liveness_probe_reports_not_live_when_live_slot_is_the_draining_dispatcher() {
let env_id = "env-1";
let dep_id = DeploymentId::new();
let rev_id = RevisionId::new();
let bundle_id = BundleId::new("customer.support");
let live = activation_with_ids(env_id, dep_id, rev_id, bundle_id);
let draining_dispatcher = std::sync::Arc::clone(&live.routing.dispatcher);
let state = serve_state_with(live);
let probe = SlotLivenessProbe {
state,
draining_dispatcher,
};
assert!(
!probe.is_live_elsewhere(dep_id, rev_id),
"draining the live dispatcher itself must NOT read as live elsewhere"
);
}
#[test]
fn liveness_probe_reports_not_live_when_revision_absent_from_live_slot() {
let env_id = "env-1";
let dep_id = DeploymentId::new();
let rev_removed = RevisionId::new();
let rev_other = RevisionId::new();
let bundle_id = BundleId::new("customer.support");
let draining = activation_with_ids(env_id, dep_id, rev_removed, bundle_id.clone());
let draining_dispatcher = std::sync::Arc::clone(&draining.routing.dispatcher);
let live = activation_with_ids(env_id, dep_id, rev_other, bundle_id);
let state = serve_state_with(live);
let probe = SlotLivenessProbe {
state,
draining_dispatcher,
};
assert!(
!probe.is_live_elsewhere(dep_id, rev_removed),
"a genuinely removed revision must NOT read as live elsewhere"
);
}
}