pub(crate) mod session;
use crate::core::Column as _;
use crate::sql::sqlx::PgPool;
use crate::sql::Fetcher;
use crate::storage::BoxedStorage;
use axum::body::Body;
use axum::extract::{Form, Multipart, Query, State};
use axum::http::{header, HeaderMap, HeaderValue, Response, StatusCode, Uri};
use axum::middleware::{self, Next};
use axum::response::{Html, IntoResponse, Redirect};
use axum::routing::{get, post};
use axum::{Extension, Router};
use cookie::time::Duration as CookieDuration;
use cookie::{Cookie, SameSite};
use serde::Deserialize;
use std::sync::Arc;
use tera::{Context, Tera};
pub use session::{SessionPayload, SessionSecret, SessionSecretError};
use session::{COOKIE_NAME, SESSION_TTL_SECS};
use super::auth;
use super::branding::{self, BrandAssetKind};
use super::pools::TenantPools;
const RUSTANGO_PNG: &[u8] = include_bytes!("../static/rustango.png");
#[derive(Clone)]
struct ConsoleState {
registry: PgPool,
pools: Option<Arc<TenantPools>>,
session_secret: Arc<SessionSecret>,
tera: Arc<Tera>,
brand_storage: BoxedStorage,
op_brand: Arc<OpBrand>,
tenant_session_secret: Option<Arc<SessionSecret>>,
tenant_handoff_url: String,
}
#[derive(Debug, Clone)]
struct OpBrand {
name: String,
tagline: Option<String>,
logo_url: String,
primary_color: Option<String>,
theme_mode: String,
}
impl OpBrand {
fn defaults() -> Self {
Self {
name: "Rustango".to_owned(),
tagline: None,
logo_url: "/__static__/rustango.png".to_owned(),
primary_color: None,
theme_mode: "auto".to_owned(),
}
}
fn from_env() -> Self {
let mut out = Self::defaults();
#[cfg(feature = "config")]
if let Ok(s) = crate::config::Settings::load_from_env() {
Self::apply_brand_settings(&mut out, &s.brand);
}
Self::apply_env_overrides(&mut out);
out
}
#[cfg(feature = "config")]
fn apply_brand_settings(out: &mut Self, b: &crate::config::BrandSettings) {
if let Some(n) = b.name.as_deref().filter(|s| !s.is_empty()) {
out.name = n.to_owned();
}
if let Some(t) = b.tagline.as_deref().filter(|s| !s.is_empty()) {
out.tagline = Some(t.to_owned());
}
if let Some(u) = b.logo_url.as_deref().filter(|s| !s.is_empty()) {
out.logo_url = u.to_owned();
}
if let Some(hex) = b
.primary_color
.as_deref()
.and_then(branding::validate_hex_color)
{
out.primary_color = Some(hex);
}
if let Some(mode) = b
.theme_mode
.as_deref()
.and_then(branding::validate_theme_mode)
{
out.theme_mode = mode.to_owned();
}
}
fn apply_env_overrides(out: &mut Self) {
if let Ok(v) = std::env::var("RUSTANGO_OPERATOR_BRAND_NAME") {
if !v.is_empty() {
out.name = v;
}
}
if let Ok(v) = std::env::var("RUSTANGO_OPERATOR_TAGLINE") {
if !v.is_empty() {
out.tagline = Some(v);
}
}
if let Ok(v) = std::env::var("RUSTANGO_OPERATOR_LOGO_URL") {
if !v.is_empty() {
out.logo_url = v;
}
}
if let Some(hex) = std::env::var("RUSTANGO_OPERATOR_PRIMARY_COLOR")
.ok()
.as_deref()
.and_then(branding::validate_hex_color)
{
out.primary_color = Some(hex);
}
if let Some(mode) = std::env::var("RUSTANGO_OPERATOR_THEME_MODE")
.ok()
.as_deref()
.and_then(branding::validate_theme_mode)
{
out.theme_mode = mode.to_owned();
}
}
}
#[must_use]
pub fn router(registry: PgPool, secret: SessionSecret) -> Router {
router_inner(
registry,
None,
secret,
branding::default_brand_storage(),
None,
default_tenant_handoff_url(),
)
}
#[must_use]
pub fn router_with_pools(
registry: PgPool,
pools: Arc<TenantPools>,
secret: SessionSecret,
) -> Router {
router_inner(
registry,
Some(pools),
secret,
branding::default_brand_storage(),
None,
default_tenant_handoff_url(),
)
}
#[must_use]
pub fn router_with_impersonation(
registry: PgPool,
pools: Arc<TenantPools>,
secret: SessionSecret,
brand_storage: BoxedStorage,
tenant_session_secret: SessionSecret,
tenant_handoff_url: String,
) -> Router {
router_inner(
registry,
Some(pools),
secret,
brand_storage,
Some(tenant_session_secret),
tenant_handoff_url,
)
}
#[must_use]
pub fn router_with_brand_storage(
registry: PgPool,
pools: Option<Arc<TenantPools>>,
secret: SessionSecret,
brand_storage: BoxedStorage,
) -> Router {
router_inner(
registry,
pools,
secret,
brand_storage,
None,
default_tenant_handoff_url(),
)
}
fn default_tenant_handoff_url() -> String {
super::routes::RouteConfig::default().impersonation_handoff_url
}
fn router_inner(
registry: PgPool,
pools: Option<Arc<TenantPools>>,
secret: SessionSecret,
brand_storage: BoxedStorage,
tenant_session_secret: Option<SessionSecret>,
tenant_handoff_url: String,
) -> Router {
let mut tera = Tera::default();
tera.add_raw_templates([
(
"_theme_tokens.html",
include_str!("../../styles/theme_tokens.html"),
),
(
"_op_styles.html",
include_str!("../templates/_op_styles.html"),
),
(
"_theme_toggle.html",
include_str!("../../admin/templates/_theme_toggle.html"),
),
(
"op_layout.html",
include_str!("../templates/op_layout.html"),
),
("op_login.html", include_str!("../templates/op_login.html")),
(
"op_welcome.html",
include_str!("../templates/op_welcome.html"),
),
(
"op_operators.html",
include_str!("../templates/op_operators.html"),
),
("op_orgs.html", include_str!("../templates/op_orgs.html")),
(
"op_orgs_edit.html",
include_str!("../templates/op_orgs_edit.html"),
),
(
"op_change_password.html",
include_str!("../templates/op_change_password.html"),
),
])
.expect("operator-console templates parse");
let edit_enabled = pools.is_some();
let impersonation_enabled = tenant_session_secret.is_some() && pools.is_some();
let state = ConsoleState {
registry,
pools,
session_secret: Arc::new(secret),
tera: Arc::new(tera),
brand_storage,
op_brand: Arc::new(OpBrand::from_env()),
tenant_session_secret: tenant_session_secret.map(Arc::new),
tenant_handoff_url,
};
let public = Router::new()
.route("/login", get(login_form).post(login_submit))
.route("/logout", post(logout))
.route("/__static__/rustango.png", get(static_rustango_png))
.route("/__brand__/{slug}/{filename}", get(serve_brand_asset));
let mut private = Router::new()
.route("/", get(welcome))
.route("/operators", get(operators_list))
.route("/orgs", get(orgs_list))
.route(
"/change-password",
get(change_password_form).post(change_password_submit),
);
if edit_enabled {
private = private
.route(
"/orgs/{slug}/edit",
get(org_edit_form).post(org_edit_submit),
)
.route(
"/orgs/{slug}/edit/branding",
get(org_post_only_redirect).post(org_edit_branding),
);
}
if impersonation_enabled {
private = private.route(
"/orgs/{slug}/impersonate",
get(org_post_only_redirect).post(org_impersonate),
);
}
let private = private.route_layer(middleware::from_fn_with_state(
state.clone(),
require_session,
));
public.merge(private).with_state(state)
}
fn inject_op_brand(ctx: &mut Context, brand: &OpBrand) {
ctx.insert("brand_name", &brand.name);
ctx.insert("brand_tagline", &brand.tagline);
ctx.insert("brand_logo_url", &brand.logo_url);
ctx.insert("theme_mode", &brand.theme_mode);
ctx.insert(
"brand_css",
&branding::build_op_brand_css(brand.primary_color.as_deref()),
);
}
async fn require_session(
State(state): State<ConsoleState>,
headers: HeaderMap,
uri: Uri,
mut req: axum::http::Request<Body>,
next: Next,
) -> Response<Body> {
let cookie_value = read_cookie(&headers, COOKIE_NAME);
let payload = cookie_value
.as_deref()
.and_then(|v| session::decode(&state.session_secret, v).ok());
let method = req.method().clone();
let raw_next = uri.path_and_query().map(|p| p.as_str()).unwrap_or("/");
let safe_next = sanitize_next_for_method(&method, raw_next);
let Some(payload) = payload else {
return redirect_to_login(&safe_next).into_response();
};
match auth::Operator::objects()
.where_(auth::Operator::id.eq(payload.oid))
.fetch(&state.registry)
.await
{
Ok(rows) => {
let Some(op) = rows.into_iter().next().filter(|o| o.active) else {
return redirect_to_login(&safe_next).into_response();
};
if let Some(ts) = op.password_changed_at {
if payload.iat < ts.timestamp() {
return redirect_to_login(&safe_next).into_response();
}
}
req.extensions_mut().insert(op);
next.run(req).await
}
Err(e) => {
tracing::warn!(target: "crate::tenancy::operator_console", error = %e, "registry lookup");
(StatusCode::INTERNAL_SERVER_ERROR, "registry lookup failed").into_response()
}
}
}
async fn org_post_only_redirect(
axum::extract::Path(slug): axum::extract::Path<String>,
) -> Redirect {
Redirect::to(&format!("/orgs/{slug}/edit"))
}
fn sanitize_next_for_method(method: &axum::http::Method, path: &str) -> String {
if method == axum::http::Method::GET || method == axum::http::Method::HEAD {
return path.to_owned();
}
let path_only = path.split('?').next().unwrap_or(path);
if let Some(rest) = path_only.strip_prefix("/orgs/") {
if let Some(slug_end) = rest.find('/') {
let slug = &rest[..slug_end];
return format!("/orgs/{slug}/edit");
}
}
"/".to_owned()
}
fn redirect_to_login(next_path: &str) -> Response<Body> {
let next = if next_path == "/login" || next_path.starts_with("/logout") {
"/".to_string()
} else {
next_path.to_string()
};
let location = format!("/login?next={}", urlencoding_lite(&next));
Redirect::to(&location).into_response()
}
#[derive(Deserialize)]
struct LoginQuery {
#[serde(default)]
next: Option<String>,
#[serde(default)]
error: Option<String>,
}
async fn login_form(
State(state): State<ConsoleState>,
Query(q): Query<LoginQuery>,
) -> Html<String> {
let mut ctx = Context::new();
inject_op_brand(&mut ctx, &state.op_brand);
ctx.insert("next", &q.next.unwrap_or_else(|| "/".into()));
ctx.insert("error", &q.error);
Html(state.tera.render("op_login.html", &ctx).unwrap_or_default())
}
#[derive(Deserialize)]
struct LoginSubmit {
username: String,
password: String,
#[serde(default)]
next: Option<String>,
}
async fn login_submit(
State(state): State<ConsoleState>,
Form(form): Form<LoginSubmit>,
) -> Response<Body> {
let next = sanitize_next(form.next.as_deref());
let principal =
match auth::authenticate_operator(&state.registry, &form.username, &form.password).await {
Ok(Some(op)) => op,
Ok(None) => {
return Redirect::to(&format!(
"/login?error=Invalid+credentials&next={}",
urlencoding_lite(&next)
))
.into_response();
}
Err(e) => {
tracing::warn!(target: "crate::tenancy::operator_console", error = %e);
return (StatusCode::INTERNAL_SERVER_ERROR, "login failed").into_response();
}
};
let oid = principal.id.get().copied().unwrap_or_default();
let payload = SessionPayload::new(oid, SESSION_TTL_SECS);
let cookie_value = session::encode(&state.session_secret, &payload);
let cookie = Cookie::build((COOKIE_NAME, cookie_value))
.path("/")
.http_only(true)
.same_site(SameSite::Lax)
.max_age(CookieDuration::seconds(SESSION_TTL_SECS))
.build();
let mut resp = Redirect::to(&next).into_response();
resp.headers_mut().append(
header::SET_COOKIE,
HeaderValue::from_str(&cookie.to_string()).expect("cookie is ascii"),
);
resp
}
async fn logout(State(_state): State<ConsoleState>) -> Response<Body> {
let clear = Cookie::build((COOKIE_NAME, ""))
.path("/")
.http_only(true)
.same_site(SameSite::Lax)
.max_age(CookieDuration::seconds(0))
.build();
let mut resp = Redirect::to("/login").into_response();
resp.headers_mut().append(
header::SET_COOKIE,
HeaderValue::from_str(&clear.to_string()).expect("cookie is ascii"),
);
resp
}
async fn change_password_form(
State(state): State<ConsoleState>,
Extension(op): Extension<auth::Operator>,
Query(params): Query<std::collections::HashMap<String, String>>,
) -> Html<String> {
let mut ctx = Context::new();
inject_op_brand(&mut ctx, &state.op_brand);
ctx.insert("section", "change_password");
ctx.insert("operator_username", &op.username);
ctx.insert("error", ¶ms.get("error"));
ctx.insert("success", ¶ms.get("ok"));
Html(
state
.tera
.render("op_change_password.html", &ctx)
.unwrap_or_else(|e| {
tracing::error!(target: "crate::tenancy::operator_console", error = %e, "op_change_password.html render");
"<!doctype html><h1>Change-password page unavailable</h1>".to_owned()
}),
)
}
#[derive(Debug, serde::Deserialize)]
struct OpChangePasswordForm {
current_password: String,
new_password: String,
#[serde(default)]
confirm_password: String,
}
async fn change_password_submit(
State(state): State<ConsoleState>,
Extension(op): Extension<auth::Operator>,
Form(form): Form<OpChangePasswordForm>,
) -> Response<Body> {
let redir = |query: &str| -> Response<Body> {
Redirect::to(&format!("/change-password?{query}")).into_response()
};
let redir_err = |msg: &str| redir(&format!("error={}", crate::url_codec::url_encode(msg)));
if form.current_password.is_empty() || form.new_password.is_empty() {
return redir_err("All fields are required.");
}
if !form.confirm_password.is_empty() && form.confirm_password != form.new_password {
return redir_err("New password and confirmation did not match.");
}
if form.new_password == form.current_password {
return redir_err("New password must differ from the current password.");
}
if form.new_password.chars().count() < 8 {
return redir_err("New password must be at least 8 characters.");
}
let op_id = op.id.get().copied().unwrap_or(0);
if op_id <= 0 {
return redir_err("Session is missing an operator id; please log in again.");
}
let stored: Option<String> = match crate::sql::sqlx::query_scalar(
"SELECT password_hash FROM rustango_operators WHERE id = $1",
)
.bind(op_id)
.fetch_optional(&state.registry)
.await
{
Ok(v) => v,
Err(e) => {
tracing::warn!(target: "crate::tenancy::operator_console", error = %e, "change-password lookup");
return (StatusCode::INTERNAL_SERVER_ERROR, "lookup failed").into_response();
}
};
let Some(stored_hash) = stored else {
return redir_err("Your account no longer exists; please log in again.");
};
let ok = super::password::verify(&form.current_password, &stored_hash).unwrap_or(false);
if !ok {
return redir_err("Current password did not match.");
}
let new_hash = match super::password::hash(&form.new_password) {
Ok(h) => h,
Err(e) => return redir_err(&format!("hash failed: {e}")),
};
if let Err(e) = crate::sql::sqlx::query(
"UPDATE rustango_operators \
SET password_hash = $1, password_changed_at = NOW() WHERE id = $2",
)
.bind(&new_hash)
.bind(op_id)
.execute(&state.registry)
.await
{
tracing::warn!(target: "crate::tenancy::operator_console", error = %e, "change-password update");
return (StatusCode::INTERNAL_SERVER_ERROR, "update failed").into_response();
}
redir("ok=Password+updated")
}
async fn welcome(
State(state): State<ConsoleState>,
Extension(op): Extension<auth::Operator>,
) -> Html<String> {
let mut ctx = Context::new();
inject_op_brand(&mut ctx, &state.op_brand);
ctx.insert("section", "home");
ctx.insert("operator_username", &op.username);
Html(
state
.tera
.render("op_welcome.html", &ctx)
.unwrap_or_default(),
)
}
async fn operators_list(
State(state): State<ConsoleState>,
Extension(op): Extension<auth::Operator>,
) -> Response<Body> {
let rows: Vec<auth::Operator> = match auth::Operator::objects().fetch(&state.registry).await {
Ok(r) => r,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e}")).into_response(),
};
let view: Vec<_> = rows
.into_iter()
.map(|o| {
serde_json::json!({
"id": o.id.get().copied().unwrap_or_default(),
"username": o.username,
"active": o.active,
"created_at": o.created_at.format("%Y-%m-%d %H:%M UTC").to_string(),
})
})
.collect();
let mut ctx = Context::new();
inject_op_brand(&mut ctx, &state.op_brand);
ctx.insert("section", "operators");
ctx.insert("operator_username", &op.username);
ctx.insert("operators", &view);
Html(
state
.tera
.render("op_operators.html", &ctx)
.unwrap_or_default(),
)
.into_response()
}
async fn orgs_list(
State(state): State<ConsoleState>,
Extension(op): Extension<auth::Operator>,
) -> Response<Body> {
let rows: Vec<super::Org> = match super::Org::objects().fetch(&state.registry).await {
Ok(r) => r,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e}")).into_response(),
};
let view: Vec<_> = rows
.into_iter()
.map(|o| {
serde_json::json!({
"slug": o.slug,
"display_name": o.display_name,
"storage_mode": o.storage_mode,
"host_pattern": o.host_pattern,
"active": o.active,
"created_at": o.created_at.format("%Y-%m-%d %H:%M UTC").to_string(),
})
})
.collect();
let mut ctx = Context::new();
inject_op_brand(&mut ctx, &state.op_brand);
ctx.insert("section", "orgs");
ctx.insert("operator_username", &op.username);
ctx.insert("orgs", &view);
ctx.insert("edit_enabled", &state.pools.is_some());
Html(state.tera.render("op_orgs.html", &ctx).unwrap_or_default()).into_response()
}
const LOCKED_ORG_FIELDS: &[&str] = &[
"id",
"slug",
"storage_mode",
"schema_name",
"created_at",
"logo_path",
"favicon_path",
];
const DATABASE_URL_FIELD: &str = "database_url";
#[derive(Deserialize, Default)]
struct OrgEditQuery {
#[serde(default)]
error: Option<String>,
#[serde(default)]
notice: Option<String>,
}
async fn org_edit_form(
State(state): State<ConsoleState>,
axum::extract::Path(slug): axum::extract::Path<String>,
Extension(op): Extension<auth::Operator>,
Query(q): Query<OrgEditQuery>,
) -> Response<Body> {
use crate::admin::render;
use crate::core::Model as _;
use crate::sql::sqlx::Row;
let row = match crate::sql::sqlx::query(&format!(
r#"SELECT * FROM "{}" WHERE "slug" = $1"#,
super::Org::SCHEMA.table
))
.bind(&slug)
.fetch_optional(&state.registry)
.await
{
Ok(r) => r,
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e}")).into_response(),
};
let Some(row) = row else {
return (StatusCode::NOT_FOUND, format!("org `{slug}` not found")).into_response();
};
let mut editable_rows: Vec<serde_json::Value> = Vec::new();
let mut locked_rows: Vec<serde_json::Value> = Vec::new();
for field in super::Org::SCHEMA.scalar_fields() {
let prefill = render::render_value_for_input(&row, field);
if LOCKED_ORG_FIELDS.contains(&field.name) {
locked_rows.push(serde_json::json!({
"name": field.name,
"value": prefill,
}));
continue;
}
let (prefill_for_input, helper) = if field.name == DATABASE_URL_FIELD {
let hint = if prefill.starts_with("env:") || prefill.starts_with("vault:") {
Some(format!("current: {prefill}"))
} else if !prefill.is_empty() {
Some("current: <literal connection URL — masked>".to_owned())
} else {
None
};
(
String::new(),
Some(format!(
"{} — leave blank to keep current; new value evicts the cached pool",
hint.unwrap_or_else(|| "no value set".to_owned())
)),
)
} else {
(prefill, None::<String>)
};
let input_html = render::render_input(field, &prefill_for_input, false);
editable_rows.push(serde_json::json!({
"name": field.name,
"input": input_html,
"helper": helper,
}));
}
let logo_path: Option<String> = row.try_get("logo_path").ok().flatten();
let favicon_path: Option<String> = row.try_get("favicon_path").ok().flatten();
let logo_url = branding::brand_asset_url(&slug, logo_path.as_deref(), &state.brand_storage);
let favicon_url =
branding::brand_asset_url(&slug, favicon_path.as_deref(), &state.brand_storage);
let mut ctx = Context::new();
inject_op_brand(&mut ctx, &state.op_brand);
ctx.insert("section", "orgs");
ctx.insert("operator_username", &op.username);
ctx.insert("slug", &slug);
ctx.insert("editable_rows", &editable_rows);
ctx.insert("locked_rows", &locked_rows);
ctx.insert("logo_url", &logo_url);
ctx.insert("favicon_url", &favicon_url);
ctx.insert("error", &q.error);
ctx.insert("notice", &q.notice);
ctx.insert(
"impersonate_enabled",
&state.tenant_session_secret.is_some(),
);
Html(
state
.tera
.render("op_orgs_edit.html", &ctx)
.unwrap_or_default(),
)
.into_response()
}
async fn org_edit_submit(
State(state): State<ConsoleState>,
axum::extract::Path(slug): axum::extract::Path<String>,
Extension(op): Extension<auth::Operator>,
Form(mut form): Form<std::collections::HashMap<String, String>>,
) -> Response<Body> {
use crate::core::Model as _;
let pools = state
.pools
.as_ref()
.expect("edit routes only mounted when pools is Some");
let database_url_supplied = form
.get(DATABASE_URL_FIELD)
.is_some_and(|s| !s.trim().is_empty());
if !database_url_supplied {
form.remove(DATABASE_URL_FIELD);
}
let mut skip: Vec<&str> = LOCKED_ORG_FIELDS.to_vec();
if !database_url_supplied {
skip.push(DATABASE_URL_FIELD);
}
let collected = match crate::forms::collect_values(super::Org::SCHEMA, &form, &skip) {
Ok(v) => v,
Err(e) => return redirect_with_error(&slug, &e.to_string()),
};
if collected.is_empty() {
return redirect_with_error(&slug, "no editable fields supplied");
}
let existing_database_url = match crate::sql::sqlx::query_scalar::<_, Option<String>>(
r#"SELECT "database_url" FROM "rustango_orgs" WHERE "slug" = $1"#,
)
.bind(&slug)
.fetch_optional(&state.registry)
.await
{
Ok(Some(v)) => v,
Ok(None) => {
return (StatusCode::NOT_FOUND, format!("org `{slug}` not found")).into_response()
}
Err(e) => return (StatusCode::INTERNAL_SERVER_ERROR, format!("{e}")).into_response(),
};
let new_database_url = collected.iter().find_map(|(c, v)| {
if *c == DATABASE_URL_FIELD {
match v {
crate::core::SqlValue::String(s) => Some(s.clone()),
_ => None,
}
} else {
None
}
});
let database_url_changed = new_database_url
.as_deref()
.is_some_and(|new| existing_database_url.as_deref() != Some(new));
use std::fmt::Write as _;
let mut sql = format!(r#"UPDATE "{}" SET "#, super::Org::SCHEMA.table);
for (i, (col, _)) in collected.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
let _ = write!(sql, r#""{col}" = ${}"#, i + 1);
}
let _ = write!(sql, r#" WHERE "slug" = ${}"#, collected.len() + 1);
let mut q = crate::sql::sqlx::query(&sql);
for (_, value) in &collected {
q = bind_sql_value(q, value);
}
q = q.bind(&slug);
if let Err(e) = q.execute(&state.registry).await {
return redirect_with_error(&slug, &format!("update failed: {e}"));
}
if database_url_changed {
pools.invalidate(&slug).await;
}
let operator_id = op.id.get().copied().unwrap_or(0);
let mut detail = serde_json::Map::new();
detail.insert(
"action".into(),
serde_json::Value::String("org.edit".into()),
);
let touched_cols: Vec<String> = collected
.iter()
.filter_map(|(c, _)| {
if *c == DATABASE_URL_FIELD {
None
} else {
Some((*c).to_owned())
}
})
.collect();
detail.insert("fields".into(), serde_json::json!(touched_cols));
if database_url_changed {
detail.insert("database_url_rotated".into(), serde_json::json!(true));
}
emit_op_audit(&state.registry, &slug, operator_id, "edit", detail).await;
let notice = if database_url_changed {
format!("updated `{slug}` (pool evicted — next request rebuilds with new URL)")
} else {
format!("updated `{slug}`")
};
Redirect::to(&format!(
"/orgs/{}/edit?notice={}",
urlencoding_lite(&slug),
urlencoding_lite(¬ice),
))
.into_response()
}
fn redirect_with_error(slug: &str, msg: &str) -> Response<Body> {
Redirect::to(&format!(
"/orgs/{}/edit?error={}",
urlencoding_lite(slug),
urlencoding_lite(msg),
))
.into_response()
}
async fn emit_op_audit(
registry: &crate::sql::sqlx::PgPool,
slug: &str,
operator_id: i64,
verb: &str,
extra: serde_json::Map<String, serde_json::Value>,
) {
let mut changes = serde_json::Map::new();
changes.insert(
"tenant_slug".into(),
serde_json::Value::String(slug.to_owned()),
);
changes.insert("operator_id".into(), serde_json::json!(operator_id));
for (k, v) in extra {
changes.insert(k, v);
}
let result = crate::sql::sqlx::query(
r#"INSERT INTO "rustango_audit_log"
("entity_table", "entity_pk", "operation", "source", "changes")
VALUES ('rustango_orgs', $1, 'action', $2, $3)"#,
)
.bind(slug)
.bind(format!("operator:{operator_id}:{verb}"))
.bind(crate::sql::sqlx::types::Json(serde_json::Value::Object(
changes,
)))
.execute(registry)
.await;
if let Err(e) = result {
tracing::warn!(
target: "crate::tenancy::operator_console",
error = %e,
slug = slug,
operator_id,
verb,
"failed to record operator action in audit log",
);
}
}
fn bind_sql_value<'a>(
q: crate::sql::sqlx::query::Query<
'a,
crate::sql::sqlx::Postgres,
crate::sql::sqlx::postgres::PgArguments,
>,
v: &crate::core::SqlValue,
) -> crate::sql::sqlx::query::Query<
'a,
crate::sql::sqlx::Postgres,
crate::sql::sqlx::postgres::PgArguments,
> {
use crate::core::SqlValue;
match v {
SqlValue::Null => q.bind(None::<i64>),
SqlValue::I16(v) => q.bind(*v),
SqlValue::I32(v) => q.bind(*v),
SqlValue::I64(v) => q.bind(*v),
SqlValue::F32(v) => q.bind(*v),
SqlValue::F64(v) => q.bind(*v),
SqlValue::Bool(v) => q.bind(*v),
SqlValue::String(v) => q.bind(v.clone()),
SqlValue::DateTime(v) => q.bind(*v),
SqlValue::Date(v) => q.bind(*v),
SqlValue::Uuid(v) => q.bind(*v),
SqlValue::Json(v) => q.bind(crate::sql::sqlx::types::Json(v.clone())),
SqlValue::List(_) => panic!("List not expected in op_orgs UPDATE"),
}
}
async fn org_edit_branding(
State(state): State<ConsoleState>,
axum::extract::Path(slug): axum::extract::Path<String>,
Extension(op): Extension<auth::Operator>,
mut mp: Multipart,
) -> Response<Body> {
let mut updates: Vec<(&'static str, Option<String>)> = Vec::new();
while let Ok(Some(field)) = mp.next_field().await {
let name = field.name().map(str::to_owned);
let kind = match name.as_deref() {
Some("logo") => BrandAssetKind::Logo,
Some("favicon") => BrandAssetKind::Favicon,
_ => continue,
};
let content_type = field.content_type().map(str::to_owned);
let bytes = match field.bytes().await {
Ok(b) if b.is_empty() => continue,
Ok(b) => b.to_vec(),
Err(e) => return redirect_with_error(&slug, &format!("multipart: {e}")),
};
match branding::save_brand_asset(
&slug,
kind,
&bytes,
content_type.as_deref(),
&state.brand_storage,
)
.await
{
Ok(filename) => {
let column = match kind {
BrandAssetKind::Logo => "logo_path",
BrandAssetKind::Favicon => "favicon_path",
};
updates.push((column, Some(filename)));
}
Err(branding::BrandError::TooLarge { actual, max }) => {
return redirect_with_error(
&slug,
&format!("file too large: {actual} bytes (max {max})"),
);
}
Err(branding::BrandError::UnsupportedContentType(ct)) => {
return redirect_with_error(
&slug,
&format!("unsupported file type `{ct}` — use PNG/JPEG/WebP/ICO"),
);
}
Err(e) => return redirect_with_error(&slug, &format!("upload failed: {e}")),
}
}
if updates.is_empty() {
return redirect_with_error(&slug, "no file chosen");
}
use crate::core::Model as _;
use std::fmt::Write as _;
let mut sql = format!(r#"UPDATE "{}" SET "#, super::Org::SCHEMA.table);
for (i, (col, _)) in updates.iter().enumerate() {
if i > 0 {
sql.push_str(", ");
}
let _ = write!(sql, r#""{col}" = ${}"#, i + 1);
}
let _ = write!(sql, r#" WHERE "slug" = ${}"#, updates.len() + 1);
let mut q = crate::sql::sqlx::query(&sql);
for (_, v) in &updates {
q = q.bind(v.clone());
}
q = q.bind(&slug);
if let Err(e) = q.execute(&state.registry).await {
return redirect_with_error(&slug, &format!("update failed: {e}"));
}
let operator_id = op.id.get().copied().unwrap_or(0);
let assets: Vec<String> = updates
.iter()
.map(|(col, _)| match *col {
"logo_path" => "logo".to_owned(),
"favicon_path" => "favicon".to_owned(),
other => other.to_owned(),
})
.collect();
let mut detail = serde_json::Map::new();
detail.insert(
"action".into(),
serde_json::Value::String("org.branding.upload".into()),
);
detail.insert("assets".into(), serde_json::json!(assets));
emit_op_audit(&state.registry, &slug, operator_id, "branding", detail).await;
let notice = format!("uploaded {} brand asset(s) for `{slug}`", updates.len());
Redirect::to(&format!(
"/orgs/{}/edit?notice={}",
urlencoding_lite(&slug),
urlencoding_lite(¬ice),
))
.into_response()
}
async fn serve_brand_asset(
State(state): State<ConsoleState>,
axum::extract::Path((slug, filename)): axum::extract::Path<(String, String)>,
) -> Response<Body> {
match branding::load_brand_asset(&slug, &filename, &state.brand_storage).await {
Ok((bytes, ct)) => Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, ct)
.header(header::CACHE_CONTROL, "public, max-age=300")
.body(Body::from(bytes))
.expect("response builds"),
Err(
branding::BrandError::NotFound
| branding::BrandError::InvalidSlug
| branding::BrandError::InvalidFilename,
) => (StatusCode::NOT_FOUND, "not found").into_response(),
Err(e) => {
tracing::warn!(target: "crate::tenancy::operator_console", error = %e, "brand asset");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
}
}
}
async fn static_rustango_png() -> Response<Body> {
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "image/png")
.header(header::CACHE_CONTROL, "public, max-age=86400")
.body(Body::from(RUSTANGO_PNG))
.expect("response builds")
}
fn read_cookie(headers: &HeaderMap, name: &str) -> Option<String> {
let raw = headers.get(header::COOKIE)?.to_str().ok()?;
for piece in raw.split(';') {
let piece = piece.trim();
if let Some(value) = piece.strip_prefix(&format!("{name}=")) {
return Some(value.to_owned());
}
}
None
}
fn urlencoding_lite(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for byte in s.bytes() {
match byte {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' | b'/' => {
out.push(byte as char);
}
_ => out.push_str(&format!("%{byte:02X}")),
}
}
out
}
fn sanitize_next(next: Option<&str>) -> String {
match next {
Some(s) if s.starts_with('/') && !s.starts_with("//") && !s.contains("://") => s.to_owned(),
_ => "/".to_owned(),
}
}
async fn org_impersonate(
State(state): State<ConsoleState>,
Extension(op): Extension<auth::Operator>,
headers: HeaderMap,
axum::extract::Path(slug): axum::extract::Path<String>,
) -> Response<Body> {
let Some(tenant_secret) = state.tenant_session_secret.clone() else {
return (StatusCode::SERVICE_UNAVAILABLE, "impersonation disabled").into_response();
};
let orgs: Vec<super::Org> = match super::Org::objects()
.where_(super::Org::slug.eq(slug.clone()))
.fetch(&state.registry)
.await
{
Ok(rows) => rows,
Err(e) => {
tracing::warn!(target: "crate::tenancy::operator_console", error = %e, "org lookup failed");
return (StatusCode::INTERNAL_SERVER_ERROR, "registry lookup failed").into_response();
}
};
let Some(org) = orgs.into_iter().next() else {
return (StatusCode::NOT_FOUND, format!("no tenant `{slug}`")).into_response();
};
if !org.active {
return (
StatusCode::CONFLICT,
format!("tenant `{slug}` is inactive — refusing impersonation"),
)
.into_response();
}
let operator_id = op.id.get().copied().unwrap_or(0);
use super::impersonation_handoff as handoff;
let payload =
handoff::HandoffPayload::new(operator_id, slug.clone(), handoff::HANDOFF_TTL_SECS);
let token = handoff::mint(&tenant_secret, &payload);
let mut detail = serde_json::Map::new();
detail.insert(
"action".into(),
serde_json::Value::String("impersonate.start".into()),
);
emit_op_audit(&state.registry, &slug, operator_id, "impersonating", detail).await;
let scheme = std::env::var("RUSTANGO_TENANT_SCHEME").unwrap_or_else(|_| "http".into());
let host = if let Some(pat) = org.host_pattern.as_deref().filter(|s| !s.is_empty()) {
pat.to_owned()
} else {
let apex = std::env::var("RUSTANGO_APEX_DOMAIN").unwrap_or_else(|_| "localhost".into());
format!("{}.{}", slug, apex)
};
let port_suffix = std::env::var("RUSTANGO_TENANT_PORT")
.ok()
.filter(|s| !s.is_empty() && s != "80" && s != "443")
.map(|p| format!(":{p}"))
.or_else(|| {
headers
.get(header::HOST)
.and_then(|v| v.to_str().ok())
.and_then(|h| h.rsplit_once(':').map(|(_, port)| port.to_owned()))
.filter(|p| !p.is_empty() && p != "80" && p != "443")
.map(|p| format!(":{p}"))
})
.unwrap_or_default();
let handoff_path = state.tenant_handoff_url.trim_end_matches('/');
let redirect_to = format!("{scheme}://{host}{port_suffix}{handoff_path}?token={token}");
let mut resp = Redirect::to(&redirect_to).into_response();
resp.headers_mut().insert(
header::REFERRER_POLICY,
HeaderValue::from_static("no-referrer"),
);
tracing::info!(
target: "crate::tenancy::operator_console",
slug = %slug,
operator_id,
ttl_secs = handoff::HANDOFF_TTL_SECS,
redirect_to = %redirect_to,
"minted impersonation handoff token",
);
resp
}
#[cfg(test)]
mod sanitize_next_method_tests {
use super::sanitize_next_for_method;
use axum::http::Method;
#[test]
fn get_passes_through_unchanged() {
assert_eq!(
sanitize_next_for_method(&Method::GET, "/orgs/acme/edit"),
"/orgs/acme/edit"
);
assert_eq!(
sanitize_next_for_method(&Method::HEAD, "/anywhere"),
"/anywhere"
);
}
#[test]
fn post_to_branding_rewrites_to_parent_edit() {
assert_eq!(
sanitize_next_for_method(&Method::POST, "/orgs/acme/edit/branding"),
"/orgs/acme/edit"
);
}
#[test]
fn post_to_impersonate_rewrites_to_parent_edit() {
assert_eq!(
sanitize_next_for_method(&Method::POST, "/orgs/acme/impersonate"),
"/orgs/acme/edit"
);
}
#[test]
fn post_to_edit_rewrites_to_get_edit_form() {
assert_eq!(
sanitize_next_for_method(&Method::POST, "/orgs/acme/edit"),
"/orgs/acme/edit"
);
}
#[test]
fn unknown_post_path_falls_back_to_root() {
assert_eq!(
sanitize_next_for_method(&Method::POST, "/some/random/post"),
"/"
);
}
#[test]
fn query_string_dropped_from_rewrite_target() {
assert_eq!(
sanitize_next_for_method(&Method::POST, "/orgs/acme/impersonate?return=foo"),
"/orgs/acme/edit"
);
}
}
#[cfg(test)]
mod opbrand_tests {
use super::OpBrand;
#[test]
fn defaults_match_documented_values() {
let b = OpBrand::defaults();
assert_eq!(b.name, "Rustango");
assert_eq!(b.theme_mode, "auto");
assert!(b.tagline.is_none());
assert!(b.primary_color.is_none());
assert_eq!(b.logo_url, "/__static__/rustango.png");
}
#[cfg(feature = "config")]
#[test]
fn apply_brand_settings_overrides_defaults() {
let mut b = OpBrand::defaults();
let mut s = crate::config::BrandSettings::default();
s.name = Some("Acme Operator".into());
s.tagline = Some("(prod)".into());
s.primary_color = Some("#ff8800".into());
s.theme_mode = Some("dark".into());
OpBrand::apply_brand_settings(&mut b, &s);
assert_eq!(b.name, "Acme Operator");
assert_eq!(b.tagline.as_deref(), Some("(prod)"));
assert_eq!(b.primary_color.as_deref(), Some("#ff8800"));
assert_eq!(b.theme_mode, "dark");
}
#[cfg(feature = "config")]
#[test]
fn apply_brand_settings_empty_strings_skip() {
let mut b = OpBrand::defaults();
let original = b.name.clone();
let mut s = crate::config::BrandSettings::default();
s.name = Some(String::new());
OpBrand::apply_brand_settings(&mut b, &s);
assert_eq!(b.name, original);
}
#[cfg(feature = "config")]
#[test]
fn apply_brand_settings_rejects_bad_hex() {
let mut b = OpBrand::defaults();
let mut s = crate::config::BrandSettings::default();
s.primary_color = Some("not-a-color".into());
OpBrand::apply_brand_settings(&mut b, &s);
assert!(b.primary_color.is_none());
}
#[cfg(feature = "config")]
#[test]
fn apply_brand_settings_rejects_bad_theme_mode() {
let mut b = OpBrand::defaults();
let original = b.theme_mode.clone();
let mut s = crate::config::BrandSettings::default();
s.theme_mode = Some("midnight".into());
OpBrand::apply_brand_settings(&mut b, &s);
assert_eq!(b.theme_mode, original);
}
}