use crate::{INTERNAL_CLIENT_ID, server::CloneableWbApi};
use serde_json::json;
use std::{ops::ControlFlow, time::Duration};
use tokio::time::{Instant, interval};
use tosub::SubsystemHandle;
use tracing::debug;
#[cfg(not(feature = "commercial"))]
use worterbuch_common::SYSTEM_TOPIC_SOURCES;
use worterbuch_common::{
SYSTEM_TOPIC_COUNT, SYSTEM_TOPIC_LICENSE, SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_STORE,
SYSTEM_TOPIC_UPTIME, SYSTEM_TOPIC_VALUES, SYSTEM_TOPIC_VERSION, WbApi, error::WorterbuchResult,
topic, while_select,
};
pub const VERSION: &str = env!("CARGO_PKG_VERSION");
#[cfg(not(feature = "commercial"))]
pub const LICENSE: &str = env!("CARGO_PKG_LICENSE");
#[cfg(feature = "commercial")]
pub const LICENSE: &str = "COMMERCIAL";
#[cfg(not(feature = "commercial"))]
pub const REPO: &str = env!("CARGO_PKG_REPOSITORY");
pub async fn track_stats(wb: CloneableWbApi, subsys: SubsystemHandle) -> WorterbuchResult<()> {
let start = Instant::now();
wb.set(
topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_VERSION),
json!(VERSION),
INTERNAL_CLIENT_ID.to_owned(),
)
.await
.ok();
wb.set(
topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_LICENSE),
json!(LICENSE),
INTERNAL_CLIENT_ID.to_owned(),
)
.await
.ok();
#[cfg(feature = "commercial")]
wb.set(
topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_LICENSE, "data"),
json!(wb.config().license),
INTERNAL_CLIENT_ID.to_owned(),
)
.await
.ok();
#[cfg(not(feature = "commercial"))]
wb.set(
topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_SOURCES),
json!(format!("{REPO}/releases/tag/v{VERSION}")),
INTERNAL_CLIENT_ID.to_owned(),
)
.await
.ok();
let mut interval = interval(Duration::from_secs(1));
while_select! {
biased;
_ = subsys.shutdown_requested() => break,
_ = interval.tick() => update_stats(&wb, start).await,
}
debug!("stats subsystem completed.");
Ok(())
}
async fn update_stats(wb: &CloneableWbApi, start: Instant) -> ControlFlow<()> {
if update_uptime(wb, start.elapsed()).await.is_err() {
return ControlFlow::Break(());
}
if update_message_count(wb).await.is_err() {
return ControlFlow::Break(());
}
#[cfg(feature = "jemalloc")]
if update_jemalloc_stats(wb).await.is_err() {
return ControlFlow::Break(());
}
ControlFlow::Continue(())
}
async fn update_uptime(wb: &CloneableWbApi, uptime: Duration) -> WorterbuchResult<()> {
wb.set(
topic!(SYSTEM_TOPIC_ROOT, SYSTEM_TOPIC_UPTIME),
json!(uptime.as_secs()),
INTERNAL_CLIENT_ID.to_owned(),
)
.await
}
async fn update_message_count(wb: &CloneableWbApi) -> WorterbuchResult<()> {
let len = wb.entries().await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_STORE,
SYSTEM_TOPIC_VALUES,
SYSTEM_TOPIC_COUNT
),
json!(len),
INTERNAL_CLIENT_ID.to_owned(),
)
.await
}
#[cfg(feature = "jemalloc")]
async fn update_jemalloc_stats(wb: &CloneableWbApi) -> miette::Result<()> {
use miette::IntoDiagnostic;
use tikv_jemalloc_ctl::{epoch, stats};
use worterbuch_common::{SYSTEM_TOPIC_FORMATTED, SYSTEM_TOPIC_JEMALLOC, SYSTEM_TOPIC_RAW};
epoch::advance().into_diagnostic()?;
let allocated = stats::allocated::read().into_diagnostic()?;
let resident = stats::resident::read().into_diagnostic()?;
let active = stats::active::read().into_diagnostic()?;
let mapped = stats::mapped::read().into_diagnostic()?;
let metadata = stats::metadata::read().into_diagnostic()?;
let retained = stats::retained::read().into_diagnostic()?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_RAW,
"allocated"
),
json!(allocated),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_RAW,
"resident"
),
json!(resident),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_RAW,
"active"
),
json!(active),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_RAW,
"mapped"
),
json!(mapped),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_RAW,
"retained"
),
json!(retained),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_RAW,
"metadata"
),
json!(metadata),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_FORMATTED,
"allocated"
),
json!(format_bytes(allocated)),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_FORMATTED,
"resident"
),
json!(format_bytes(resident)),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_FORMATTED,
"active"
),
json!(format_bytes(active)),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_FORMATTED,
"mapped"
),
json!(format_bytes(mapped)),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_FORMATTED,
"retained"
),
json!(format_bytes(retained)),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
wb.set(
topic!(
SYSTEM_TOPIC_ROOT,
SYSTEM_TOPIC_JEMALLOC,
SYSTEM_TOPIC_FORMATTED,
"metadata"
),
json!(format_bytes(metadata)),
INTERNAL_CLIENT_ID.to_owned(),
)
.await?;
Ok(())
}
#[cfg(feature = "jemalloc")]
pub fn format_bytes(bytes: usize) -> String {
const UNITS: [&str; 5] = ["B", "KB", "MB", "GB", "TB"];
let mut size = bytes as f64;
let mut unit = 0;
while size >= 1024.0 && unit < UNITS.len() - 1 {
size /= 1024.0;
unit += 1;
}
if unit == 0 {
format!("{:.0} {}", size, UNITS[unit]) } else {
format!("{:.1} {}", size, UNITS[unit]) }
}