use crate::adapter::ConnectionHandler;
use crate::app::auth::AuthValidator; use crate::app::config::App; use crate::protocol::constants::EVENT_NAME_MAX_LENGTH as DEFAULT_EVENT_NAME_MAX_LENGTH;
use crate::protocol::messages::{
ApiMessageData, BatchPusherApiMessage, InfoQueryParser, PusherApiMessage, PusherMessage,
};
use crate::utils::{self, validate_channel_name};
use crate::websocket::SocketId;
use axum::{
Json,
extract::{Path, Query, RawQuery, State}, http::{HeaderMap, HeaderValue, Response, StatusCode, Uri, header}, response::{IntoResponse, Response as AxumResponse},
};
use futures_util::future::join_all;
use serde::{Deserialize, Serialize};
use serde_json::{Value, json};
use std::{
collections::{BTreeMap, HashMap}, sync::Arc,
};
use sysinfo::System;
use thiserror::Error;
use tracing::{error, field, info, instrument, warn};
/// Errors returned by the HTTP API handlers in this file.
///
/// The `Display` text of each variant comes from its `#[error(...)]`
/// attribute. The HTTP status code and JSON body for each variant are chosen
/// by the `IntoResponse` implementation for this type.
#[derive(Debug, Error)]
pub enum AppError {
/// No application matched the requested identifier or key.
#[error("Application not found: {0}")]
AppNotFound(String),
/// Application validation failed; the payload carries the reason.
#[error("Application validation failed: {0}")]
AppValidationFailed(String),
/// The API request could not be authenticated.
#[error("API request authentication failed: {0}")]
ApiAuthFailed(String),
/// An event carried neither a usable `channels` list nor a `channel` value.
#[error("Channel validation failed: Missing 'channels' or 'channel' field")]
MissingChannelInfo,
/// Terminating a user's connections failed.
#[error("User connection termination failed: {0}")]
TerminationFailed(String),
/// Catch-all for failures that have no more specific variant.
#[error("Internal Server Error: {0}")]
InternalError(String),
/// JSON (de)serialization failed; `#[from]` lets `?` convert `serde_json::Error`.
#[error("Serialization Error: {0}")]
SerializationError(#[from] serde_json::Error),
/// Building an HTTP response failed; `#[from]` lets `?` convert `axum::http::Error`.
#[error("HTTP Header Build Error: {0}")]
HeaderBuildError(#[from] axum::http::Error),
/// A configured per-application limit (name length, payload size, ...) was exceeded.
#[error("Limit exceeded: {0}")]
LimitExceeded(String),
/// The request was well-formed JSON but semantically invalid.
#[error("Invalid input: {0}")]
InvalidInput(String),
}
impl IntoResponse for AppError {
fn into_response(self) -> AxumResponse {
let (status, error_message) = match &self {
AppError::AppNotFound(msg) => (StatusCode::NOT_FOUND, json!({ "error": msg })),
AppError::AppValidationFailed(msg) => {
(StatusCode::INTERNAL_SERVER_ERROR, json!({ "error": msg }))
}
AppError::ApiAuthFailed(msg) => (StatusCode::FORBIDDEN, json!({ "error": msg })),
AppError::MissingChannelInfo => (
StatusCode::BAD_REQUEST,
json!({ "error": "Request must contain 'channels' (list) or 'channel' (string)" }),
),
AppError::TerminationFailed(msg) => {
(StatusCode::INTERNAL_SERVER_ERROR, json!({ "error": msg }))
}
AppError::SerializationError(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
json!({ "error": format!("Internal error during serialization: {}", e) }),
),
AppError::HeaderBuildError(e) => (
StatusCode::INTERNAL_SERVER_ERROR,
json!({ "error": format!("Internal error building response: {}", e) }),
),
AppError::InternalError(msg) => {
(StatusCode::INTERNAL_SERVER_ERROR, json!({ "error": msg }))
}
AppError::LimitExceeded(msg) => (StatusCode::BAD_REQUEST, json!({ "error": msg })),
AppError::InvalidInput(msg) => (StatusCode::BAD_REQUEST, json!({ "error": msg })),
};
error!(error.message = %self, status_code = %status, "HTTP request failed");
(status, Json(error_message)).into_response()
}
}
impl From<crate::error::Error> for AppError {
fn from(err: crate::error::Error) -> Self {
warn!(original_error = ?err, "Converting internal error to AppError for HTTP response");
match err {
crate::error::Error::InvalidAppKey => {
AppError::AppNotFound(format!("Application key not found or invalid: {}", err))
}
crate::error::Error::ApplicationNotFound => AppError::AppNotFound(err.to_string()),
crate::error::Error::InvalidChannelName(s) => {
AppError::InvalidInput(format!("Invalid channel name: {}", s))
}
crate::error::Error::ChannelError(s) => AppError::InvalidInput(s),
crate::error::Error::AuthError(s) => AppError::ApiAuthFailed(s),
_ => AppError::InternalError(err.to_string()),
}
}
}
/// Authentication-related query-string parameters accepted by the API handlers.
///
/// Every field is `#[serde(default)]`, so a parameter missing from the query
/// string deserializes to an empty string rather than failing extraction.
/// NOTE(review): none of the handlers visible in this file read these values;
/// presumably they are validated elsewhere (e.g. middleware) — confirm.
#[derive(Deserialize, Debug, Clone)]
pub struct EventQuery {
/// `auth_key` query parameter (empty when absent).
#[serde(default)]
pub auth_key: String,
/// `auth_timestamp` query parameter, kept as the raw string (empty when absent).
#[serde(default)]
pub auth_timestamp: String,
/// `auth_version` query parameter (empty when absent).
#[serde(default)]
pub auth_version: String,
/// `body_md5` query parameter (empty when absent).
#[serde(default)]
pub body_md5: String,
/// `auth_signature` query parameter (empty when absent).
#[serde(default)]
pub auth_signature: String,
}
/// Query-string parameters for the single-channel info endpoint (`channel`).
#[derive(Deserialize, Debug)]
pub struct ChannelQuery {
/// Optional `info` parameter; the `channel` handler inspects it through the
/// `InfoQueryParser` helpers to decide which attributes to return.
#[serde(default)]
pub info: Option<String>,
/// Authentication parameters, read from the same query string
/// (`#[serde(flatten)]` merges them into this struct's top level).
#[serde(flatten)]
pub auth_params: EventQuery,
}
/// Query-string parameters for the channel-list endpoint (`channels`).
#[derive(Deserialize, Debug)]
pub struct ChannelsQuery {
/// Optional channel-name prefix; the `channels` handler skips channels whose
/// name does not start with it (absent is treated as the empty prefix).
#[serde(default)]
pub filter_by_prefix: Option<String>,
/// Optional `info` parameter selecting extra per-channel attributes.
#[serde(default)]
pub info: Option<String>,
/// Authentication parameters, read from the same query string
/// (`#[serde(flatten)]` merges them into this struct's top level).
#[serde(flatten)]
pub auth_params: EventQuery,
}
/// Memory figures serialized in the response of the `usage` handler.
///
/// `usage` logs `free`, `used` and `total` under `*_bytes` field names, so
/// they are intended to be byte counts.
#[derive(Serialize)]
struct MemoryStats {
/// Computed by `usage` as `total - used` (saturating).
free: u64,
/// Memory currently in use.
used: u64,
/// Total memory.
total: u64,
/// `used / total * 100`, or `0.0` when `total` is zero.
percent: f64,
}
/// JSON body returned by the `usage` handler: `{ "memory": { ... } }`.
#[derive(Serialize)]
struct UsageResponse {
/// Memory statistics gathered at request time.
memory: MemoryStats,
}
/// Serializes an event into the JSON string stored for cache channels.
///
/// The result is an object with two keys: `"event"` (the event name) and
/// `"data"` (the event payload as given).
///
/// # Errors
/// Returns the `serde_json` error if serialization fails.
fn build_cache_payload(event_name: &str, event_data: &Value) -> Result<String, serde_json::Error> {
    let envelope = json!({
        "event": event_name,
        "data": event_data,
    });
    serde_json::to_string(&envelope)
}
/// Records one API message (request and response sizes in bytes) against
/// `app_id` in the handler's metrics backend.
///
/// When the handler has no metrics backend this only logs that recording was
/// skipped.
#[instrument(skip(handler, incoming_request_size, outgoing_response_size), fields(app_id = %app_id))]
async fn record_api_metrics(
    handler: &Arc<ConnectionHandler>,
    app_id: &str,
    incoming_request_size: usize,
    outgoing_response_size: usize,
) {
    // Guard clause: nothing to record without a metrics backend.
    let Some(shared_metrics) = handler.metrics.as_ref() else {
        info!(
            "{}",
            "Metrics system not available, skipping metrics recording."
        );
        return;
    };

    let guard = shared_metrics.lock().await;
    guard.mark_api_message(app_id, incoming_request_size, outgoing_response_size);
    info!(
        incoming_bytes = incoming_request_size,
        outgoing_bytes = outgoing_response_size,
        "Recorded API message metrics"
    );
}
/// Reports host memory usage as JSON: `{ "memory": { free, used, total, percent } }`.
///
/// `free`, `used` and `total` are byte counts; `percent` is `used / total * 100`
/// (or `0.0` when the total is reported as zero).
///
/// # Errors
/// The current implementation never returns an error; the `Result` return
/// type is kept so the handler signature matches the other endpoints.
#[instrument(skip_all, fields(service = "usage_monitor"))]
pub async fn usage() -> Result<impl IntoResponse, AppError> {
    // Only memory figures are reported, so refresh just the memory
    // information instead of collecting everything `new_all` gathers.
    let mut sys = System::new();
    sys.refresh_memory();

    // `sysinfo` reports memory in bytes in every release that exposes these
    // as inherent `System` methods (as imported here), so no KiB -> byte
    // scaling is applied. The previous `* 1024` inflated all three values.
    let total = sys.total_memory();
    let used = sys.used_memory();
    let free = total.saturating_sub(used);
    let percent = if total > 0 {
        (used as f64 / total as f64) * 100.0
    } else {
        0.0
    };

    info!(
        total_bytes = total,
        used_bytes = used,
        free_bytes = free,
        usage_percent = format!("{:.2}", percent),
        "Memory usage queried"
    );

    let response_payload = UsageResponse {
        memory: MemoryStats {
            free,
            used,
            total,
            percent,
        },
    };
    Ok((StatusCode::OK, Json(response_payload)))
}
/// Validates one API event and delivers it to every target channel.
///
/// Steps, in order:
/// 1. Require an event name and check it against the app's
///    `max_event_name_length` (falling back to the protocol default).
/// 2. If the app sets `max_event_payload_in_kb`, reject oversized payloads.
/// 3. Resolve the target channels: a non-empty `channels` list is used (and
///    checked against `max_event_channels_at_once` when set); otherwise the
///    single `channel` value is used.
/// 4. For each target channel, concurrently (via `join_all`): validate the
///    channel name, send the message, optionally collect channel info, and
///    store the event in the cache when the channel is a cache channel.
///
/// Returns a map from channel name to the info object collected for it. The
/// map only contains channels for which at least one attribute was collected,
/// and is empty when `collect_info` is `false`.
///
/// # Errors
/// * `AppError::InvalidInput` when the event has no name.
/// * `AppError::LimitExceeded` when a name, payload, or channel-count limit is exceeded.
/// * `AppError::MissingChannelInfo` when no target channel can be resolved.
/// * Any error from `validate_channel_name`, converted with `?`.
#[instrument(skip(handler, event_data, app), fields(app_id = app.id, event_name = field::Empty))]
async fn process_single_event_parallel(
handler: &Arc<ConnectionHandler>,
app: &App,
event_data: PusherApiMessage,
collect_info: bool,
) -> Result<HashMap<String, Value>, AppError> {
// Take the message apart so each field can be moved or cloned independently.
let PusherApiMessage {
name,
data: event_payload_data, channels,
channel,
socket_id: original_socket_id_str, info, } = event_data;
let event_name_str = name
.as_deref()
.ok_or_else(|| AppError::InvalidInput("Event name is required".to_string()))?;
// Fill in the span field declared as `field::Empty` in the attribute above.
tracing::Span::current().record("event_name", event_name_str);
// Event-name length limit: per-app override, else the protocol default.
let max_event_name_len = app
.max_event_name_length
.unwrap_or(DEFAULT_EVENT_NAME_MAX_LENGTH as u32);
if event_name_str.len() > max_event_name_len as usize {
return Err(AppError::LimitExceeded(format!(
"Event name '{}' exceeds maximum length of {}",
event_name_str, max_event_name_len
)));
}
// Payload size limit, enforced only when the app configures one.
if let Some(max_payload_kb) = app.max_event_payload_in_kb {
let value_for_size_calc = match &event_payload_data {
Some(ApiMessageData::String(s)) => json!(s),
Some(ApiMessageData::Json(j_val)) => j_val.clone(),
None => json!(null),
};
let payload_size_bytes = utils::data_to_bytes_flexible(vec![value_for_size_calc]);
if payload_size_bytes > (max_payload_kb as usize * 1024) {
return Err(AppError::LimitExceeded(format!(
"Event payload size ({} bytes) for event '{}' exceeds limit ({}KB)",
payload_size_bytes, event_name_str, max_payload_kb
)));
}
}
let mapped_socket_id: Option<SocketId> = original_socket_id_str.map(SocketId);
// Resolve target channels. The match is on `channels` alone.
let target_channels: Vec<String> = match channels {
Some(ch_list) if !ch_list.is_empty() => {
if let Some(max_ch_at_once) = app.max_event_channels_at_once {
if ch_list.len() > max_ch_at_once as usize {
return Err(AppError::LimitExceeded(format!(
"Number of channels ({}) exceeds limit ({})",
ch_list.len(),
max_ch_at_once
)));
}
}
ch_list
}
None => match channel {
Some(ch_str) => vec![ch_str],
None => {
warn!("{}", "Missing 'channels' or 'channel' in event");
return Err(AppError::MissingChannelInfo);
}
},
// NOTE(review): an empty `channels` list is rejected here even when
// `channel` is also set — confirm this is intended.
Some(_) => {
warn!("{}", "Empty 'channels' list provided in event");
return Err(AppError::MissingChannelInfo);
}
};
// Build one future per target channel; they are all driven by `join_all` below.
let channel_processing_futures = target_channels.into_iter().map(|target_channel_str| {
let handler_clone = Arc::clone(handler);
// Per-task copies so each `async move` block owns its data.
let name_for_task = name.clone(); let payload_for_task = event_payload_data.clone(); let socket_id_for_task = mapped_socket_id.clone(); let info_for_task = info.clone(); let event_name_for_task = event_name_str.to_string();
async move {
info!(channel = %target_channel_str, "Processing channel for event (parallel task)");
validate_channel_name(app, &target_channel_str).await?;
// Re-assemble a single-channel message for this target.
let message_to_send = PusherApiMessage {
name: name_for_task, data: payload_for_task.clone(), channels: None,
channel: Some(target_channel_str.clone()),
socket_id: socket_id_for_task.as_ref().map(|sid| sid.0.clone()),
info: info_for_task.clone(), };
handler_clone
.send_message(
&app.id,
socket_id_for_task.as_ref(),
message_to_send,
&target_channel_str,
)
.await;
let mut collected_channel_specific_info: Option<(String, Value)> = None;
if collect_info {
let is_presence = target_channel_str.starts_with("presence-");
let mut current_channel_info_map = serde_json::Map::new();
// `user_count` is only collected for presence channels; a lookup failure
// is logged and the attribute is simply omitted.
if is_presence && info_for_task.as_deref().map_or(false, |s| s.contains("user_count")) {
match handler_clone
.channel_manager
.read()
.await
.get_channel_members(&app.id, &target_channel_str)
.await
{
Ok(members_map) => {
current_channel_info_map
.insert("user_count".to_string(), json!(members_map.len()));
}
Err(e) => {
warn!(
"Failed to get user count for channel {}: {} (internal error: {:?})",
target_channel_str, e, e
);
}
}
}
if info_for_task
.as_deref()
.map_or(false, |s| s.contains("subscription_count"))
{
let count = handler_clone
.connection_manager
.lock()
.await
.get_channel_socket_count(&app.id, &target_channel_str)
.await;
current_channel_info_map.insert("subscription_count".to_string(), json!(count));
}
// Only report channels for which something was actually collected.
if !current_channel_info_map.is_empty() {
collected_channel_specific_info = Some((
target_channel_str.clone(),
Value::Object(current_channel_info_map),
));
}
}
// Cache channels keep the latest event under a per-app, per-channel key
// with a 3600 TTL argument. Cache failures are logged, not propagated.
if utils::is_cache_channel(&target_channel_str) {
let payload_value_for_cache = match payload_for_task {
Some(ApiMessageData::String(s)) => json!(s),
Some(ApiMessageData::Json(j_val)) => j_val, None => json!(null),
};
match build_cache_payload(&event_name_for_task, &payload_value_for_cache) {
Ok(cache_payload_str) => {
let mut cache_manager_locked = handler_clone.cache_manager.lock().await;
let cache_key_str =
format!("app:{}:channel:{}:cache_miss", &app.id, target_channel_str);
match cache_manager_locked
.set(&cache_key_str, &cache_payload_str, 3600) .await
{
Ok(_) => {
info!(channel = %target_channel_str, cache_key = %cache_key_str, "Cached event for channel");
}
Err(e) => {
error!(channel = %target_channel_str, cache_key = %cache_key_str, error = %e, "Failed to cache event (internal error: {:?})", e);
}
}
}
Err(e) => {
error!(channel = %target_channel_str, error = %e, "Failed to serialize event data for caching");
}
}
}
Ok(collected_channel_specific_info)
}
});
// `join_all` drives every channel future to completion before returning.
// NOTE(review): an error for one channel is therefore returned only after
// the other channels have already been processed and sent — confirm.
let results: Vec<Result<Option<(String, Value)>, AppError>> =
join_all(channel_processing_futures).await;
let mut final_channels_info_map = HashMap::new();
// Merge per-channel results; the first error in channel order wins.
for result in results {
match result {
Ok(Some((channel_name, info_value))) => {
final_channels_info_map.insert(channel_name, info_value);
}
Ok(None) => {
}
Err(e) => {
return Err(e);
}
}
}
Ok(final_channels_info_map)
}
/// HTTP handler that publishes a single event to one or more channels.
///
/// Responds with `{ "channels": { ... } }` when the event asked for channel
/// info and some was collected, otherwise with `{ "ok": true }`.
///
/// # Errors
/// * `AppError::AppNotFound` when `app_id` does not resolve to an app.
/// * Any error produced while processing the event.
#[instrument(skip(handler, event_payload), fields(app_id = %app_id))]
pub async fn events(
    Path(app_id): Path<String>,
    Query(auth_q_params_struct): Query<EventQuery>,
    State(handler): State<Arc<ConnectionHandler>>,
    uri: Uri,
    RawQuery(raw_query_str_option): RawQuery,
    Json(event_payload): Json<PusherApiMessage>,
) -> Result<impl IntoResponse, AppError> {
    let app = match handler.app_manager.find_by_id(app_id.as_str()).await? {
        Some(found) => found,
        None => return Err(AppError::AppNotFound(app_id.clone())),
    };

    // Channel info is collected only when the event carries an `info` field.
    let need_channel_info = event_payload.info.is_some();
    let info_by_channel =
        process_single_event_parallel(&handler, &app, event_payload, need_channel_info).await?;

    let body = if need_channel_info && !info_by_channel.is_empty() {
        json!({
            "channels": info_by_channel
        })
    } else {
        json!({ "ok": true })
    };
    Ok((StatusCode::OK, Json(body)))
}
#[instrument(skip_all, fields(app_id = %app_id, batch_len = field::Empty))]
pub async fn batch_events(
Path(app_id): Path<String>,
Query(auth_q_params_struct): Query<EventQuery>,
State(handler): State<Arc<ConnectionHandler>>,
uri: Uri,
RawQuery(raw_query_str_option): RawQuery,
Json(batch_message_payload): Json<BatchPusherApiMessage>,
) -> Result<impl IntoResponse, AppError> {
let body_bytes = serde_json::to_vec(&batch_message_payload)?;
let batch_events_vec = batch_message_payload.batch;
let batch_len = batch_events_vec.len();
tracing::Span::current().record("batch_len", &batch_len);
info!("Received batch events request with {} events", batch_len);
let app_config = handler
.app_manager
.find_by_id(app_id.as_str())
.await?
.ok_or_else(|| AppError::AppNotFound(app_id.clone()))?;
if let Some(max_batch) = app_config.max_event_batch_size {
if batch_len > max_batch as usize {
return Err(AppError::LimitExceeded(format!(
"Batch size ({}) exceeds limit ({})",
batch_len, max_batch
)));
}
}
let incoming_request_size_bytes = body_bytes.len(); let mut any_message_requests_info = false;
for single_event_message in &batch_events_vec {
if single_event_message.info.is_some() {
any_message_requests_info = true;
break;
}
}
let event_processing_futures = batch_events_vec.into_iter().map(|single_event_message| {
let handler_clone = Arc::clone(&handler);
let app_config_ref = &app_config;
async move {
let should_collect_info_for_this_event = single_event_message.info.is_some();
let channel_info_map = process_single_event_parallel(
&handler_clone,
app_config_ref,
single_event_message.clone(), should_collect_info_for_this_event,
)
.await?;
Ok((single_event_message, channel_info_map))
}
});
let results: Vec<Result<(PusherApiMessage, HashMap<String, Value>), AppError>> =
join_all(event_processing_futures).await;
let mut batch_response_info_vec = Vec::with_capacity(batch_len);
let mut processed_event_data = Vec::with_capacity(batch_len);
for result_item in results {
processed_event_data.push(result_item?); }
if any_message_requests_info {
for (original_msg, channel_info_map_for_event) in processed_event_data {
if let Some(main_channel_for_event) = original_msg
.channel
.as_ref()
.or_else(|| original_msg.channels.as_ref().and_then(|chs| chs.first()))
{
batch_response_info_vec.push(
channel_info_map_for_event
.get(main_channel_for_event)
.cloned()
.unwrap_or_else(|| json!({})), );
} else {
batch_response_info_vec.push(json!({}));
}
}
}
let final_response_payload = if any_message_requests_info {
json!({ "batch": batch_response_info_vec })
} else {
json!({}) };
let outgoing_response_size_bytes_vec = serde_json::to_vec(&final_response_payload)?;
record_api_metrics(
&handler,
&app_id,
incoming_request_size_bytes,
outgoing_response_size_bytes_vec.len(),
)
.await;
info!("{}", "Batch events processed successfully");
Ok((StatusCode::OK, Json(final_response_payload)))
}
/// HTTP handler returning information about a single channel.
///
/// The `info` query parameter selects optional attributes: subscription
/// count, user count (presence channels only), and cached event data (cache
/// channels only). Whether the channel is occupied is always reported, based
/// on its socket count.
///
/// # Errors
/// * `AppError::AppNotFound` when `app_id` does not resolve to an app.
/// * `AppError::InvalidInput` when `user_count` is requested for a channel
///   whose name does not start with `presence-`.
/// * Errors from channel validation, the channel/cache managers, or
///   serialization, converted with `?`.
#[instrument(skip(handler), fields(app_id = %app_id, channel = %channel_name))]
pub async fn channel(
    Path((app_id, channel_name)): Path<(String, String)>,
    Query(query_params_specific): Query<ChannelQuery>,
    State(handler): State<Arc<ConnectionHandler>>,
    uri: Uri,
    RawQuery(raw_query_str_option): RawQuery,
) -> Result<impl IntoResponse, AppError> {
    info!("Request for channel info for channel: {}", channel_name);

    let app = match handler.app_manager.find_by_id(&app_id).await? {
        Some(found) => found,
        None => return Err(AppError::AppNotFound(app_id.clone())),
    };
    validate_channel_name(&app, &channel_name).await?;

    let requested_info = query_params_specific.info.as_ref();
    let wants_subscription_count = requested_info.wants_subscription_count();
    let wants_user_count = requested_info.wants_user_count();
    let wants_cache_data = requested_info.wants_cache();

    // Always needed: it drives the "occupied" flag. The lock guard is a
    // temporary released at the end of this statement.
    let socket_count = handler
        .connection_manager
        .lock()
        .await
        .get_channel_socket_count(&app_id, &channel_name)
        .await;

    if wants_user_count && !channel_name.starts_with("presence-") {
        return Err(AppError::InvalidInput(
            "user_count is only available for presence channels".to_string(),
        ));
    }
    let user_count = if wants_user_count {
        let members = handler
            .channel_manager
            .read()
            .await
            .get_channel_members(&app_id, &channel_name)
            .await?;
        Some(members.len() as u64)
    } else {
        None
    };

    let cache_data = if wants_cache_data && utils::is_cache_channel(&channel_name) {
        let mut cache = handler.cache_manager.lock().await;
        let cache_key = format!("app:{}:channel:{}:cache_miss", app_id, channel_name);
        let cached = cache.get(&cache_key).await?;
        match cached {
            Some(content) => {
                // Fall back to 3600 seconds when the cache reports no TTL.
                let ttl = cache
                    .ttl(&cache_key)
                    .await?
                    .unwrap_or_else(|| core::time::Duration::from_secs(3600));
                Some((content, ttl))
            }
            None => None,
        }
    } else {
        None
    };

    let subscription_count = wants_subscription_count.then_some(socket_count as u64);

    let response_payload = PusherMessage::channel_info(
        socket_count > 0,
        subscription_count,
        user_count,
        cache_data,
    );
    let response_size = serde_json::to_vec(&response_payload)?.len();
    record_api_metrics(&handler, &app_id, 0, response_size).await;
    info!("Channel info for '{}' retrieved successfully", channel_name);
    Ok((StatusCode::OK, Json(response_payload)))
}
/// HTTP handler listing an application's channels.
///
/// Channels can be narrowed with `filter_by_prefix`. When `info` requests
/// `user_count`, presence channels are returned with their member count.
/// When `info` is absent, every matching channel is returned with an empty
/// object. A channel for which `info` was given but nothing was collected is
/// left out of the response.
///
/// # Errors
/// * `AppError::AppNotFound` when `app_id` does not resolve to an app.
/// * `AppError::InvalidInput` when `user_count` is requested and a matching
///   non-presence channel is found while the prefix filter does not start
///   with `presence-`.
/// * Errors from the managers, channel validation, or serialization.
#[instrument(skip(handler), fields(app_id = %app_id))]
pub async fn channels(
    Path(app_id): Path<String>,
    Query(query_params_specific): Query<ChannelsQuery>,
    State(handler): State<Arc<ConnectionHandler>>,
    uri: Uri,
    RawQuery(raw_query_str_option): RawQuery,
) -> Result<impl IntoResponse, AppError> {
    info!("Request for channels list for app_id: {}", app_id);

    let prefix = query_params_specific
        .filter_by_prefix
        .as_deref()
        .unwrap_or("");
    let wants_user_count = query_params_specific.info.as_ref().wants_user_count();
    let info_absent = query_params_specific.info.is_none();

    let app = match handler.app_manager.find_by_id(&app_id).await? {
        Some(found) => found,
        None => return Err(AppError::AppNotFound(app_id.clone())),
    };

    // The lock guard is a temporary released at the end of this statement.
    let channels_map = handler
        .connection_manager
        .lock()
        .await
        .get_channels_with_socket_count(&app_id)
        .await?;

    let mut listed = HashMap::new();
    for entry in channels_map
        .iter()
        .filter(|candidate| candidate.key().starts_with(prefix))
    {
        let name = entry.key();
        validate_channel_name(&app, name).await?;

        let mut attributes = serde_json::Map::new();
        if wants_user_count {
            if name.starts_with("presence-") {
                let members = handler
                    .channel_manager
                    .read()
                    .await
                    .get_channel_members(&app_id, name)
                    .await?;
                attributes.insert("user_count".to_string(), json!(members.len()));
            } else if !prefix.starts_with("presence-") {
                return Err(AppError::InvalidInput(
                    "user_count is only available for presence channels. Use filter_by_prefix=presence-".to_string()
                ));
            }
        }

        // Decide what, if anything, to report for this channel.
        let reported = if !attributes.is_empty() {
            Some(Value::Object(attributes))
        } else if info_absent {
            Some(json!({}))
        } else {
            None
        };
        if let Some(value) = reported {
            listed.insert(name.clone(), value);
        }
    }

    let response_payload = PusherMessage::channels_list(listed);
    let response_size = serde_json::to_vec(&response_payload)?.len();
    record_api_metrics(&handler, &app_id, 0, response_size).await;
    info!("Channels list for app '{}' retrieved successfully", app_id);
    Ok((StatusCode::OK, Json(response_payload)))
}
#[instrument(skip(handler), fields(app_id = %app_id, channel = %channel_name))]
pub async fn channel_users(
Path((app_id, channel_name)): Path<(String, String)>,
Query(auth_q_params_struct): Query<EventQuery>, State(handler): State<Arc<ConnectionHandler>>,
) -> Result<impl IntoResponse, AppError> {
let app = handler
.app_manager
.find_by_id(&app_id)
.await?
.ok_or_else(|| AppError::AppNotFound(app_id.clone()))?;
info!("Request for users in channel: {}", channel_name);
validate_channel_name(&app, &channel_name).await?;
if !channel_name.starts_with("presence-") {
return Err(AppError::InvalidInput(
"Only presence channels support this endpoint".to_string(),
));
}
let channel_members_map = handler
.channel_manager
.read()
.await
.get_channel_members(&app_id, &channel_name)
.await?;
let users_vec = channel_members_map
.keys()
.map(|user_id_str| json!({ "id": user_id_str }))
.collect::<Vec<_>>();
let response_payload_val = json!({ "users": users_vec });
let response_json_bytes = serde_json::to_vec(&response_payload_val)?;
record_api_metrics(&handler, &app_id, 0, response_json_bytes.len()).await;
info!(
user_count = users_vec.len(),
"Channel users for '{}' retrieved successfully", channel_name
);
Ok((StatusCode::OK, Json(response_payload_val)))
}
/// HTTP handler that asks the connection manager to terminate the
/// connections of `user_id` within `app_id`.
///
/// Responds with `{ "ok": true }` once the termination call returns
/// successfully.
///
/// # Errors
/// Any error from `terminate_connection`, converted to [`AppError`] with `?`.
#[instrument(skip(handler), fields(app_id = %app_id, user_id = %user_id))]
pub async fn terminate_user_connections(
    Path((app_id, user_id)): Path<(String, String)>,
    Query(auth_q_params_struct): Query<EventQuery>,
    State(handler): State<Arc<ConnectionHandler>>,
    uri: Uri,
    RawQuery(raw_query_str_option): RawQuery,
) -> Result<impl IntoResponse, AppError> {
    info!(
        "Received request to terminate user connections for user_id: {}",
        user_id
    );

    // The lock guard is a temporary released at the end of this statement.
    handler
        .connection_manager
        .lock()
        .await
        .terminate_connection(&app_id, &user_id)
        .await?;

    info!(
        "Successfully initiated termination for user_id: {}",
        user_id
    );
    let body = json!({ "ok": true });
    Ok((StatusCode::OK, Json(body)))
}
/// Health-check handler for an application.
///
/// Always answers `200 OK` with body `OK` and an `X-Health-Check: OK`
/// header. An unknown `app_id` is only logged as a warning. When metrics are
/// enabled the call is recorded as an API message with a 2-byte response.
///
/// # Errors
/// * Errors from the app lookup, converted with `?`.
/// * `AppError::HeaderBuildError` if the response cannot be built.
#[instrument(skip(handler), fields(app_id = %app_id))]
pub async fn up(
    Path(app_id): Path<String>,
    State(handler): State<Arc<ConnectionHandler>>,
) -> Result<impl IntoResponse, AppError> {
    info!("Health check received for app_id: {}", app_id);

    let app_lookup = handler.app_manager.find_by_id(&app_id).await?;
    if app_lookup.is_none() {
        warn!("Health check for non-existent app_id: {}", app_id);
    }

    if handler.metrics.is_none() {
        info!(
            "Metrics system not available for health check for app_id: {}.",
            app_id
        );
    } else {
        // 2 is the byte length of the "OK" body.
        record_api_metrics(&handler, &app_id, 0, 2).await;
    }

    let health_response = Response::builder()
        .status(StatusCode::OK)
        .header("X-Health-Check", "OK")
        .body(String::from("OK"))?;
    Ok(health_response)
}
/// Handler exposing collected metrics as plain text.
///
/// The response has content type `text/plain; version=0.0.4; charset=utf-8`.
/// When metrics collection is disabled, a single comment line is returned
/// instead of metric data.
///
/// # Errors
/// The current implementation never returns an error.
#[instrument(skip(handler), fields(service = "metrics_exporter"))]
pub async fn metrics(
    State(handler): State<Arc<ConnectionHandler>>,
) -> Result<impl IntoResponse, AppError> {
    info!("{}", "Metrics endpoint called");

    let body = if let Some(shared_metrics) = handler.metrics.as_ref() {
        let guard = shared_metrics.lock().await;
        guard.get_metrics_as_plaintext().await
    } else {
        info!(
            "{}",
            "No metrics data available (metrics collection is not enabled)."
        );
        String::from("# Metrics collection is not enabled.\n")
    };

    let content_type = HeaderValue::from_static("text/plain; version=0.0.4; charset=utf-8");
    let mut response_headers = HeaderMap::new();
    response_headers.insert(header::CONTENT_TYPE, content_type);

    info!(
        bytes = body.len(),
        "Successfully generated Prometheus metrics"
    );
    Ok((StatusCode::OK, response_headers, body))
}