use std::collections::BTreeMap;
use std::env;
use crate::config::Config;
use crate::parser::parse_env_reference;
#[cfg(feature = "loki")]
type LokiFilteredLayer = tracing_subscriber::filter::Filtered<
tracing_loki::Layer,
tracing_subscriber::EnvFilter,
tracing_subscriber::Registry,
>;
#[cfg(feature = "loki")]
type ReloadSlot = Option<LokiFilteredLayer>;
#[cfg(feature = "loki")]
type ReloadHandle = tracing_subscriber::reload::Handle<ReloadSlot, tracing_subscriber::Registry>;
#[cfg(feature = "loki")]
static RELOAD_HANDLE: std::sync::OnceLock<ReloadHandle> = std::sync::OnceLock::new();
#[cfg(feature = "loki")]
pub fn reservation_layer()
-> impl tracing_subscriber::Layer<tracing_subscriber::Registry> + Send + Sync + 'static {
let (layer, handle) =
tracing_subscriber::reload::Layer::<ReloadSlot, tracing_subscriber::Registry>::new(None);
let _ = RELOAD_HANDLE.set(handle);
layer
}
#[cfg(not(feature = "loki"))]
pub fn reservation_layer()
-> impl tracing_subscriber::Layer<tracing_subscriber::Registry> + Send + Sync + 'static {
None::<tracing_subscriber::EnvFilter>
}
#[cfg(feature = "loki")]
pub fn attach_loki_layer(cfg: &LokiConfig) -> Result<bool, LokiLayerError> {
use tracing_subscriber::Layer as _;
if !cfg.is_enabled() {
return Ok(false);
}
let handle = RELOAD_HANDLE.get().ok_or(LokiLayerError::NotInstalled)?;
let (layer, task) = build_layer(cfg)?;
let filter = tracing_subscriber::EnvFilter::try_from_env("ATHENA_LOKI_FILTER")
.or_else(|_| tracing_subscriber::EnvFilter::try_from_default_env())
.unwrap_or_else(|_| tracing_subscriber::EnvFilter::new("info"));
let filtered: LokiFilteredLayer = layer.with_filter(filter);
handle
.modify(move |slot| {
*slot = Some(filtered);
})
.map_err(|err| LokiLayerError::Reload(err.to_string()))?;
tokio::spawn(task);
Ok(true)
}
#[cfg(not(feature = "loki"))]
pub fn attach_loki_layer(_cfg: &LokiConfig) -> Result<bool, LokiLayerError> {
Ok(false)
}
#[derive(Debug, Clone, Default)]
pub struct LokiConfig {
pub url: Option<String>,
pub tenant_id: Option<String>,
pub username: Option<String>,
pub password: Option<String>,
pub labels: BTreeMap<String, String>,
pub extra_fields: BTreeMap<String, String>,
}
impl LokiConfig {
pub fn resolve(config: &Config) -> Self {
let mut out = Self::from_config(config);
if let Some(url) = env_string("ATHENA_LOKI_URL") {
out.url = Some(url);
}
if let Some(tenant) = env_string("ATHENA_LOKI_TENANT_ID") {
out.tenant_id = Some(tenant);
}
if let Some(user) = env_string("ATHENA_LOKI_USERNAME") {
out.username = Some(user);
}
if let Some(password) = env_string("ATHENA_LOKI_PASSWORD") {
out.password = Some(password);
}
if let Some(raw) = env_string("ATHENA_LOKI_LABELS") {
for (k, v) in parse_kv_list(&raw) {
out.labels.insert(k, v);
}
}
if let Some(raw) = env_string("ATHENA_LOKI_EXTRA_FIELDS") {
for (k, v) in parse_kv_list(&raw) {
out.extra_fields.insert(k, v);
}
}
out
}
pub fn from_config(config: &Config) -> Self {
let mut out = Self::default();
if let Some(url) = lookup(config, "url").and_then(|v| interpolate_env(v)) {
out.url = Some(url);
}
if let Some(tenant) = lookup(config, "tenant_id").and_then(|v| interpolate_env(v)) {
out.tenant_id = Some(tenant);
}
if let Some(user) = lookup(config, "username").and_then(|v| interpolate_env(v)) {
out.username = Some(user);
}
if let Some(password) = lookup(config, "password").and_then(|v| interpolate_env(v)) {
out.password = Some(password);
}
if let Some(labels) = lookup(config, "labels").and_then(|v| interpolate_env(v)) {
for (k, v) in parse_kv_list(&labels) {
out.labels.insert(k, v);
}
}
if let Some(fields) = lookup(config, "extra_fields").and_then(|v| interpolate_env(v)) {
for (k, v) in parse_kv_list(&fields) {
out.extra_fields.insert(k, v);
}
}
out
}
pub fn is_enabled(&self) -> bool {
self.url
.as_deref()
.map(str::trim)
.is_some_and(|value| !value.is_empty())
}
}
#[cfg(feature = "loki")]
pub fn build_layer(
cfg: &LokiConfig,
) -> Result<(tracing_loki::Layer, tracing_loki::BackgroundTask), LokiLayerError> {
use tracing_loki::url::Url;
let raw_url = cfg.url.as_deref().ok_or(LokiLayerError::MissingUrl)?.trim();
if raw_url.is_empty() {
return Err(LokiLayerError::MissingUrl);
}
let url = Url::parse(raw_url).map_err(|err| LokiLayerError::InvalidUrl(err.to_string()))?;
let mut builder = tracing_loki::builder();
for (key, value) in &cfg.labels {
builder = builder
.label(key, value)
.map_err(|err| LokiLayerError::InvalidLabel(key.clone(), err.to_string()))?;
}
for (key, value) in &cfg.extra_fields {
builder = builder
.extra_field(key, value)
.map_err(|err| LokiLayerError::InvalidExtraField(key.clone(), err.to_string()))?;
}
if let Some(tenant) = cfg.tenant_id.as_deref().map(str::trim)
&& !tenant.is_empty()
{
builder = builder
.http_header("X-Scope-OrgID", tenant)
.map_err(|err| LokiLayerError::InvalidHeader("X-Scope-OrgID", err.to_string()))?;
}
if let (Some(user), Some(password)) = (cfg.username.as_deref(), cfg.password.as_deref()) {
let basic = format!(
"Basic {}",
base64_encode(format!("{user}:{password}").as_bytes())
);
builder = builder
.http_header("Authorization", &basic)
.map_err(|err| LokiLayerError::InvalidHeader("Authorization", err.to_string()))?;
}
let (layer, task) = builder
.build_url(url)
.map_err(|err| LokiLayerError::Build(err.to_string()))?;
Ok((layer, task))
}
#[derive(Debug, thiserror::Error)]
pub enum LokiLayerError {
#[error("ATHENA_LOKI_URL / loki.url is not set or empty")]
MissingUrl,
#[error("ATHENA_LOKI_URL is not a valid URL: {0}")]
InvalidUrl(String),
#[error("invalid Loki label `{0}`: {1}")]
InvalidLabel(String, String),
#[error("invalid Loki extra field `{0}`: {1}")]
InvalidExtraField(String, String),
#[error("invalid Loki HTTP header `{0}`: {1}")]
InvalidHeader(&'static str, String),
#[error("tracing-loki builder error: {0}")]
Build(String),
#[error("loki reservation layer was not installed before attach_loki_layer")]
NotInstalled,
#[error("failed to swap in loki layer via reload handle: {0}")]
Reload(String),
}
fn env_string(key: &str) -> Option<String> {
env::var(key)
.ok()
.map(|value| value.trim().to_string())
.filter(|value| !value.is_empty())
}
fn parse_kv_list(raw: &str) -> Vec<(String, String)> {
raw.split(',')
.filter_map(|pair| {
let pair = pair.trim();
if pair.is_empty() {
return None;
}
let (k, v) = pair.split_once('=')?;
let k = k.trim();
let v = v.trim();
if k.is_empty() {
return None;
}
Some((k.to_string(), v.to_string()))
})
.collect()
}
fn interpolate_env(raw: &str) -> Option<String> {
let trimmed = raw.trim();
if trimmed.is_empty() {
return None;
}
if let Some(env_name) = parse_env_reference(trimmed) {
let resolved = env::var(&env_name).ok().unwrap_or_default();
if resolved.trim().is_empty() {
return None;
}
return Some(resolved);
}
Some(trimmed.to_string())
}
fn lookup<'a>(config: &'a Config, key: &str) -> Option<&'a String> {
config.loki.iter().find_map(|map| map.get(key))
}
#[cfg(feature = "loki")]
fn base64_encode(input: &[u8]) -> String {
const ALPHABET: &[u8; 64] = b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/";
let mut out = String::with_capacity(input.len().div_ceil(3) * 4);
let mut chunks = input.chunks_exact(3);
for chunk in chunks.by_ref() {
let n = (u32::from(chunk[0]) << 16) | (u32::from(chunk[1]) << 8) | u32::from(chunk[2]);
out.push(ALPHABET[((n >> 18) & 0x3f) as usize] as char);
out.push(ALPHABET[((n >> 12) & 0x3f) as usize] as char);
out.push(ALPHABET[((n >> 6) & 0x3f) as usize] as char);
out.push(ALPHABET[(n & 0x3f) as usize] as char);
}
let rem = chunks.remainder();
match rem.len() {
1 => {
let n = u32::from(rem[0]) << 16;
out.push(ALPHABET[((n >> 18) & 0x3f) as usize] as char);
out.push(ALPHABET[((n >> 12) & 0x3f) as usize] as char);
out.push('=');
out.push('=');
}
2 => {
let n = (u32::from(rem[0]) << 16) | (u32::from(rem[1]) << 8);
out.push(ALPHABET[((n >> 18) & 0x3f) as usize] as char);
out.push(ALPHABET[((n >> 12) & 0x3f) as usize] as char);
out.push(ALPHABET[((n >> 6) & 0x3f) as usize] as char);
out.push('=');
}
_ => {}
}
out
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn parse_kv_list_splits_on_commas_and_trims() {
let pairs = parse_kv_list("env=prod, service=athena , region=eu-1");
assert_eq!(
pairs,
vec![
("env".to_string(), "prod".to_string()),
("service".to_string(), "athena".to_string()),
("region".to_string(), "eu-1".to_string()),
]
);
}
#[test]
fn parse_kv_list_skips_empty_or_keyless_pairs() {
let pairs = parse_kv_list(",=value, foo=bar,");
assert_eq!(pairs, vec![("foo".to_string(), "bar".to_string())]);
}
#[test]
fn is_enabled_requires_url() {
let cfg = LokiConfig::default();
assert!(!cfg.is_enabled());
let cfg = LokiConfig {
url: Some(" ".to_string()),
..LokiConfig::default()
};
assert!(!cfg.is_enabled());
let cfg = LokiConfig {
url: Some("http://loki:3100".to_string()),
..LokiConfig::default()
};
assert!(cfg.is_enabled());
}
#[cfg(feature = "loki")]
#[test]
fn base64_encode_matches_reference() {
assert_eq!(base64_encode(b""), "");
assert_eq!(base64_encode(b"f"), "Zg==");
assert_eq!(base64_encode(b"fo"), "Zm8=");
assert_eq!(base64_encode(b"foo"), "Zm9v");
assert_eq!(base64_encode(b"foob"), "Zm9vYg==");
assert_eq!(base64_encode(b"foobar"), "Zm9vYmFy");
}
}