use std::sync::Arc;
use crate::sql::sqlx::postgres::{PgPool, PgPoolOptions};
use crate::sql::sqlx::Row;
use axum::body::Body;
use axum::http::{header, HeaderMap, HeaderValue, Request, StatusCode};
use axum::response::{IntoResponse, Redirect, Response};
use axum::Router;
use cookie::time::Duration as CookieDuration;
use cookie::{Cookie, SameSite};
use tera::{Context, Tera};
use tower::ServiceExt;
use tracing::warn;
use super::branding;
use super::error::TenancyError;
use super::org::{Org, StorageMode};
use super::pools::TenantPools;
use super::resolver::OrgResolver;
use super::tenant_console::{self, TenantSessionPayload};
use crate::storage::BoxedStorage;
pub struct TenantAdminBuilder {
pools: Arc<TenantPools>,
registry_url: String,
resolver: Arc<dyn OrgResolver>,
show_only: Option<Vec<String>>,
read_only: Vec<String>,
session: Option<Arc<TenantSessionConfig>>,
actions: Vec<RegisteredAction>,
title: Option<String>,
subtitle: Option<String>,
brand_storage: Option<BoxedStorage>,
}
#[derive(Clone)]
struct RegisteredAction {
table: &'static str,
name: &'static str,
handler: crate::admin::AdminActionFn,
}
struct TenantSessionConfig {
secret: tenant_console::SessionSecret,
tera: Tera,
}
impl TenantAdminBuilder {
#[must_use]
pub fn new(
pools: Arc<TenantPools>,
registry_url: impl Into<String>,
resolver: impl OrgResolver,
) -> Self {
Self {
pools,
registry_url: registry_url.into(),
resolver: Arc::new(resolver),
show_only: None,
read_only: Vec::new(),
session: None,
actions: Vec::new(),
title: None,
subtitle: None,
brand_storage: None,
}
}
#[must_use]
pub fn title(mut self, title: impl Into<String>) -> Self {
self.title = Some(title.into());
self
}
#[must_use]
pub fn subtitle(mut self, subtitle: impl Into<String>) -> Self {
self.subtitle = Some(subtitle.into());
self
}
#[must_use]
pub fn brand_storage(mut self, storage: BoxedStorage) -> Self {
self.brand_storage = Some(storage);
self
}
#[must_use]
pub fn with_session(mut self, secret: tenant_console::SessionSecret) -> Self {
let mut tera = Tera::default();
tera.add_raw_template(
"tenant_login.html",
include_str!("templates/tenant_login.html"),
)
.expect("tenant_login.html parses");
self.session = Some(Arc::new(TenantSessionConfig { secret, tera }));
self
}
#[must_use]
pub fn show_only<I, S>(mut self, tables: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.show_only = Some(tables.into_iter().map(Into::into).collect());
self
}
#[must_use]
pub fn read_only<I, S>(mut self, tables: I) -> Self
where
I: IntoIterator<Item = S>,
S: Into<String>,
{
self.read_only.extend(tables.into_iter().map(Into::into));
self
}
#[must_use]
pub fn register_action<F>(
mut self,
model_table: &'static str,
action_name: &'static str,
handler: F,
) -> Self
where
F: for<'a> Fn(
&'a crate::sql::sqlx::PgPool,
&'a [crate::core::SqlValue],
) -> crate::admin::AdminActionFuture<'a>
+ Send
+ Sync
+ 'static,
{
self.actions.push(RegisteredAction {
table: model_table,
name: action_name,
handler: Arc::new(handler),
});
self
}
#[must_use]
pub fn build(self) -> Router {
let pools = self.pools;
let registry_url = Arc::new(self.registry_url);
let resolver = self.resolver;
let show_only = Arc::new(self.show_only);
let read_only = Arc::new(self.read_only);
let session = self.session;
let actions = Arc::new(self.actions);
let title = Arc::new(self.title);
let subtitle = Arc::new(self.subtitle);
let brand_storage: BoxedStorage = self
.brand_storage
.unwrap_or_else(branding::default_brand_storage);
Router::new().fallback(move |req: Request<Body>| {
let pools = pools.clone();
let registry_url = registry_url.clone();
let resolver = resolver.clone();
let show_only = show_only.clone();
let read_only = read_only.clone();
let session = session.clone();
let actions = actions.clone();
let title = title.clone();
let subtitle = subtitle.clone();
let brand_storage = brand_storage.clone();
async move {
handle_request(
req,
&pools,
®istry_url,
&*resolver,
&show_only,
&read_only,
session.as_deref(),
&actions,
title.as_deref().as_deref(),
subtitle.as_deref().as_deref(),
&brand_storage,
)
.await
}
})
}
}
async fn handle_request(
req: Request<Body>,
pools: &TenantPools,
registry_url: &str,
resolver: &dyn OrgResolver,
show_only: &Option<Vec<String>>,
read_only: &[String],
session: Option<&TenantSessionConfig>,
actions: &[RegisteredAction],
title: Option<&str>,
subtitle: Option<&str>,
brand_storage: &BoxedStorage,
) -> Response {
if let Some(rest) = req.uri().path().strip_prefix("/__brand__/") {
if let Some((slug, filename)) = rest.split_once('/') {
return serve_brand_asset(slug, filename, brand_storage).await;
}
return (StatusCode::NOT_FOUND, "not found").into_response();
}
let (mut parts, body) = req.into_parts();
let org = match resolver.resolve(&parts, pools.registry()).await {
Ok(Some(o)) => o,
Ok(None) => return (StatusCode::NOT_FOUND, "tenant not found").into_response(),
Err(e) => {
warn!(target: "crate::tenancy::admin", error = %e, "resolver error");
return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
}
};
let pool = match build_admin_pool_for_tenant(&org, pools, registry_url).await {
Ok(p) => p,
Err(e) => {
warn!(
target: "crate::tenancy::admin",
slug = %org.slug,
error = %e,
"tenant pool build failed",
);
return (StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response();
}
};
let mut user_perms: Option<std::collections::HashSet<String>> = None;
let mut session_user_id: Option<i64> = None;
if let Some(cfg) = session {
let path = parts.uri.path().to_owned();
let method = parts.method.clone();
if path == "/__static__/rustango.png" {
return rustango_png_response();
}
if path == "/__login" {
return match method {
axum::http::Method::GET => {
login_form(&org, cfg, brand_storage, parts.uri.query()).into_response()
}
axum::http::Method::POST => {
login_submit(&org, cfg, pool.pg_pool(), parts.headers, body).await
}
_ => (StatusCode::METHOD_NOT_ALLOWED, "method not allowed").into_response(),
};
}
if path == "/__logout" && method == axum::http::Method::POST {
return logout_response();
}
match validate_session(&parts.headers, cfg, &org, pool.pg_pool()).await {
SessionCheck::Authenticated {
is_superuser,
user_id,
} => {
session_user_id = Some(user_id);
if !is_superuser {
match super::permissions::user_permissions(user_id, pool.pg_pool()).await {
Ok(codenames) => {
user_perms = Some(codenames.into_iter().collect());
}
Err(e) => {
warn!(
target: "crate::tenancy::admin",
slug = %org.slug,
user_id,
error = %e,
"failed to fetch user permissions",
);
return (
StatusCode::INTERNAL_SERVER_ERROR,
"permission lookup failed",
)
.into_response();
}
}
}
}
SessionCheck::Anonymous => {
return redirect_to_tenant_login(&path).into_response();
}
SessionCheck::Error(msg) => {
return (StatusCode::INTERNAL_SERVER_ERROR, msg).into_response();
}
}
}
let admin_router = build_inner_admin_router(
pool.pg_pool().clone(),
show_only,
read_only,
user_perms,
actions,
title,
subtitle,
&org,
brand_storage,
);
if let Some(stripped) = parts.uri.path().strip_prefix("/__admin") {
let new_path = if stripped.is_empty() { "/" } else { stripped };
let new_pq = if let Some(q) = parts.uri.query() {
format!("{new_path}?{q}")
} else {
new_path.to_owned()
};
if let Ok(new_uri) = new_pq.parse::<axum::http::Uri>() {
parts.uri = new_uri;
}
}
let inner_req = Request::from_parts(parts, body);
let dispatch = async {
match admin_router.oneshot(inner_req).await {
Ok(r) => r,
Err(_infallible) => unreachable!("axum::Router service is Infallible"),
}
};
let response = if let Some(uid) = session_user_id {
crate::audit::with_source(
crate::audit::AuditSource::User {
id: uid.to_string(),
},
dispatch,
)
.await
} else {
dispatch.await
};
drop(pool);
response
}
enum SessionCheck {
Authenticated {
is_superuser: bool,
user_id: i64,
},
Anonymous,
Error(String),
}
async fn validate_session(
headers: &HeaderMap,
cfg: &TenantSessionConfig,
org: &Org,
tenant_pool: &PgPool,
) -> SessionCheck {
let Some(cookie_value) = read_cookie(headers, tenant_console::COOKIE_NAME) else {
return SessionCheck::Anonymous;
};
let payload = match tenant_console::decode(&cfg.secret, &org.slug, &cookie_value) {
Ok(p) => p,
Err(_) => return SessionCheck::Anonymous,
};
match rustango::sql::sqlx::query(
"SELECT is_superuser, active FROM rustango_users WHERE id = $1",
)
.bind(payload.uid)
.fetch_optional(tenant_pool)
.await
{
Ok(Some(row)) => {
let active: bool = row.try_get("active").unwrap_or(false);
if !active {
return SessionCheck::Anonymous;
}
let is_superuser: bool = row.try_get("is_superuser").unwrap_or(false);
SessionCheck::Authenticated {
is_superuser,
user_id: payload.uid,
}
}
Ok(None) => SessionCheck::Anonymous,
Err(e) => {
warn!(
target: "crate::tenancy::admin",
slug = %org.slug,
error = %e,
"tenant user lookup failed during session validation",
);
SessionCheck::Error("session lookup failed".into())
}
}
}
fn redirect_to_tenant_login(next_path: &str) -> Redirect {
let next = if next_path == "/__login" || next_path.starts_with("/__logout") {
"/".to_string()
} else {
next_path.to_string()
};
let location = format!("/__login?next={}", urlencoding_lite(&next));
Redirect::to(&location)
}
fn login_form(
org: &Org,
cfg: &TenantSessionConfig,
brand_storage: &BoxedStorage,
query: Option<&str>,
) -> axum::response::Html<String> {
let mut next: Option<String> = None;
let mut error: Option<String> = None;
if let Some(q) = query {
for pair in q.split('&') {
let Some((k, v)) = pair.split_once('=') else {
continue;
};
let v = url_decode_lite(v);
match k {
"next" => next = Some(v),
"error" => error = Some(v),
_ => {}
}
}
}
let mut ctx = Context::new();
ctx.insert("tenant_slug", &org.slug);
ctx.insert("tenant_name", &org.display_name);
ctx.insert("next", &next.unwrap_or_else(|| "/".into()));
ctx.insert("error", &error);
let brand_name = org
.brand_name
.as_deref()
.filter(|s| !s.is_empty())
.unwrap_or(&org.display_name);
ctx.insert("brand_name", brand_name);
ctx.insert("brand_tagline", &org.brand_tagline);
let brand_logo_url =
super::branding::brand_asset_url(&org.slug, org.logo_path.as_deref(), brand_storage);
ctx.insert("brand_logo_url", &brand_logo_url);
let brand_favicon_url =
super::branding::brand_asset_url(&org.slug, org.favicon_path.as_deref(), brand_storage);
ctx.insert("brand_favicon_url", &brand_favicon_url);
let theme_mode = org
.theme_mode
.as_deref()
.and_then(super::branding::validate_theme_mode)
.unwrap_or("auto");
ctx.insert("theme_mode", theme_mode);
let brand_css = super::branding::build_brand_css(org);
ctx.insert("brand_css", &brand_css);
axum::response::Html(
cfg.tera
.render("tenant_login.html", &ctx)
.unwrap_or_default(),
)
}
#[derive(serde::Deserialize)]
struct LoginSubmitForm {
username: String,
password: String,
#[serde(default)]
next: Option<String>,
}
async fn login_submit(
org: &Org,
cfg: &TenantSessionConfig,
tenant_pool: &PgPool,
_headers: HeaderMap,
body: Body,
) -> Response {
let bytes = match http_body_util::BodyExt::collect(body).await {
Ok(b) => b.to_bytes(),
Err(_) => return (StatusCode::BAD_REQUEST, "could not read body").into_response(),
};
let form: LoginSubmitForm = match serde_urlencoded::from_bytes(&bytes) {
Ok(f) => f,
Err(_) => return (StatusCode::BAD_REQUEST, "malformed login form").into_response(),
};
let next = sanitize_next(form.next.as_deref());
let row = match rustango::sql::sqlx::query(
"SELECT id, password_hash, is_superuser, active FROM rustango_users \
WHERE username = $1",
)
.bind(&form.username)
.fetch_optional(tenant_pool)
.await
{
Ok(r) => r,
Err(e) => {
warn!(target: "crate::tenancy::admin", error = %e, "login query");
return (StatusCode::INTERNAL_SERVER_ERROR, "login failed").into_response();
}
};
let bad_creds = || -> Response {
Redirect::to(&format!(
"/__login?error=Invalid+credentials&next={}",
urlencoding_lite(&next)
))
.into_response()
};
let Some(row) = row else {
return bad_creds();
};
let active: bool = row.try_get("active").unwrap_or(false);
if !active {
return bad_creds();
}
let hash: String = match row.try_get("password_hash") {
Ok(h) => h,
Err(_) => return bad_creds(),
};
let ok = match super::password::verify(&form.password, &hash) {
Ok(b) => b,
Err(_) => false,
};
if !ok {
return bad_creds();
}
let uid: i64 = match row.try_get("id") {
Ok(v) => v,
Err(_) => return bad_creds(),
};
let payload = TenantSessionPayload::new(uid, &org.slug, tenant_console::SESSION_TTL_SECS);
let cookie_value = tenant_console::encode(&cfg.secret, &payload);
let cookie = Cookie::build((tenant_console::COOKIE_NAME, cookie_value))
.path("/")
.http_only(true)
.same_site(SameSite::Lax)
.max_age(CookieDuration::seconds(tenant_console::SESSION_TTL_SECS))
.build();
let mut resp = Redirect::to(&next).into_response();
resp.headers_mut().append(
header::SET_COOKIE,
HeaderValue::from_str(&cookie.to_string()).expect("cookie is ascii"),
);
resp
}
fn logout_response() -> Response {
let clear = Cookie::build((tenant_console::COOKIE_NAME, ""))
.path("/")
.http_only(true)
.same_site(SameSite::Lax)
.max_age(CookieDuration::seconds(0))
.build();
let mut resp = Redirect::to("/__login").into_response();
resp.headers_mut().append(
header::SET_COOKIE,
HeaderValue::from_str(&clear.to_string()).expect("cookie is ascii"),
);
resp
}
fn rustango_png_response() -> Response {
Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, "image/png")
.header(header::CACHE_CONTROL, "public, max-age=86400")
.body(Body::from(tenant_console::RUSTANGO_PNG))
.expect("response builds")
}
fn read_cookie(headers: &HeaderMap, name: &str) -> Option<String> {
let raw = headers.get(header::COOKIE)?.to_str().ok()?;
for piece in raw.split(';') {
let piece = piece.trim();
if let Some(value) = piece.strip_prefix(&format!("{name}=")) {
return Some(value.to_owned());
}
}
None
}
fn urlencoding_lite(s: &str) -> String {
let mut out = String::with_capacity(s.len());
for byte in s.bytes() {
match byte {
b'A'..=b'Z' | b'a'..=b'z' | b'0'..=b'9' | b'-' | b'.' | b'_' | b'~' | b'/' => {
out.push(byte as char);
}
_ => out.push_str(&format!("%{byte:02X}")),
}
}
out
}
use crate::url_codec::url_decode as url_decode_lite;
fn sanitize_next(next: Option<&str>) -> String {
match next {
Some(s)
if s.starts_with('/')
&& !s.starts_with("//")
&& !s.contains("://")
&& !s.starts_with("/__login")
&& !s.starts_with("/__logout") =>
{
s.to_owned()
}
_ => "/".to_owned(),
}
}
enum AdminPool {
Database(Arc<PgPool>),
Schema(PgPool),
}
impl AdminPool {
fn pg_pool(&self) -> &PgPool {
match self {
Self::Database(p) => p,
Self::Schema(p) => p,
}
}
}
impl Drop for AdminPool {
fn drop(&mut self) {
}
}
async fn build_admin_pool_for_tenant(
org: &Org,
pools: &TenantPools,
registry_url: &str,
) -> Result<AdminPool, TenancyError> {
let mode = StorageMode::parse(&org.storage_mode).map_err(|got| {
TenancyError::Validation(format!(
"org `{}` has unknown storage_mode `{got}`",
org.slug
))
})?;
match mode {
StorageMode::Database => {
let tp = pools.pool_for_org(org).await?;
match tp {
super::pools::TenantPool::Database { pool } => Ok(AdminPool::Database(pool)),
super::pools::TenantPool::Schema { .. } => {
unreachable!("StorageMode::Database parsed but pool_for_org returned Schema")
}
}
}
StorageMode::Schema => {
let schema = org.schema_name.clone().unwrap_or_else(|| org.slug.clone());
let pool = build_short_lived_schema_pool(registry_url, &schema).await?;
Ok(AdminPool::Schema(pool))
}
}
}
async fn build_short_lived_schema_pool(
registry_url: &str,
schema: &str,
) -> Result<PgPool, TenancyError> {
let schema_owned: Arc<str> = Arc::from(schema);
let pool = PgPoolOptions::new()
.max_connections(2)
.after_connect(move |conn, _meta| {
let schema = Arc::clone(&schema_owned);
Box::pin(async move {
let stmt = format!("SET search_path TO {}, public", quote_ident(&schema));
rustango::sql::sqlx::query(&stmt).execute(conn).await?;
Ok(())
})
})
.connect(registry_url)
.await?;
Ok(pool)
}
fn build_inner_admin_router(
pool: PgPool,
show_only: &Option<Vec<String>>,
read_only: &[String],
user_perms: Option<std::collections::HashSet<String>>,
actions: &[RegisteredAction],
title: Option<&str>,
subtitle: Option<&str>,
org: &Org,
brand_storage: &BoxedStorage,
) -> Router {
let mut builder = crate::admin::Builder::new(pool);
if let Some(allow) = show_only {
builder = builder.show_only(allow.iter().cloned());
}
if !read_only.is_empty() {
builder = builder.read_only(read_only.iter().cloned());
}
if let Some(perms) = user_perms {
builder = builder.with_user_perms(perms);
}
if let Some(t) = title {
builder = builder.title(t);
}
if let Some(s) = subtitle {
builder = builder.subtitle(s);
}
if let Some(name) = org.brand_name.as_deref() {
builder = builder.brand_name(name);
} else if !org.display_name.is_empty() {
builder = builder.brand_name(&org.display_name);
}
if let Some(tag) = org.brand_tagline.as_deref() {
builder = builder.brand_tagline(tag);
}
if let Some(logo_url) =
branding::brand_asset_url(&org.slug, org.logo_path.as_deref(), brand_storage)
{
builder = builder.brand_logo_url(logo_url);
}
if let Some(mode) = org
.theme_mode
.as_deref()
.and_then(branding::validate_theme_mode)
{
builder = builder.theme_mode(mode);
}
if let Some(css) = branding::build_brand_css(org) {
builder = builder.tenant_brand_css(css);
}
for action in actions {
let handler = action.handler.clone();
builder = builder.register_action(action.table, action.name, move |pool, pks| {
handler(pool, pks)
});
}
builder.build()
}
async fn serve_brand_asset(slug: &str, filename: &str, brand_storage: &BoxedStorage) -> Response {
match branding::load_brand_asset(slug, filename, brand_storage).await {
Ok((bytes, ct)) => Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_TYPE, ct)
.header(header::CACHE_CONTROL, "public, max-age=300")
.body(Body::from(bytes))
.expect("response builds")
.into_response(),
Err(
branding::BrandError::NotFound
| branding::BrandError::InvalidSlug
| branding::BrandError::InvalidFilename,
) => (StatusCode::NOT_FOUND, "not found").into_response(),
Err(e) => {
warn!(target: "crate::tenancy::admin", error = %e, "brand asset");
(StatusCode::INTERNAL_SERVER_ERROR, e.to_string()).into_response()
}
}
}
fn quote_ident(name: &str) -> String {
let escaped = name.replace('"', "\"\"");
format!("\"{escaped}\"")
}