use std::{
collections::BTreeSet,
env,
error::Error,
fmt,
path::PathBuf,
str::FromStr,
sync::{
mpsc::{self, Receiver, RecvTimeoutError, SyncSender, TryRecvError, TrySendError},
Arc, Mutex, OnceLock,
},
thread,
time::{Duration, Instant},
};
use chrono::{DateTime, Utc};
use reqwest::blocking::Client as HttpClient;
use serde::{Deserialize, Serialize};
use serde_json::{json, Map, Value};
// --- Environment variable names consumed by `MasterLogConfig::from_env` ---
pub const MASTER_LOG_API_KEY_ENV: &str = "MASTER_LOG_API_KEY";
pub const MASTER_LOG_ENDPOINT_ENV: &str = "MASTER_LOG_ENDPOINT";
pub const MASTER_LOG_TIMEOUT_ENV: &str = "MASTER_LOG_TIMEOUT_SECONDS";
pub const MASTER_LOG_ECHO_ENV: &str = "MASTER_LOG_ECHO";
pub const MASTER_LOG_ASYNC_ENV: &str = "MASTER_LOG_ASYNC";
pub const MASTER_LOG_BATCH_SIZE_ENV: &str = "MASTER_LOG_BATCH_SIZE";
pub const MASTER_LOG_FLUSH_INTERVAL_ENV: &str = "MASTER_LOG_FLUSH_INTERVAL_SECONDS";
pub const MASTER_LOG_MAX_QUEUE_SIZE_ENV: &str = "MASTER_LOG_MAX_QUEUE_SIZE";
pub const MASTER_LOG_DROP_WHEN_FULL_ENV: &str = "MASTER_LOG_DROP_WHEN_FULL";
pub const MASTER_LOG_QUEUE_TIMEOUT_ENV: &str = "MASTER_LOG_QUEUE_TIMEOUT_SECONDS";
pub const MASTER_LOG_BACKPRESSURE_ENV: &str = "MASTER_LOG_BACKPRESSURE";
pub const MASTER_LOG_INITIAL_SEND_SECONDS_PER_LOG_ENV: &str =
    "MASTER_LOG_INITIAL_SEND_SECONDS_PER_LOG";
pub const MASTER_LOG_MAX_ENQUEUE_SLEEP_ENV: &str = "MASTER_LOG_MAX_ENQUEUE_SLEEP_SECONDS";
pub const MASTER_LOG_MIN_REQUEST_INTERVAL_ENV: &str = "MASTER_LOG_MIN_REQUEST_INTERVAL_SECONDS";
// --- Defaults used when a setting is absent or fails validation ---
pub const DEFAULT_TIMEOUT_SECONDS: f64 = 2.0; // per-request HTTP timeout
pub const DEFAULT_ASYNC_MODE: bool = true; // queue logs to a background worker
pub const DEFAULT_BATCH_SIZE: usize = 100; // max logs per batch request
pub const DEFAULT_FLUSH_INTERVAL_SECONDS: f64 = 1.0; // max age of a partial batch
pub const DEFAULT_MAX_QUEUE_SIZE: usize = 10_000; // bounded worker queue capacity
pub const DEFAULT_DROP_WHEN_FULL: bool = true; // fail fast instead of blocking
pub const DEFAULT_QUEUE_TIMEOUT_SECONDS: f64 = 0.25; // wait for queue space when not dropping
pub const DEFAULT_BACKPRESSURE: bool = true; // sleep producers based on send rate
pub const DEFAULT_INITIAL_SEND_SECONDS_PER_LOG: f64 = 0.005; // seed for send-rate estimate
pub const DEFAULT_MAX_ENQUEUE_SLEEP_SECONDS: f64 = 0.25; // cap on a backpressure sleep
pub const DEFAULT_MIN_REQUEST_INTERVAL_SECONDS: f64 = 0.0; // spacing between batch requests
pub const DEFAULT_SEND_RATE_SMOOTHING: f64 = 0.2; // EWMA weight for new send-rate samples
pub const DEFAULT_SHUTDOWN_TIMEOUT_SECONDS: f64 = 5.0; // graceful shutdown wait in Drop
// Crate version baked in at compile time; reported in the User-Agent header.
pub const LIBRARY_VERSION: &str = env!("CARGO_PKG_VERSION");
/// Tunable settings for the client; build via [`Default`], [`MasterLogConfig::from_env`],
/// or the chained builder-style methods.
#[derive(Clone, Debug)]
pub struct MasterLogConfig {
    /// API key sent as a bearer token; required before anything is sent.
    pub api_key: Option<String>,
    /// Base endpoint URL; normalized by `logs_url` / `batch_logs_url`.
    pub endpoint: Option<String>,
    /// HTTP request timeout.
    pub timeout: Duration,
    /// When true, log bodies are also printed to stdout.
    pub echo: bool,
    /// When true, logs are queued and delivered by a background worker thread.
    pub async_mode: bool,
    /// Maximum number of logs per batch request.
    pub batch_size: usize,
    /// How long the worker holds a partial batch before flushing it.
    pub flush_interval: Duration,
    /// Capacity of the bounded worker queue.
    pub max_queue_size: usize,
    /// When true, enqueue fails immediately with `QueueFull` instead of waiting.
    pub drop_when_full: bool,
    /// How long enqueue may wait for queue space when not dropping.
    pub queue_timeout: Duration,
    /// When true, enqueue sleeps briefly based on the measured send rate.
    pub backpressure: bool,
    /// Seed estimate (seconds per log) for the backpressure sleep.
    pub initial_send_seconds_per_log: f64,
    /// Upper bound on any single backpressure sleep.
    pub max_enqueue_sleep: Duration,
    /// Minimum spacing between consecutive batch requests.
    pub min_request_interval: Duration,
}
/// Short alias for [`MasterLogConfig`].
pub type Config = MasterLogConfig;
impl Default for MasterLogConfig {
    /// Baseline configuration: no credentials, async batching enabled, and every
    /// tunable taken from the library's `DEFAULT_*` constants.
    fn default() -> Self {
        Self {
            api_key: None,
            endpoint: None,
            echo: false,
            async_mode: DEFAULT_ASYNC_MODE,
            batch_size: DEFAULT_BATCH_SIZE,
            max_queue_size: DEFAULT_MAX_QUEUE_SIZE,
            drop_when_full: DEFAULT_DROP_WHEN_FULL,
            backpressure: DEFAULT_BACKPRESSURE,
            initial_send_seconds_per_log: DEFAULT_INITIAL_SEND_SECONDS_PER_LOG,
            timeout: duration_from_seconds(DEFAULT_TIMEOUT_SECONDS),
            flush_interval: duration_from_seconds(DEFAULT_FLUSH_INTERVAL_SECONDS),
            queue_timeout: duration_from_seconds(DEFAULT_QUEUE_TIMEOUT_SECONDS),
            max_enqueue_sleep: duration_from_seconds(DEFAULT_MAX_ENQUEUE_SLEEP_SECONDS),
            min_request_interval: duration_from_seconds(DEFAULT_MIN_REQUEST_INTERVAL_SECONDS),
        }
    }
}
impl MasterLogConfig {
pub fn from_env() -> Self {
let mut config = Self::default();
config.api_key = env_non_empty(MASTER_LOG_API_KEY_ENV);
config.endpoint = env_non_empty(MASTER_LOG_ENDPOINT_ENV);
config.timeout = env_duration(MASTER_LOG_TIMEOUT_ENV).unwrap_or(config.timeout);
config.echo = env_bool(MASTER_LOG_ECHO_ENV).unwrap_or(config.echo);
config.async_mode = env_bool(MASTER_LOG_ASYNC_ENV).unwrap_or(config.async_mode);
config.batch_size = positive_usize(env_usize(MASTER_LOG_BATCH_SIZE_ENV), config.batch_size);
config.flush_interval = positive_duration(
env_duration(MASTER_LOG_FLUSH_INTERVAL_ENV),
config.flush_interval,
);
config.max_queue_size = non_negative_usize(
env_usize(MASTER_LOG_MAX_QUEUE_SIZE_ENV),
config.max_queue_size,
);
config.drop_when_full =
env_bool(MASTER_LOG_DROP_WHEN_FULL_ENV).unwrap_or(config.drop_when_full);
config.queue_timeout = non_negative_duration(
env_duration(MASTER_LOG_QUEUE_TIMEOUT_ENV),
config.queue_timeout,
);
config.backpressure = env_bool(MASTER_LOG_BACKPRESSURE_ENV).unwrap_or(config.backpressure);
config.initial_send_seconds_per_log = non_negative_f64(
env_f64(MASTER_LOG_INITIAL_SEND_SECONDS_PER_LOG_ENV),
config.initial_send_seconds_per_log,
);
config.max_enqueue_sleep = non_negative_duration(
env_duration(MASTER_LOG_MAX_ENQUEUE_SLEEP_ENV),
config.max_enqueue_sleep,
);
config.min_request_interval = non_negative_duration(
env_duration(MASTER_LOG_MIN_REQUEST_INTERVAL_ENV),
config.min_request_interval,
);
config
}
pub fn api_key(mut self, value: impl Into<String>) -> Self {
self.api_key = Some(value.into());
self
}
pub fn endpoint(mut self, value: impl Into<String>) -> Self {
self.endpoint = Some(value.into());
self
}
pub fn timeout(mut self, value: Duration) -> Self {
self.timeout = value;
self
}
pub fn echo(mut self, value: bool) -> Self {
self.echo = value;
self
}
pub fn async_mode(mut self, value: bool) -> Self {
self.async_mode = value;
self
}
pub fn batch_size(mut self, value: usize) -> Self {
self.batch_size = value.max(1);
self
}
pub fn flush_interval(mut self, value: Duration) -> Self {
self.flush_interval = if value.is_zero() {
duration_from_seconds(DEFAULT_FLUSH_INTERVAL_SECONDS)
} else {
value
};
self
}
pub fn max_queue_size(mut self, value: usize) -> Self {
self.max_queue_size = value;
self
}
pub fn drop_when_full(mut self, value: bool) -> Self {
self.drop_when_full = value;
self
}
pub fn queue_timeout(mut self, value: Duration) -> Self {
self.queue_timeout = value;
self
}
pub fn backpressure(mut self, value: bool) -> Self {
self.backpressure = value;
self
}
pub fn initial_send_seconds_per_log(mut self, value: f64) -> Self {
self.initial_send_seconds_per_log = value.max(0.0);
self
}
pub fn max_enqueue_sleep(mut self, value: Duration) -> Self {
self.max_enqueue_sleep = value;
self
}
pub fn min_request_interval(mut self, value: Duration) -> Self {
self.min_request_interval = value;
self
}
}
/// Outcome of a log, flush, or shutdown call.
#[derive(Clone, Debug)]
pub struct MasterLogResult {
    /// Whether the operation succeeded.
    pub ok: bool,
    /// HTTP status of the last request, when one was made.
    pub status_code: Option<u16>,
    /// Id of the first created event, when the server returned one.
    pub event_id: Option<String>,
    /// Error message when `ok` is false.
    pub error: Option<String>,
    /// Parsed JSON body of the last response, when available.
    pub response: Option<Value>,
    /// True when the log was queued for async delivery rather than sent inline.
    pub queued: bool,
    /// Number of logs the server accepted, when known.
    pub accepted: Option<usize>,
}
impl MasterLogResult {
    /// A bare success with no transport details attached.
    pub fn ok() -> Self {
        Self {
            ok: true,
            status_code: None,
            event_id: None,
            error: None,
            response: None,
            queued: false,
            accepted: None,
        }
    }
    /// Success that only indicates the log was queued for async delivery.
    pub fn queued() -> Self {
        let mut result = Self::ok();
        result.queued = true;
        result
    }
    /// Failure carrying a human-readable error message.
    pub fn failed(error: impl Into<String>) -> Self {
        Self {
            ok: false,
            error: Some(error.into()),
            ..Self::ok()
        }
    }
}
/// Errors surfaced by the client.
#[derive(Debug)]
pub enum MasterLogError {
    /// A required setting is missing or empty; payload is the env-var name.
    MissingConfig(&'static str),
    /// Severity string did not parse; payload is the normalized input.
    InvalidSeverity(String),
    /// Endpoint was provided but blank after trimming.
    EmptyEndpoint,
    /// Transport failure or non-success HTTP status.
    Http(String),
    /// JSON (de)serialization failure.
    Json(String),
    /// The bounded worker queue is full.
    QueueFull,
    /// The worker thread is gone or its channel is closed.
    WorkerUnavailable,
    /// A bounded wait (flush/shutdown acknowledgement) expired.
    Timeout(String),
}
impl fmt::Display for MasterLogError {
    /// Human-readable message; message-carrying variants print their payload verbatim.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            Self::MissingConfig(name) => write!(formatter, "{name} is required"),
            Self::InvalidSeverity(value) => write!(
                formatter,
                "severity must be one of trace, debug, info, warn, error, fatal; got {value}"
            ),
            Self::EmptyEndpoint => write!(formatter, "{MASTER_LOG_ENDPOINT_ENV} is empty"),
            Self::QueueFull => write!(formatter, "Master Log queue is full"),
            Self::WorkerUnavailable => write!(formatter, "Master Log worker is unavailable"),
            Self::Http(message) | Self::Json(message) | Self::Timeout(message) => {
                formatter.write_str(message)
            }
        }
    }
}
// Marker impl: no underlying source; all detail lives in `Display`.
impl Error for MasterLogError {}
/// Collapse any error into a failed public result (used by the `unwrap_or_else(Into::into)` paths).
impl From<MasterLogError> for MasterLogResult {
    fn from(error: MasterLogError) -> Self {
        Self::failed(error.to_string())
    }
}
/// Log severity levels, ordered least to most severe; serialized as lowercase strings.
#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
#[serde(rename_all = "lowercase")]
pub enum Severity {
    Trace,
    Debug,
    Info,
    Warn,
    Error,
    Fatal,
}
impl Severity {
pub fn all() -> [Self; 6] {
[
Self::Trace,
Self::Debug,
Self::Info,
Self::Warn,
Self::Error,
Self::Fatal,
]
}
}
impl fmt::Display for Severity {
    /// Lowercase name, matching the serde wire representation.
    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
        formatter.write_str(match self {
            Self::Trace => "trace",
            Self::Debug => "debug",
            Self::Info => "info",
            Self::Warn => "warn",
            Self::Error => "error",
            Self::Fatal => "fatal",
        })
    }
}
impl FromStr for Severity {
    type Err = MasterLogError;
    /// Case-insensitive parse; surrounding whitespace is ignored and
    /// "warning" is accepted as an alias for `Warn`.
    fn from_str(value: &str) -> Result<Self, Self::Err> {
        let normalized = value.trim().to_ascii_lowercase();
        match normalized.as_str() {
            "trace" => Ok(Self::Trace),
            "debug" => Ok(Self::Debug),
            "info" => Ok(Self::Info),
            "warn" | "warning" => Ok(Self::Warn),
            "error" => Ok(Self::Error),
            "fatal" => Ok(Self::Fatal),
            _ => Err(MasterLogError::InvalidSeverity(normalized)),
        }
    }
}
/// Builder for a single log event; hand it to a client via `log_entry`.
#[derive(Clone, Debug)]
pub struct LogEntry {
    /// Free-form message body.
    body: String,
    /// Optional explicit title; derived from the body when absent.
    title: Option<String>,
    /// Severity level; defaults to `Info`.
    severity: Severity,
    /// Raw tags; deduped and comma-split at payload time.
    tags: Vec<String>,
    /// Arbitrary JSON metadata; non-objects are wrapped at payload time.
    metadata: Value,
    /// Optional time-to-live in seconds.
    ttl_seconds: Option<u64>,
    /// Optional absolute expiry timestamp.
    expires_at: Option<DateTime<Utc>>,
    /// Optional explicit creation timestamp.
    created_at: Option<DateTime<Utc>>,
}
impl LogEntry {
    /// Start a new entry with the given body; defaults to `Severity::Info`,
    /// no title, no tags, empty object metadata, and no TTL/expiry.
    pub fn new(body: impl Into<String>) -> Self {
        Self {
            body: body.into(),
            title: None,
            severity: Severity::Info,
            tags: Vec::new(),
            metadata: Value::Object(Map::new()),
            ttl_seconds: None,
            expires_at: None,
            created_at: None,
        }
    }
    /// Convenience constructor combining `new` and `title`.
    pub fn with_title(title: impl Into<String>, body: impl Into<String>) -> Self {
        Self::new(body).title(title)
    }
    /// Set the severity level.
    pub fn severity(mut self, value: Severity) -> Self {
        self.severity = value;
        self
    }
    /// Set the severity from a string; fails with `InvalidSeverity` on unknown input.
    pub fn severity_str(mut self, value: impl AsRef<str>) -> Result<Self, MasterLogError> {
        self.severity = Severity::from_str(value.as_ref())?;
        Ok(self)
    }
    /// Set an explicit title.
    pub fn title(mut self, value: impl Into<String>) -> Self {
        self.title = Some(value.into());
        self
    }
    /// Append a single tag.
    pub fn tag(mut self, value: impl Into<String>) -> Self {
        self.tags.push(value.into());
        self
    }
    /// Append multiple tags.
    pub fn tags<I, S>(mut self, values: I) -> Self
    where
        I: IntoIterator<Item = S>,
        S: Into<String>,
    {
        self.tags.extend(values.into_iter().map(Into::into));
        self
    }
    /// Replace the metadata wholesale.
    pub fn metadata(mut self, value: Value) -> Self {
        self.metadata = value;
        self
    }
    /// Insert one key into the metadata object. If the current metadata is not
    /// an object, it is first moved under a `"value"` key.
    /// NOTE(review): serialization failures are stored as an error string rather
    /// than dropped, so the field always appears in the payload.
    pub fn metadata_field(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
        let serialized = serde_json::to_value(value).unwrap_or_else(|error| {
            Value::String(format!("failed to serialize metadata value: {error}"))
        });
        match &mut self.metadata {
            Value::Object(object) => {
                object.insert(key.into(), serialized);
            }
            existing => {
                // Wrap the non-object metadata so both it and the new field survive.
                // NOTE(review): a key literally named "value" would overwrite the
                // wrapped original here — confirm that is acceptable.
                let mut object = Map::new();
                object.insert("value".to_string(), existing.clone());
                object.insert(key.into(), serialized);
                self.metadata = Value::Object(object);
            }
        }
        self
    }
    /// Set a relative time-to-live in seconds.
    pub fn ttl_seconds(mut self, value: u64) -> Self {
        self.ttl_seconds = Some(value);
        self
    }
    /// Set an absolute expiry timestamp.
    pub fn expires_at(mut self, value: DateTime<Utc>) -> Self {
        self.expires_at = Some(value);
        self
    }
    /// Set an explicit creation timestamp.
    pub fn created_at(mut self, value: DateTime<Utc>) -> Self {
        self.created_at = Some(value);
        self
    }
    /// Convert the builder into the wire payload: derives a title when none was
    /// set, normalizes/dedupes tags, and injects client metadata.
    fn into_payload(self) -> LogPayload {
        let body = self.body;
        let title = default_title(self.title.as_deref(), &body);
        LogPayload {
            severity: self.severity,
            tags: normalize_tags(self.tags),
            title,
            body,
            created_at: self.created_at,
            expires_at: self.expires_at,
            ttl_seconds: self.ttl_seconds,
            metadata: merge_metadata(self.metadata),
        }
    }
}
/// Cheaply cloneable handle to the logging client; clones share one worker and config.
#[derive(Clone, Debug)]
pub struct MasterLogClient {
    inner: Arc<ClientInner>,
}
/// Shared state behind a `MasterLogClient`.
#[derive(Debug)]
struct ClientInner {
    /// Immutable configuration captured at construction.
    config: MasterLogConfig,
    /// Channel to the worker thread, when one is running.
    sender: Mutex<Option<SyncSender<WorkerMessage>>>,
    /// Join handle of the worker thread, when one is running.
    handle: Mutex<Option<thread::JoinHandle<()>>>,
    /// Smoothed estimate of seconds spent sending one log; read by backpressure,
    /// written by the worker.
    send_seconds_per_log: Arc<Mutex<f64>>,
}
impl Drop for ClientInner {
    /// Best-effort graceful shutdown: ask the worker to drain and stop, then
    /// join its thread. All failures (poisoned lock, full queue, timeout) are
    /// swallowed because panicking in `drop` is not an option.
    fn drop(&mut self) {
        // `get_mut` needs no locking here: `drop` has exclusive access.
        if let Ok(sender_slot) = self.sender.get_mut() {
            if let Some(sender) = sender_slot.take() {
                let (ack_sender, ack_receiver) = mpsc::channel();
                let timeout = duration_from_seconds(DEFAULT_SHUTDOWN_TIMEOUT_SECONDS);
                if send_command_with_timeout(
                    &sender,
                    WorkerMessage::Shutdown(ack_sender),
                    timeout,
                    false,
                )
                .is_ok()
                {
                    // Bounded wait for the worker to acknowledge the drain.
                    let _ = ack_receiver.recv_timeout(timeout);
                }
            }
        }
        // The taken sender has been dropped by now, so the worker also observes
        // disconnection; join so no detached thread outlives the client.
        if let Ok(handle_slot) = self.handle.get_mut() {
            if let Some(handle) = handle_slot.take() {
                let _ = handle.join();
            }
        }
    }
}
impl MasterLogClient {
    /// Create a client from an explicit configuration; the worker thread is
    /// started lazily on the first async log/flush.
    pub fn new(config: MasterLogConfig) -> Self {
        Self {
            inner: Arc::new(ClientInner {
                // Seed the shared send-rate estimate before `config` is moved in.
                send_seconds_per_log: Arc::new(Mutex::new(config.initial_send_seconds_per_log)),
                config,
                sender: Mutex::new(None),
                handle: Mutex::new(None),
            }),
        }
    }
    /// Create a client configured from the `MASTER_LOG_*` environment variables.
    pub fn from_env() -> Self {
        Self::new(MasterLogConfig::from_env())
    }
    /// Log a body at the default (`Info`) severity; errors become a failed result.
    pub fn log(&self, body: impl Into<String>) -> MasterLogResult {
        self.log_entry(LogEntry::new(body))
    }
    /// Like [`MasterLogClient::log`] but surfaces errors as `Err`.
    pub fn try_log(&self, body: impl Into<String>) -> Result<MasterLogResult, MasterLogError> {
        self.try_log_entry(LogEntry::new(body))
    }
    /// Log a fully built entry; errors become a failed result.
    pub fn log_entry(&self, entry: LogEntry) -> MasterLogResult {
        self.try_log_entry(entry).unwrap_or_else(Into::into)
    }
    /// Log a fully built entry: echo if configured, then either enqueue to the
    /// worker (async mode) or send synchronously.
    pub fn try_log_entry(&self, entry: LogEntry) -> Result<MasterLogResult, MasterLogError> {
        let payload = entry.into_payload();
        if self.inner.config.echo {
            println!("{}", payload.body);
        }
        if self.inner.config.async_mode {
            self.enqueue(payload)
        } else {
            self.send(payload)
        }
    }
    /// Log at `Trace` severity.
    pub fn trace(&self, body: impl Into<String>) -> MasterLogResult {
        self.log_entry(LogEntry::new(body).severity(Severity::Trace))
    }
    /// Log at `Debug` severity.
    pub fn debug(&self, body: impl Into<String>) -> MasterLogResult {
        self.log_entry(LogEntry::new(body).severity(Severity::Debug))
    }
    /// Log at `Info` severity.
    pub fn info(&self, body: impl Into<String>) -> MasterLogResult {
        self.log_entry(LogEntry::new(body).severity(Severity::Info))
    }
    /// Log at `Warn` severity.
    pub fn warn(&self, body: impl Into<String>) -> MasterLogResult {
        self.log_entry(LogEntry::new(body).severity(Severity::Warn))
    }
    /// Log at `Error` severity.
    pub fn error(&self, body: impl Into<String>) -> MasterLogResult {
        self.log_entry(LogEntry::new(body).severity(Severity::Error))
    }
    /// Log at `Fatal` severity.
    pub fn fatal(&self, body: impl Into<String>) -> MasterLogResult {
        self.log_entry(LogEntry::new(body).severity(Severity::Fatal))
    }
    /// Flush pending logs, waiting up to `timeout`; errors become a failed result.
    pub fn flush(&self, timeout: Duration) -> MasterLogResult {
        self.try_flush(timeout).unwrap_or_else(Into::into)
    }
    /// Ask the worker to send everything queued and wait for its acknowledgement.
    /// In sync mode there is never anything pending, so this is a no-op success.
    pub fn try_flush(&self, timeout: Duration) -> Result<MasterLogResult, MasterLogError> {
        if !self.inner.config.async_mode {
            return Ok(MasterLogResult {
                accepted: Some(0),
                ..MasterLogResult::ok()
            });
        }
        self.require_config()?;
        let sender = self.ensure_worker()?;
        let (ack_sender, ack_receiver) = mpsc::channel();
        send_command_with_timeout(&sender, WorkerMessage::Flush(ack_sender), timeout, false)?;
        let result = ack_receiver
            .recv_timeout(timeout)
            .map_err(|error| match error {
                mpsc::RecvTimeoutError::Timeout => MasterLogError::Timeout(
                    "timed out waiting for Master Log worker flush".to_string(),
                ),
                mpsc::RecvTimeoutError::Disconnected => MasterLogError::WorkerUnavailable,
            })?;
        Ok(result.into_result())
    }
    /// Shut the worker down, waiting up to `timeout`; errors become a failed result.
    pub fn shutdown(&self, timeout: Duration) -> MasterLogResult {
        self.try_shutdown(timeout).unwrap_or_else(Into::into)
    }
    /// Drain and stop the worker, then join its thread. A no-op success when
    /// running in sync mode or when no worker was ever started.
    pub fn try_shutdown(&self, timeout: Duration) -> Result<MasterLogResult, MasterLogError> {
        if !self.inner.config.async_mode {
            return Ok(MasterLogResult {
                accepted: Some(0),
                ..MasterLogResult::ok()
            });
        }
        // Clone the sender inside a short-lived lock so other callers are not
        // blocked while we wait for the acknowledgement below.
        let sender = {
            let sender_guard = self
                .inner
                .sender
                .lock()
                .map_err(|_| MasterLogError::WorkerUnavailable)?;
            sender_guard.clone()
        };
        let Some(sender) = sender else {
            return Ok(MasterLogResult {
                accepted: Some(0),
                ..MasterLogResult::ok()
            });
        };
        let (ack_sender, ack_receiver) = mpsc::channel();
        send_command_with_timeout(&sender, WorkerMessage::Shutdown(ack_sender), timeout, false)?;
        let result = ack_receiver
            .recv_timeout(timeout)
            .map_err(|error| match error {
                mpsc::RecvTimeoutError::Timeout => MasterLogError::Timeout(
                    "timed out waiting for Master Log worker shutdown".to_string(),
                ),
                mpsc::RecvTimeoutError::Disconnected => MasterLogError::WorkerUnavailable,
            })?;
        // Drop the stored sender and join the finished thread; lock failures
        // here are ignored since the worker has already acknowledged.
        if let Ok(mut sender_guard) = self.inner.sender.lock() {
            sender_guard.take();
        }
        if let Ok(mut handle_guard) = self.inner.handle.lock() {
            if let Some(handle) = handle_guard.take() {
                let _ = handle.join();
            }
        }
        Ok(result.into_result())
    }
    /// Queue a payload for the background worker, applying the configured
    /// full-queue policy and backpressure sleep.
    fn enqueue(&self, payload: LogPayload) -> Result<MasterLogResult, MasterLogError> {
        self.require_config()?;
        let sender = self.ensure_worker()?;
        let command = WorkerMessage::Log(payload);
        if self.inner.config.drop_when_full {
            // Fail fast: a full queue drops the log with `QueueFull`.
            sender.try_send(command).map_err(|error| match error {
                TrySendError::Full(_) => MasterLogError::QueueFull,
                TrySendError::Disconnected(_) => MasterLogError::WorkerUnavailable,
            })?;
        } else {
            // Wait up to `queue_timeout` for space (drop_when_full is false here).
            send_command_with_timeout(
                &sender,
                command,
                self.inner.config.queue_timeout,
                self.inner.config.drop_when_full,
            )?;
        }
        self.apply_enqueue_backpressure();
        Ok(MasterLogResult::queued())
    }
    /// Synchronous single-log send, used when async mode is disabled.
    /// NOTE(review): builds a fresh HTTP client per call; acceptable for the
    /// sync path but worth caching if it becomes hot.
    fn send(&self, payload: LogPayload) -> Result<MasterLogResult, MasterLogError> {
        self.require_config()?;
        let endpoint = self
            .inner
            .config
            .endpoint
            .as_deref()
            .ok_or(MasterLogError::MissingConfig(MASTER_LOG_ENDPOINT_ENV))?;
        let api_key = self
            .inner
            .config
            .api_key
            .as_deref()
            .ok_or(MasterLogError::MissingConfig(MASTER_LOG_API_KEY_ENV))?;
        let url = logs_url(endpoint)?;
        let http = build_http_client(self.inner.config.timeout)?;
        post_json(&http, &url, api_key, &payload)
    }
    /// Return a sender to the worker, starting (or restarting) the worker
    /// thread if needed. Locks `sender` then `handle` — the same order as
    /// elsewhere, so no deadlock.
    fn ensure_worker(&self) -> Result<SyncSender<WorkerMessage>, MasterLogError> {
        self.require_config()?;
        let mut sender_guard = self
            .inner
            .sender
            .lock()
            .map_err(|_| MasterLogError::WorkerUnavailable)?;
        let mut handle_guard = self
            .inner
            .handle
            .lock()
            .map_err(|_| MasterLogError::WorkerUnavailable)?;
        // A finished worker (e.g. it panicked or exited) is reaped so a fresh
        // one can be spawned below.
        if handle_guard
            .as_ref()
            .is_some_and(|handle| handle.is_finished())
        {
            sender_guard.take();
            if let Some(handle) = handle_guard.take() {
                let _ = handle.join();
            }
        }
        if let Some(sender) = sender_guard.as_ref() {
            return Ok(sender.clone());
        }
        let (sender, receiver) = mpsc::sync_channel(self.inner.config.max_queue_size);
        let config = WorkerConfig::from_client_config(&self.inner.config)?;
        let send_seconds_per_log = Arc::clone(&self.inner.send_seconds_per_log);
        let handle = thread::Builder::new()
            .name("master-log-client".to_string())
            .spawn(move || batch_worker(config, receiver, send_seconds_per_log))
            .map_err(|error| {
                MasterLogError::Http(format!("failed to start Master Log worker: {error}"))
            })?;
        *sender_guard = Some(sender.clone());
        *handle_guard = Some(handle);
        Ok(sender)
    }
    /// Fail with `MissingConfig` unless both API key and endpoint are non-empty.
    fn require_config(&self) -> Result<(), MasterLogError> {
        if self
            .inner
            .config
            .api_key
            .as_deref()
            .unwrap_or("")
            .is_empty()
        {
            return Err(MasterLogError::MissingConfig(MASTER_LOG_API_KEY_ENV));
        }
        if self
            .inner
            .config
            .endpoint
            .as_deref()
            .unwrap_or("")
            .is_empty()
        {
            return Err(MasterLogError::MissingConfig(MASTER_LOG_ENDPOINT_ENV));
        }
        Ok(())
    }
    /// Sleep the producing thread for roughly one log's worth of send time
    /// (capped by `max_enqueue_sleep`) so producers cannot outrun the worker.
    fn apply_enqueue_backpressure(&self) {
        if !self.inner.config.backpressure {
            return;
        }
        let sleep_seconds = self
            .inner
            .send_seconds_per_log
            .lock()
            .map(|value| *value)
            .unwrap_or(self.inner.config.initial_send_seconds_per_log)
            .max(0.0);
        let sleep = duration_from_seconds(sleep_seconds).min(self.inner.config.max_enqueue_sleep);
        if !sleep.is_zero() {
            thread::sleep(sleep);
        }
    }
}
/// Lazily created process-wide default client; `Option` so `configure` can swap it.
static DEFAULT_CLIENT: OnceLock<Mutex<Option<MasterLogClient>>> = OnceLock::new();
/// Return the process-wide default client, creating it from the environment on
/// first use. Panics only if the slot's mutex is poisoned.
pub fn get_default_client() -> MasterLogClient {
    let slot = DEFAULT_CLIENT.get_or_init(|| Mutex::new(None));
    slot.lock()
        .expect("default Master Log client lock poisoned")
        .get_or_insert_with(MasterLogClient::from_env)
        .clone()
}
/// Install `config` as the process-wide default client; alias for
/// [`configure_default_client`].
pub fn configure(config: MasterLogConfig) -> MasterLogResult {
    configure_default_client(config)
}
/// Replace the process-wide default client with one built from `config`.
/// Any previous default is given one second to shut down gracefully first.
pub fn configure_default_client(config: MasterLogConfig) -> MasterLogResult {
    let slot = DEFAULT_CLIENT.get_or_init(|| Mutex::new(None));
    let mut current = slot
        .lock()
        .expect("default Master Log client lock poisoned");
    if let Some(previous) = current.take() {
        let _ = previous.shutdown(duration_from_seconds(1.0));
    }
    current.replace(MasterLogClient::new(config));
    MasterLogResult::ok()
}
/// Log a body via the default client at `Info` severity.
pub fn log(body: impl Into<String>) -> MasterLogResult {
    get_default_client().log(body)
}
/// Short alias for [`log`].
pub fn mlog(body: impl Into<String>) -> MasterLogResult {
    log(body)
}
/// Log a fully built entry via the default client.
pub fn log_entry(entry: LogEntry) -> MasterLogResult {
    get_default_client().log_entry(entry)
}
/// Log via the default client at `Trace` severity.
pub fn trace(body: impl Into<String>) -> MasterLogResult {
    get_default_client().trace(body)
}
/// Log via the default client at `Debug` severity.
pub fn debug(body: impl Into<String>) -> MasterLogResult {
    get_default_client().debug(body)
}
/// Log via the default client at `Info` severity.
pub fn info(body: impl Into<String>) -> MasterLogResult {
    get_default_client().info(body)
}
/// Log via the default client at `Warn` severity.
pub fn warn(body: impl Into<String>) -> MasterLogResult {
    get_default_client().warn(body)
}
/// Log via the default client at `Error` severity.
pub fn error(body: impl Into<String>) -> MasterLogResult {
    get_default_client().error(body)
}
/// Log via the default client at `Fatal` severity.
pub fn fatal(body: impl Into<String>) -> MasterLogResult {
    get_default_client().fatal(body)
}
/// Flush the default client's pending logs, waiting up to `timeout`.
pub fn flush(timeout: Duration) -> MasterLogResult {
    get_default_client().flush(timeout)
}
/// Shut down the default client's worker, waiting up to `timeout`.
pub fn shutdown(timeout: Duration) -> MasterLogResult {
    get_default_client().shutdown(timeout)
}
/// `format!`-style logging via the default client at `Info` severity.
#[macro_export]
macro_rules! mlogf {
    ($($arg:tt)*) => {
        $crate::mlog(format!($($arg)*))
    };
}
/// `format!`-style logging at `Trace` severity.
#[macro_export]
macro_rules! tracef {
    ($($arg:tt)*) => {
        $crate::trace(format!($($arg)*))
    };
}
/// `format!`-style logging at `Debug` severity.
#[macro_export]
macro_rules! debugf {
    ($($arg:tt)*) => {
        $crate::debug(format!($($arg)*))
    };
}
/// `format!`-style logging at `Info` severity.
#[macro_export]
macro_rules! infof {
    ($($arg:tt)*) => {
        $crate::info(format!($($arg)*))
    };
}
/// `format!`-style logging at `Warn` severity.
#[macro_export]
macro_rules! warnf {
    ($($arg:tt)*) => {
        $crate::warn(format!($($arg)*))
    };
}
/// `format!`-style logging at `Error` severity.
#[macro_export]
macro_rules! errorf {
    ($($arg:tt)*) => {
        $crate::error(format!($($arg)*))
    };
}
/// `format!`-style logging at `Fatal` severity.
#[macro_export]
macro_rules! fatalf {
    ($($arg:tt)*) => {
        $crate::fatal(format!($($arg)*))
    };
}
/// Normalize an endpoint into the single-log URL (`…/api/v1/logs`),
/// tolerating trailing slashes and endpoints already pointing at the logs or
/// batch routes.
pub fn logs_url(endpoint: &str) -> Result<String, MasterLogError> {
    let base = endpoint.trim().trim_end_matches('/');
    if base.is_empty() {
        return Err(MasterLogError::EmptyEndpoint);
    }
    let url = if base.ends_with("/api/v1/logs/batch") {
        // Drop the trailing "/batch" to get back to the logs route.
        base.strip_suffix("/batch").unwrap_or(base).to_string()
    } else if base.ends_with("/api/v1/logs") {
        base.to_string()
    } else if base.ends_with("/api/v1") {
        format!("{base}/logs")
    } else {
        format!("{base}/api/v1/logs")
    };
    Ok(url)
}
/// Normalize an endpoint into the batch URL (`…/api/v1/logs/batch`),
/// tolerating trailing slashes and endpoints already pointing at the logs or
/// batch routes.
pub fn batch_logs_url(endpoint: &str) -> Result<String, MasterLogError> {
    let base = endpoint.trim().trim_end_matches('/');
    if base.is_empty() {
        return Err(MasterLogError::EmptyEndpoint);
    }
    let url = if base.ends_with("/api/v1/logs/batch") {
        base.to_string()
    } else if base.ends_with("/api/v1/logs") {
        format!("{base}/batch")
    } else if base.ends_with("/api/v1") {
        format!("{base}/logs/batch")
    } else {
        format!("{base}/api/v1/logs/batch")
    };
    Ok(url)
}
/// Wire representation of one log event, produced by `LogEntry::into_payload`.
#[derive(Clone, Debug, Serialize)]
struct LogPayload {
    severity: Severity,
    /// Deduped, comma-split tags.
    tags: Vec<String>,
    /// Derived from the body when no explicit title was given.
    title: String,
    body: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    created_at: Option<DateTime<Utc>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    expires_at: Option<DateTime<Utc>>,
    #[serde(skip_serializing_if = "Option::is_none")]
    ttl_seconds: Option<u64>,
    /// Always a JSON object after `merge_metadata` has run.
    metadata: Value,
}
/// The subset of configuration the worker thread needs, with credentials
/// resolved to owned strings so the thread is self-contained.
#[derive(Clone, Debug)]
struct WorkerConfig {
    api_key: String,
    endpoint: String,
    timeout: Duration,
    batch_size: usize,
    flush_interval: Duration,
    min_request_interval: Duration,
}
impl WorkerConfig {
    /// Extract worker settings from the client config, failing with
    /// `MissingConfig` if the API key or endpoint is absent. Batch size is
    /// clamped to at least 1.
    fn from_client_config(config: &MasterLogConfig) -> Result<Self, MasterLogError> {
        let api_key = config
            .api_key
            .clone()
            .ok_or(MasterLogError::MissingConfig(MASTER_LOG_API_KEY_ENV))?;
        let endpoint = config
            .endpoint
            .clone()
            .ok_or(MasterLogError::MissingConfig(MASTER_LOG_ENDPOINT_ENV))?;
        Ok(Self {
            api_key,
            endpoint,
            timeout: config.timeout,
            batch_size: config.batch_size.max(1),
            flush_interval: config.flush_interval,
            min_request_interval: config.min_request_interval,
        })
    }
}
/// Commands sent from client threads to the batching worker.
enum WorkerMessage {
    /// Queue one log for the next batch.
    Log(LogPayload),
    /// Send everything pending, then reply with the outcome.
    Flush(mpsc::Sender<WorkerResult>),
    /// Drain, reply with the outcome, and exit the worker loop.
    Shutdown(mpsc::Sender<WorkerResult>),
}
/// Outcome of the worker's most recent send, reported to flush/shutdown waiters.
#[derive(Clone, Debug)]
struct WorkerResult {
    ok: bool,
    /// Logs accepted by the server (cumulative across batches in one send).
    accepted: usize,
    status_code: Option<u16>,
    response: Option<Value>,
    error: Option<String>,
}
impl WorkerResult {
    /// Successful outcome covering `accepted` delivered logs.
    fn ok(accepted: usize, status_code: Option<u16>, response: Option<Value>) -> Self {
        Self {
            ok: true,
            accepted,
            status_code,
            response,
            error: None,
        }
    }
    /// Failed outcome; `accepted` counts logs delivered before the failure.
    fn failed(
        accepted: usize,
        status_code: Option<u16>,
        response: Option<Value>,
        error: impl Into<String>,
    ) -> Self {
        Self {
            ok: false,
            error: Some(error.into()),
            ..Self::ok(accepted, status_code, response)
        }
    }
    /// Convert into the public result type; worker results never carry an event id.
    fn into_result(self) -> MasterLogResult {
        let Self {
            ok,
            accepted,
            status_code,
            response,
            error,
        } = self;
        MasterLogResult {
            ok,
            status_code,
            event_id: None,
            error,
            response,
            queued: false,
            accepted: Some(accepted),
        }
    }
}
/// Background thread body: accumulates queued logs and ships them in batches.
///
/// A send is triggered when the batch reaches `batch_size`, when
/// `flush_interval` has elapsed since the oldest pending log, on an explicit
/// flush/shutdown request, or when every sender has disconnected.
fn batch_worker(
    config: WorkerConfig,
    receiver: Receiver<WorkerMessage>,
    send_seconds_per_log: Arc<Mutex<f64>>,
) {
    let http = match build_http_client(config.timeout) {
        Ok(client) => client,
        Err(error) => {
            // No HTTP client means nothing can ever be sent: answer any
            // queued control messages with the failure, then exit.
            fail_all_worker_messages(
                receiver,
                WorkerResult::failed(0, None, None, error.to_string()),
            );
            return;
        }
    };
    let mut pending = Vec::<LogPayload>::new();
    // Arrival time of the oldest pending log; drives the flush-interval deadline.
    let mut first_pending_at: Option<Instant> = None;
    let mut last_request_started_at: Option<Instant> = None;
    // Reported to flush/shutdown waiters; reset to ok(0) while idle.
    let mut last_result = WorkerResult::ok(0, None, None);
    loop {
        let timeout = worker_wait_timeout(first_pending_at, config.flush_interval);
        let mut flush_acks = Vec::new();
        let mut shutdown_ack = None;
        let mut should_shutdown = false;
        let mut due = false;
        match receive_worker_message(&receiver, timeout) {
            Ok(Some(message)) => handle_worker_message(
                message,
                &mut pending,
                &mut first_pending_at,
                &mut flush_acks,
                &mut shutdown_ack,
                &mut should_shutdown,
            ),
            Ok(None) => {
                // Timed out: the oldest pending log's flush interval elapsed.
                due = !pending.is_empty();
            }
            Err(()) => {
                // All senders dropped — drain whatever is left and exit.
                should_shutdown = true;
            }
        }
        // Opportunistically pull everything already queued into this batch.
        drain_worker_messages(
            &receiver,
            &mut pending,
            &mut first_pending_at,
            &mut flush_acks,
            &mut shutdown_ack,
            &mut should_shutdown,
        );
        let should_send = !pending.is_empty()
            && (should_shutdown
                || !flush_acks.is_empty()
                || shutdown_ack.is_some()
                || pending.len() >= config.batch_size
                || due);
        if should_send {
            last_result = send_pending(
                &config,
                &http,
                &mut pending,
                &send_seconds_per_log,
                &mut last_request_started_at,
            );
            // Logs that failed to send stay pending; restart their flush clock.
            first_pending_at = if pending.is_empty() {
                None
            } else {
                Some(Instant::now())
            };
        } else if pending.is_empty() {
            // Idle: a flush with nothing pending reports a clean ok(0).
            first_pending_at = None;
            last_result = WorkerResult::ok(0, None, None);
        }
        // Every flush waiter receives the outcome of the most recent send.
        for ack in flush_acks {
            let _ = ack.send(last_result.clone());
        }
        if let Some(ack) = shutdown_ack {
            let _ = ack.send(last_result.clone());
            break;
        }
        if should_shutdown {
            break;
        }
    }
}
/// Blocking receive with a deadline: `Ok(Some)` on a message, `Ok(None)` when
/// the timeout elapses, `Err(())` when every sender has disconnected.
fn receive_worker_message(
    receiver: &Receiver<WorkerMessage>,
    timeout: Duration,
) -> Result<Option<WorkerMessage>, ()> {
    receiver
        .recv_timeout(timeout)
        .map(Some)
        .or_else(|error| match error {
            RecvTimeoutError::Timeout => Ok(None),
            RecvTimeoutError::Disconnected => Err(()),
        })
}
/// Non-blocking drain of every message already sitting in the channel,
/// dispatching each through `handle_worker_message`. Disconnection of all
/// senders marks the worker for shutdown.
fn drain_worker_messages(
    receiver: &Receiver<WorkerMessage>,
    pending: &mut Vec<LogPayload>,
    first_pending_at: &mut Option<Instant>,
    flush_acks: &mut Vec<mpsc::Sender<WorkerResult>>,
    shutdown_ack: &mut Option<mpsc::Sender<WorkerResult>>,
    should_shutdown: &mut bool,
) {
    loop {
        let message = match receiver.try_recv() {
            Ok(message) => message,
            Err(TryRecvError::Empty) => return,
            Err(TryRecvError::Disconnected) => {
                *should_shutdown = true;
                return;
            }
        };
        handle_worker_message(
            message,
            pending,
            first_pending_at,
            flush_acks,
            shutdown_ack,
            should_shutdown,
        );
    }
}
/// Apply one worker message to the loop's mutable state.
fn handle_worker_message(
    message: WorkerMessage,
    pending: &mut Vec<LogPayload>,
    first_pending_at: &mut Option<Instant>,
    flush_acks: &mut Vec<mpsc::Sender<WorkerResult>>,
    shutdown_ack: &mut Option<mpsc::Sender<WorkerResult>>,
    should_shutdown: &mut bool,
) {
    match message {
        WorkerMessage::Flush(ack) => flush_acks.push(ack),
        WorkerMessage::Shutdown(ack) => {
            *should_shutdown = true;
            *shutdown_ack = Some(ack);
        }
        WorkerMessage::Log(payload) => {
            // Stamp the arrival of the first queued log so the flush interval
            // is measured from the oldest pending entry.
            if pending.is_empty() {
                *first_pending_at = Some(Instant::now());
            }
            pending.push(payload);
        }
    }
}
/// Drain `pending` in batches of at most `config.batch_size`, updating the
/// shared send-rate estimate after each successful request.
///
/// Stops at the first failed batch, leaving the unsent logs in `pending`, and
/// returns the cumulative accepted count plus the last status/response seen.
fn send_pending(
    config: &WorkerConfig,
    http: &HttpClient,
    pending: &mut Vec<LogPayload>,
    send_seconds_per_log: &Arc<Mutex<f64>>,
    last_request_started_at: &mut Option<Instant>,
) -> WorkerResult {
    let mut accepted_total = 0usize;
    let mut status_code = None;
    let mut response = None;
    while !pending.is_empty() {
        // `max(1)` is defensive; `pending` is non-empty so `min` is already >= 1.
        let batch_len = config.batch_size.min(pending.len()).max(1);
        // Honor the configured minimum spacing between requests.
        wait_for_request_interval(config.min_request_interval, *last_request_started_at);
        let started_at = Instant::now();
        *last_request_started_at = Some(started_at);
        let result = send_batch(config, http, &pending[..batch_len]);
        let elapsed = started_at.elapsed();
        status_code = result.status_code;
        response = result.response.clone();
        if !result.ok {
            // Failed logs stay queued; report what made it out before this batch.
            return WorkerResult::failed(
                accepted_total,
                status_code,
                response,
                result
                    .error
                    .unwrap_or_else(|| "Master Log batch send failed".to_string()),
            );
        }
        let accepted = result.accepted;
        update_send_seconds_per_log(send_seconds_per_log, elapsed, accepted);
        accepted_total += accepted;
        // Only discard logs after the server acknowledged the batch.
        pending.drain(..batch_len);
    }
    WorkerResult::ok(accepted_total, status_code, response)
}
/// Request body for the batch endpoint: `{"logs": [...]}`.
#[derive(Serialize)]
struct BatchRequest<'a> {
    logs: &'a [LogPayload],
}
/// POST one batch of payloads; when the server omits an `accepted` count the
/// whole batch is assumed accepted.
fn send_batch(config: &WorkerConfig, http: &HttpClient, payloads: &[LogPayload]) -> WorkerResult {
    let url = match batch_logs_url(&config.endpoint) {
        Ok(url) => url,
        Err(error) => return WorkerResult::failed(0, None, None, error.to_string()),
    };
    let request = BatchRequest { logs: payloads };
    match post_json(http, &url, &config.api_key, &request) {
        Ok(outcome) => {
            let accepted = outcome.accepted.unwrap_or(payloads.len());
            WorkerResult::ok(accepted, outcome.status_code, outcome.response)
        }
        Err(error) => WorkerResult::failed(0, None, None, error.to_string()),
    }
}
/// Answer every queued flush/shutdown request with `result` (logs are simply
/// discarded), giving late messages a short 50ms window to arrive.
fn fail_all_worker_messages(receiver: Receiver<WorkerMessage>, result: WorkerResult) {
    let idle_window = duration_from_seconds(0.05);
    while let Ok(message) = receiver.recv_timeout(idle_window) {
        if let WorkerMessage::Flush(ack) | WorkerMessage::Shutdown(ack) = message {
            let _ = ack.send(result.clone());
        }
    }
}
/// How long the worker may block waiting for a message: the full flush
/// interval when nothing is pending, otherwise whatever remains of the oldest
/// pending log's interval (zero when already overdue).
fn worker_wait_timeout(first_pending_at: Option<Instant>, flush_interval: Duration) -> Duration {
    match first_pending_at {
        None => flush_interval,
        Some(started) => flush_interval.saturating_sub(started.elapsed()),
    }
}
/// Sleep just long enough to honor the minimum spacing between requests.
fn wait_for_request_interval(
    min_request_interval: Duration,
    last_request_started_at: Option<Instant>,
) {
    let pause = request_interval_sleep(
        min_request_interval,
        last_request_started_at,
        Instant::now(),
    );
    if pause > Duration::ZERO {
        thread::sleep(pause);
    }
}
/// Remaining time until `min_request_interval` has elapsed since the previous
/// request started; zero when the interval is disabled, no request has been
/// made yet, or the interval has already passed.
fn request_interval_sleep(
    min_request_interval: Duration,
    last_request_started_at: Option<Instant>,
    now: Instant,
) -> Duration {
    if min_request_interval.is_zero() {
        return Duration::ZERO;
    }
    last_request_started_at
        .map(|last| min_request_interval.saturating_sub(now.saturating_duration_since(last)))
        .unwrap_or(Duration::ZERO)
}
/// Fold one batch's measured cost into the shared per-log send-time estimate
/// using an exponential moving average. A non-positive current estimate is
/// replaced outright by the new sample; zero accepted logs (or a poisoned
/// lock) leaves the estimate untouched.
fn update_send_seconds_per_log(
    send_seconds_per_log: &Arc<Mutex<f64>>,
    elapsed: Duration,
    accepted_count: usize,
) {
    if accepted_count == 0 {
        return;
    }
    let sample = elapsed.as_secs_f64().max(0.0) / accepted_count as f64;
    let Ok(mut current) = send_seconds_per_log.lock() else {
        return;
    };
    *current = if *current <= 0.0 {
        sample
    } else {
        *current * (1.0 - DEFAULT_SEND_RATE_SMOOTHING) + sample * DEFAULT_SEND_RATE_SMOOTHING
    };
}
/// Push `command` onto the bounded worker queue, polling with short sleeps
/// until `timeout` elapses. With `drop_when_full` set, a full queue fails
/// immediately with `QueueFull`.
///
/// `try_send` hands the message back inside `TrySendError::Full`, so the
/// retry loop re-sends the exact same command without cloning it.
fn send_command_with_timeout(
    sender: &SyncSender<WorkerMessage>,
    mut command: WorkerMessage,
    timeout: Duration,
    drop_when_full: bool,
) -> Result<(), MasterLogError> {
    let deadline = Instant::now() + timeout;
    loop {
        match sender.try_send(command) {
            Ok(()) => return Ok(()),
            Err(TrySendError::Disconnected(_)) => return Err(MasterLogError::WorkerUnavailable),
            Err(TrySendError::Full(returned)) => {
                if drop_when_full || Instant::now() >= deadline {
                    return Err(MasterLogError::QueueFull);
                }
                command = returned;
                // 5ms poll, clamped so a tiny timeout is never overslept.
                thread::sleep(duration_from_seconds(0.005).min(timeout));
            }
        }
    }
}
fn build_http_client(timeout: Duration) -> Result<HttpClient, MasterLogError> {
HttpClient::builder()
.timeout(timeout)
.build()
.map_err(|error| MasterLogError::Http(error.to_string()))
}
/// POST `payload` as JSON with bearer auth and parse the response.
///
/// Non-2xx responses become `MasterLogError::Http`. An empty body parses as
/// `{}`; a non-JSON body is wrapped as `{"error": <raw text>}`. On success the
/// first event id and the `accepted` count are lifted out of the response JSON
/// when present.
fn post_json<T: Serialize + ?Sized>(
    http: &HttpClient,
    url: &str,
    api_key: &str,
    payload: &T,
) -> Result<MasterLogResult, MasterLogError> {
    let response = http
        .post(url)
        .bearer_auth(api_key)
        .header("Accept", "application/json")
        .header(
            "User-Agent",
            format!("master-log-client-rust/{LIBRARY_VERSION}"),
        )
        .json(payload)
        .send()
        .map_err(|error| MasterLogError::Http(error.to_string()))?;
    // Capture status before `text()` consumes the response.
    let status_code = response.status().as_u16();
    let success = response.status().is_success();
    let body = response
        .text()
        .map_err(|error| MasterLogError::Http(error.to_string()))?;
    let parsed = if body.trim().is_empty() {
        Value::Object(Map::new())
    } else {
        serde_json::from_str::<Value>(&body).unwrap_or_else(|_| json!({ "error": body }))
    };
    if !success {
        return Err(MasterLogError::Http(format!(
            "Master Log returned HTTP {status_code}: {parsed}"
        )));
    }
    // Single-log responses carry `events[0].id`; batch responses carry `accepted`.
    let event_id = parsed
        .get("events")
        .and_then(Value::as_array)
        .and_then(|events| events.first())
        .and_then(|event| event.get("id"))
        .and_then(Value::as_str)
        .map(str::to_string);
    let accepted = parsed
        .get("accepted")
        .and_then(Value::as_u64)
        .and_then(|value| usize::try_from(value).ok());
    Ok(MasterLogResult {
        ok: true,
        status_code: Some(status_code),
        event_id,
        error: None,
        response: Some(parsed),
        queued: false,
        accepted,
    })
}
/// Choose an event title: an explicit non-blank `title` wins; otherwise the
/// body's first line is used, falling back to `"Rust log event"` for blank
/// bodies. The derived title is capped at 120 characters.
fn default_title(title: Option<&str>, body: &str) -> String {
    // Prefer an explicitly supplied, non-blank title (trimmed).
    if let Some(explicit) = title {
        let explicit = explicit.trim();
        if !explicit.is_empty() {
            return explicit.to_string();
        }
    }
    // Derive from the body's first line, truncated to 120 characters.
    body.trim()
        .lines()
        .next()
        .filter(|line| !line.is_empty())
        .unwrap_or("Rust log event")
        .chars()
        .take(120)
        .collect()
}
/// Normalize user-supplied tags: split comma-joined entries, trim whitespace,
/// drop blanks, and return a deduplicated list in sorted order.
fn normalize_tags(tags: Vec<String>) -> Vec<String> {
    // A BTreeSet both deduplicates and yields a stable, sorted ordering.
    let mut unique = BTreeSet::new();
    for raw in tags {
        for piece in raw.split(',') {
            let piece = piece.trim();
            if !piece.is_empty() {
                unique.insert(piece.to_string());
            }
        }
    }
    unique.into_iter().collect()
}
/// Ensure metadata is a JSON object and attach the client-info block.
///
/// Non-object metadata is wrapped under a `"value"` key. Client info goes
/// under `"rust_client"`, or `"_rust_client"` when the caller already used
/// that key, so caller data is never clobbered.
fn merge_metadata(metadata: Value) -> Value {
    let mut object = if let Value::Object(object) = metadata {
        object
    } else {
        let mut wrapped = Map::new();
        wrapped.insert("value".to_string(), metadata);
        wrapped
    };
    let key = if object.contains_key("rust_client") {
        "_rust_client"
    } else {
        "rust_client"
    };
    object.insert(key.to_string(), rust_client_metadata());
    Value::Object(object)
}
/// Build the JSON block describing this client process: host, pid, process
/// name, argv[0], working directory, and library identity/version.
/// Unavailable fields degrade to `"unknown"` or empty strings.
fn rust_client_metadata() -> Value {
    let argv0 = env::args().next().unwrap_or_default();
    // Host name defaults to "unknown" when lookup fails or is not valid UTF-8.
    let host = match hostname::get() {
        Ok(value) => value
            .into_string()
            .unwrap_or_else(|_| "unknown".to_string()),
        Err(_) => "unknown".to_string(),
    };
    // Working directory degrades to an empty string when unreadable.
    let cwd = env::current_dir()
        .map(|path| path.display().to_string())
        .unwrap_or_default();
    json!({
        "hostname": host,
        "pid": std::process::id(),
        "process_name": process_name(&argv0),
        "argv0": argv0,
        "cwd": cwd,
        "library": "master-log-client",
        "library_version": LIBRARY_VERSION,
    })
}
/// Derive a short process name: the basename of `argv0` when non-blank,
/// otherwise the current executable's basename, with `"rust"` as the final
/// fallback.
fn process_name(argv0: &str) -> String {
    if !argv0.trim().is_empty() {
        let basename = PathBuf::from(argv0)
            .file_name()
            .and_then(|value| value.to_str())
            .map(String::from);
        return basename.unwrap_or_else(|| "rust".to_string());
    }
    // argv[0] was blank: ask the OS for the executable path instead.
    env::current_exe()
        .ok()
        .and_then(|path| path.file_name().map(|name| name.to_owned()))
        .and_then(|name| name.into_string().ok())
        .unwrap_or_else(|| "rust".to_string())
}
/// Read an environment variable, treating unset and whitespace-only values
/// the same way: `None`. Returns the trimmed value otherwise.
fn env_non_empty(name: &str) -> Option<String> {
    let value = env::var(name).ok()?;
    let trimmed = value.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
/// Parse a boolean from an environment variable, case-insensitively.
/// Accepts 1/true/yes/on and 0/false/no/off; anything else (including an
/// unset variable) yields `None`.
fn env_bool(name: &str) -> Option<bool> {
    let normalized = env::var(name).ok()?.trim().to_ascii_lowercase();
    if ["1", "true", "yes", "on"].contains(&normalized.as_str()) {
        Some(true)
    } else if ["0", "false", "no", "off"].contains(&normalized.as_str()) {
        Some(false)
    } else {
        None
    }
}
/// Parse an `f64` from an environment variable; unset or unparsable
/// values both collapse to `None`.
fn env_f64(name: &str) -> Option<f64> {
    env::var(name)
        .ok()
        .and_then(|value| value.trim().parse::<f64>().ok())
}
/// Parse a `usize` from an environment variable; unset or unparsable
/// (including negative or fractional) values collapse to `None`.
fn env_usize(name: &str) -> Option<usize> {
    env::var(name)
        .ok()
        .and_then(|value| value.trim().parse::<usize>().ok())
}
fn env_duration(name: &str) -> Option<Duration> {
env_f64(name).map(duration_from_seconds)
}
/// Resolve an optional configured value, treating `0` (and absence) as
/// "not configured" and substituting `default`.
fn positive_usize(value: Option<usize>, default: usize) -> usize {
    match value {
        Some(value) if value > 0 => value,
        _ => default,
    }
}
/// Resolve an optional configured value; any explicit value (including `0`,
/// since `usize` is inherently non-negative) wins over `default`.
fn non_negative_usize(value: Option<usize>, default: usize) -> usize {
    match value {
        Some(value) => value,
        None => default,
    }
}
/// Resolve an optional configured duration, treating zero (and absence) as
/// "not configured" and substituting `default`.
fn positive_duration(value: Option<Duration>, default: Duration) -> Duration {
    match value {
        Some(value) if !value.is_zero() => value,
        _ => default,
    }
}
/// Resolve an optional configured duration; any explicit value (including
/// zero — `Duration` cannot be negative) wins over `default`.
fn non_negative_duration(value: Option<Duration>, default: Duration) -> Duration {
    match value {
        Some(value) => value,
        None => default,
    }
}
/// Resolve an optional configured float, rejecting negative values.
/// NaN also fails the `>= 0.0` comparison, so it falls back to `default`.
fn non_negative_f64(value: Option<f64>, default: f64) -> f64 {
    match value {
        Some(value) if value >= 0.0 => value,
        _ => default,
    }
}
/// Convert fractional seconds to a `Duration`, saturating instead of
/// panicking on out-of-range input.
///
/// Negative and NaN inputs clamp to zero (`f64::max(NaN, 0.0) == 0.0`).
/// `Duration::from_secs_f64` panics on values that do not fit in a
/// `Duration` — e.g. `f64::INFINITY`, which an env var set to `"inf"`
/// parses to via `env_f64` — so overflow saturates to `Duration::MAX`
/// rather than aborting the client.
fn duration_from_seconds(seconds: f64) -> Duration {
    // After the clamp only overflow/infinity can make the conversion fail.
    Duration::try_from_secs_f64(seconds.max(0.0)).unwrap_or(Duration::MAX)
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Every accepted endpoint spelling normalizes to the single-log URL.
    #[test]
    fn logs_url_accepts_root_api_and_logs_endpoint() {
        for base in [
            "http://localhost:8000",
            "http://localhost:8000/api/v1",
            "http://localhost:8000/api/v1/logs",
            "http://localhost:8000/api/v1/logs/batch",
        ] {
            assert_eq!(
                logs_url(base).unwrap(),
                "http://localhost:8000/api/v1/logs"
            );
        }
    }

    /// Every accepted endpoint spelling normalizes to the batch URL.
    #[test]
    fn batch_logs_url_accepts_root_api_logs_and_batch_endpoint() {
        for base in [
            "http://localhost:8000",
            "http://localhost:8000/api/v1",
            "http://localhost:8000/api/v1/logs",
            "http://localhost:8000/api/v1/logs/batch",
        ] {
            assert_eq!(
                batch_logs_url(base).unwrap(),
                "http://localhost:8000/api/v1/logs/batch"
            );
        }
    }

    /// Building a payload defaults the title from the body, splits/dedupes/
    /// sorts tags, preserves caller metadata, and attaches the client block.
    #[test]
    fn log_payload_is_enriched_and_normalizes_tags() {
        let payload = LogEntry::new("hello M31")
            .severity(Severity::Warn)
            .tags(["ccd, telescope", "ccd"])
            .metadata(json!({ "frame": 42 }))
            .into_payload();
        assert_eq!(payload.severity, Severity::Warn);
        assert_eq!(payload.body, "hello M31");
        assert_eq!(payload.title, "hello M31");
        assert_eq!(payload.tags, vec!["ccd", "telescope"]);
        assert_eq!(payload.metadata["frame"], 42);
        assert!(payload.metadata.get("rust_client").is_some());
    }

    /// The pacing sleep is measured from the *start* of the last request.
    #[test]
    fn request_interval_sleep_uses_last_request_start() {
        let now = Instant::now();
        let interval = Duration::from_millis(250);
        // No previous request recorded: nothing to wait for.
        assert_eq!(request_interval_sleep(interval, None, now), Duration::ZERO);
        // Pacing disabled (zero interval): never sleep.
        assert_eq!(
            request_interval_sleep(Duration::ZERO, Some(now), now),
            Duration::ZERO
        );
        // 100ms into a 250ms interval: the remaining 150ms is owed.
        assert_eq!(
            request_interval_sleep(interval, Some(now - Duration::from_millis(100)), now),
            Duration::from_millis(150)
        );
        // Interval already fully elapsed: no sleep needed.
        assert_eq!(
            request_interval_sleep(interval, Some(now - Duration::from_millis(1000)), now),
            Duration::ZERO
        );
    }
}