Skip to main content

master_log_client/
lib.rs

1use std::{
2    collections::BTreeSet,
3    env,
4    error::Error,
5    fmt,
6    path::PathBuf,
7    str::FromStr,
8    sync::{
9        mpsc::{self, Receiver, RecvTimeoutError, SyncSender, TryRecvError, TrySendError},
10        Arc, Mutex, OnceLock,
11    },
12    thread,
13    time::{Duration, Instant},
14};
15
16use chrono::{DateTime, Utc};
17use reqwest::blocking::Client as HttpClient;
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Map, Value};
20
// Names of the environment variables read by `MasterLogConfig::from_env`.

/// Environment variable supplying `MasterLogConfig::api_key`.
21pub const MASTER_LOG_API_KEY_ENV: &str = "MASTER_LOG_API_KEY";
/// Environment variable supplying `MasterLogConfig::endpoint`.
22pub const MASTER_LOG_ENDPOINT_ENV: &str = "MASTER_LOG_ENDPOINT";
/// Environment variable supplying `MasterLogConfig::timeout` (seconds, per the variable name).
23pub const MASTER_LOG_TIMEOUT_ENV: &str = "MASTER_LOG_TIMEOUT_SECONDS";
/// Environment variable supplying `MasterLogConfig::echo`.
24pub const MASTER_LOG_ECHO_ENV: &str = "MASTER_LOG_ECHO";
/// Environment variable supplying `MasterLogConfig::async_mode`.
25pub const MASTER_LOG_ASYNC_ENV: &str = "MASTER_LOG_ASYNC";
/// Environment variable supplying `MasterLogConfig::batch_size`.
26pub const MASTER_LOG_BATCH_SIZE_ENV: &str = "MASTER_LOG_BATCH_SIZE";
/// Environment variable supplying `MasterLogConfig::flush_interval` (seconds, per the name).
27pub const MASTER_LOG_FLUSH_INTERVAL_ENV: &str = "MASTER_LOG_FLUSH_INTERVAL_SECONDS";
/// Environment variable supplying `MasterLogConfig::max_queue_size`.
28pub const MASTER_LOG_MAX_QUEUE_SIZE_ENV: &str = "MASTER_LOG_MAX_QUEUE_SIZE";
/// Environment variable supplying `MasterLogConfig::drop_when_full`.
29pub const MASTER_LOG_DROP_WHEN_FULL_ENV: &str = "MASTER_LOG_DROP_WHEN_FULL";
/// Environment variable supplying `MasterLogConfig::queue_timeout` (seconds, per the name).
30pub const MASTER_LOG_QUEUE_TIMEOUT_ENV: &str = "MASTER_LOG_QUEUE_TIMEOUT_SECONDS";
/// Environment variable supplying `MasterLogConfig::backpressure`.
31pub const MASTER_LOG_BACKPRESSURE_ENV: &str = "MASTER_LOG_BACKPRESSURE";
/// Environment variable supplying `MasterLogConfig::initial_send_seconds_per_log`.
32pub const MASTER_LOG_INITIAL_SEND_SECONDS_PER_LOG_ENV: &str =
33    "MASTER_LOG_INITIAL_SEND_SECONDS_PER_LOG";
/// Environment variable supplying `MasterLogConfig::max_enqueue_sleep` (seconds, per the name).
34pub const MASTER_LOG_MAX_ENQUEUE_SLEEP_ENV: &str = "MASTER_LOG_MAX_ENQUEUE_SLEEP_SECONDS";
/// Environment variable supplying `MasterLogConfig::min_request_interval` (seconds, per the name).
35pub const MASTER_LOG_MIN_REQUEST_INTERVAL_ENV: &str = "MASTER_LOG_MIN_REQUEST_INTERVAL_SECONDS";
36
// Built-in values used by `MasterLogConfig::default` unless noted otherwise.

/// Default for `MasterLogConfig::timeout`, in seconds.
37pub const DEFAULT_TIMEOUT_SECONDS: f64 = 2.0;
/// Default for `MasterLogConfig::async_mode`.
38pub const DEFAULT_ASYNC_MODE: bool = true;
/// Default for `MasterLogConfig::batch_size`.
39pub const DEFAULT_BATCH_SIZE: usize = 100;
/// Default for `MasterLogConfig::flush_interval`, in seconds; also the builder's fallback when
/// a zero interval is supplied.
40pub const DEFAULT_FLUSH_INTERVAL_SECONDS: f64 = 1.0;
/// Default for `MasterLogConfig::max_queue_size`.
41pub const DEFAULT_MAX_QUEUE_SIZE: usize = 10_000;
/// Default for `MasterLogConfig::drop_when_full`.
42pub const DEFAULT_DROP_WHEN_FULL: bool = true;
/// Default for `MasterLogConfig::queue_timeout`, in seconds.
43pub const DEFAULT_QUEUE_TIMEOUT_SECONDS: f64 = 0.25;
/// Default for `MasterLogConfig::backpressure`.
44pub const DEFAULT_BACKPRESSURE: bool = true;
/// Default for `MasterLogConfig::initial_send_seconds_per_log`.
45pub const DEFAULT_INITIAL_SEND_SECONDS_PER_LOG: f64 = 0.005;
/// Default for `MasterLogConfig::max_enqueue_sleep`, in seconds.
46pub const DEFAULT_MAX_ENQUEUE_SLEEP_SECONDS: f64 = 0.25;
/// Default for `MasterLogConfig::min_request_interval`, in seconds.
47pub const DEFAULT_MIN_REQUEST_INTERVAL_SECONDS: f64 = 0.0;
/// NOTE(review): not referenced in this part of the file; presumably the smoothing factor
/// applied to the measured per-log send time — confirm against the worker code.
48pub const DEFAULT_SEND_RATE_SMOOTHING: f64 = 0.2;
/// Seconds `ClientInner`'s `Drop` waits for the shutdown command to be delivered and, again,
/// for its acknowledgement.
49pub const DEFAULT_SHUTDOWN_TIMEOUT_SECONDS: f64 = 5.0;
/// Version of this crate, captured from `CARGO_PKG_VERSION` at compile time.
50pub const LIBRARY_VERSION: &str = env!("CARGO_PKG_VERSION");
51
/// Settings for a `MasterLogClient`.
///
/// Build one with `MasterLogConfig::default()` or `MasterLogConfig::from_env()` and refine it
/// through the builder-style methods of the same names as the fields.
52#[derive(Clone, Debug)]
53pub struct MasterLogConfig {
    /// API key; must be present and non-empty before anything is sent or queued.
54    pub api_key: Option<String>,
    /// Service endpoint; must be present and non-empty. It is expanded by `logs_url` for
    /// synchronous sends.
55    pub endpoint: Option<String>,
    /// Timeout handed to `build_http_client`.
56    pub timeout: Duration,
    /// When true, each log body is also printed to stdout.
57    pub echo: bool,
    /// When true, logs are queued for the background worker; otherwise each log is sent
    /// synchronously by the caller.
58    pub async_mode: bool,
    /// Batch size handed to the worker (clamped to at least 1).
59    pub batch_size: usize,
    /// Flush interval handed to the worker.
60    pub flush_interval: Duration,
    /// Capacity of the bounded channel between callers and the worker.
61    pub max_queue_size: usize,
    /// When true, a full queue rejects the log immediately with `QueueFull`; otherwise the
    /// caller goes through `send_command_with_timeout` with `queue_timeout`.
62    pub drop_when_full: bool,
    /// Timeout passed to `send_command_with_timeout` when `drop_when_full` is false.
63    pub queue_timeout: Duration,
    /// When true, the caller sleeps briefly after each successful enqueue.
64    pub backpressure: bool,
    /// Initial per-log send-time estimate (seconds) that sizes the backpressure sleep.
65    pub initial_send_seconds_per_log: f64,
    /// Upper bound on a single backpressure sleep.
66    pub max_enqueue_sleep: Duration,
    /// Minimum request interval handed to the worker.
67    pub min_request_interval: Duration,
68}
69
/// Short alias for [`MasterLogConfig`].
70pub type Config = MasterLogConfig;
71
72impl Default for MasterLogConfig {
73    fn default() -> Self {
74        Self {
75            api_key: None,
76            endpoint: None,
77            timeout: duration_from_seconds(DEFAULT_TIMEOUT_SECONDS),
78            echo: false,
79            async_mode: DEFAULT_ASYNC_MODE,
80            batch_size: DEFAULT_BATCH_SIZE,
81            flush_interval: duration_from_seconds(DEFAULT_FLUSH_INTERVAL_SECONDS),
82            max_queue_size: DEFAULT_MAX_QUEUE_SIZE,
83            drop_when_full: DEFAULT_DROP_WHEN_FULL,
84            queue_timeout: duration_from_seconds(DEFAULT_QUEUE_TIMEOUT_SECONDS),
85            backpressure: DEFAULT_BACKPRESSURE,
86            initial_send_seconds_per_log: DEFAULT_INITIAL_SEND_SECONDS_PER_LOG,
87            max_enqueue_sleep: duration_from_seconds(DEFAULT_MAX_ENQUEUE_SLEEP_SECONDS),
88            min_request_interval: duration_from_seconds(DEFAULT_MIN_REQUEST_INTERVAL_SECONDS),
89        }
90    }
91}
92
93impl MasterLogConfig {
94    pub fn from_env() -> Self {
95        let mut config = Self::default();
96        config.api_key = env_non_empty(MASTER_LOG_API_KEY_ENV);
97        config.endpoint = env_non_empty(MASTER_LOG_ENDPOINT_ENV);
98        config.timeout = env_duration(MASTER_LOG_TIMEOUT_ENV).unwrap_or(config.timeout);
99        config.echo = env_bool(MASTER_LOG_ECHO_ENV).unwrap_or(config.echo);
100        config.async_mode = env_bool(MASTER_LOG_ASYNC_ENV).unwrap_or(config.async_mode);
101        config.batch_size = positive_usize(env_usize(MASTER_LOG_BATCH_SIZE_ENV), config.batch_size);
102        config.flush_interval = positive_duration(
103            env_duration(MASTER_LOG_FLUSH_INTERVAL_ENV),
104            config.flush_interval,
105        );
106        config.max_queue_size = non_negative_usize(
107            env_usize(MASTER_LOG_MAX_QUEUE_SIZE_ENV),
108            config.max_queue_size,
109        );
110        config.drop_when_full =
111            env_bool(MASTER_LOG_DROP_WHEN_FULL_ENV).unwrap_or(config.drop_when_full);
112        config.queue_timeout = non_negative_duration(
113            env_duration(MASTER_LOG_QUEUE_TIMEOUT_ENV),
114            config.queue_timeout,
115        );
116        config.backpressure = env_bool(MASTER_LOG_BACKPRESSURE_ENV).unwrap_or(config.backpressure);
117        config.initial_send_seconds_per_log = non_negative_f64(
118            env_f64(MASTER_LOG_INITIAL_SEND_SECONDS_PER_LOG_ENV),
119            config.initial_send_seconds_per_log,
120        );
121        config.max_enqueue_sleep = non_negative_duration(
122            env_duration(MASTER_LOG_MAX_ENQUEUE_SLEEP_ENV),
123            config.max_enqueue_sleep,
124        );
125        config.min_request_interval = non_negative_duration(
126            env_duration(MASTER_LOG_MIN_REQUEST_INTERVAL_ENV),
127            config.min_request_interval,
128        );
129        config
130    }
131
132    pub fn api_key(mut self, value: impl Into<String>) -> Self {
133        self.api_key = Some(value.into());
134        self
135    }
136
137    pub fn endpoint(mut self, value: impl Into<String>) -> Self {
138        self.endpoint = Some(value.into());
139        self
140    }
141
142    pub fn timeout(mut self, value: Duration) -> Self {
143        self.timeout = value;
144        self
145    }
146
147    pub fn echo(mut self, value: bool) -> Self {
148        self.echo = value;
149        self
150    }
151
152    pub fn async_mode(mut self, value: bool) -> Self {
153        self.async_mode = value;
154        self
155    }
156
157    pub fn batch_size(mut self, value: usize) -> Self {
158        self.batch_size = value.max(1);
159        self
160    }
161
162    pub fn flush_interval(mut self, value: Duration) -> Self {
163        self.flush_interval = if value.is_zero() {
164            duration_from_seconds(DEFAULT_FLUSH_INTERVAL_SECONDS)
165        } else {
166            value
167        };
168        self
169    }
170
171    pub fn max_queue_size(mut self, value: usize) -> Self {
172        self.max_queue_size = value;
173        self
174    }
175
176    pub fn drop_when_full(mut self, value: bool) -> Self {
177        self.drop_when_full = value;
178        self
179    }
180
181    pub fn queue_timeout(mut self, value: Duration) -> Self {
182        self.queue_timeout = value;
183        self
184    }
185
186    pub fn backpressure(mut self, value: bool) -> Self {
187        self.backpressure = value;
188        self
189    }
190
191    pub fn initial_send_seconds_per_log(mut self, value: f64) -> Self {
192        self.initial_send_seconds_per_log = value.max(0.0);
193        self
194    }
195
196    pub fn max_enqueue_sleep(mut self, value: Duration) -> Self {
197        self.max_enqueue_sleep = value;
198        self
199    }
200
201    pub fn min_request_interval(mut self, value: Duration) -> Self {
202        self.min_request_interval = value;
203        self
204    }
205}
206
/// Outcome of a logging, flush, or shutdown call.
207#[derive(Clone, Debug)]
208pub struct MasterLogResult {
    /// Whether the operation succeeded.
209    pub ok: bool,
    /// Status code reported with the result, when there is one (presumably the HTTP status
    /// of the request — confirm against `post_json` and the worker).
210    pub status_code: Option<u16>,
    /// Event identifier, when one is known. Every constructor in this part of the file
    /// leaves it `None`.
211    pub event_id: Option<String>,
    /// Human-readable error message for failed operations.
212    pub error: Option<String>,
    /// JSON response attached to the result, if any.
213    pub response: Option<Value>,
    /// True when the log was handed to the background queue rather than sent synchronously.
214    pub queued: bool,
    /// Accepted-log count reported by the worker for flush/shutdown; `Some(0)` when those
    /// calls are no-ops.
215    pub accepted: Option<usize>,
216}
217
218impl MasterLogResult {
219    pub fn ok() -> Self {
220        Self {
221            ok: true,
222            status_code: None,
223            event_id: None,
224            error: None,
225            response: None,
226            queued: false,
227            accepted: None,
228        }
229    }
230
231    pub fn queued() -> Self {
232        Self {
233            queued: true,
234            ..Self::ok()
235        }
236    }
237
238    pub fn failed(error: impl Into<String>) -> Self {
239        Self {
240            ok: false,
241            status_code: None,
242            event_id: None,
243            error: Some(error.into()),
244            response: None,
245            queued: false,
246            accepted: None,
247        }
248    }
249}
250
/// Errors reported by the `try_*` client methods and the URL helpers.
251#[derive(Debug)]
252pub enum MasterLogError {
    /// A required setting is missing or empty; carries the matching environment-variable name.
253    MissingConfig(&'static str),
    /// A severity string was not recognised; carries the trimmed, lowercased input.
254    InvalidSeverity(String),
    /// The endpoint was empty after trimming whitespace and trailing slashes.
255    EmptyEndpoint,
    /// Transport-level failure message. Also used when the worker thread cannot be started.
256    Http(String),
    /// JSON-related failure message (produced outside this part of the file).
257    Json(String),
    /// The bounded queue was full and the log was rejected.
258    QueueFull,
    /// The worker is gone, its channel is disconnected, or a client lock was poisoned.
259    WorkerUnavailable,
    /// A wait for the worker exceeded its timeout; carries a description.
260    Timeout(String),
261}
262
263impl fmt::Display for MasterLogError {
264    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
265        match self {
266            Self::MissingConfig(name) => write!(formatter, "{name} is required"),
267            Self::InvalidSeverity(value) => write!(
268                formatter,
269                "severity must be one of trace, debug, info, warn, error, fatal; got {value}"
270            ),
271            Self::EmptyEndpoint => write!(formatter, "{MASTER_LOG_ENDPOINT_ENV} is empty"),
272            Self::Http(message) => formatter.write_str(message),
273            Self::Json(message) => formatter.write_str(message),
274            Self::QueueFull => formatter.write_str("Master Log queue is full"),
275            Self::WorkerUnavailable => formatter.write_str("Master Log worker is unavailable"),
276            Self::Timeout(message) => formatter.write_str(message),
277        }
278    }
279}
280
// Marker impl: relies on the `Debug` and `Display` impls above and exposes no `source()`.
281impl Error for MasterLogError {}
282
283impl From<MasterLogError> for MasterLogResult {
284    fn from(error: MasterLogError) -> Self {
285        Self::failed(error.to_string())
286    }
287}
288
/// Log severity level.
///
/// Serialized with serde as the lowercase variant name. The derived ordering follows the
/// declaration order, so `Trace` is the lowest level and `Fatal` the highest.
289#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
290#[serde(rename_all = "lowercase")]
291pub enum Severity {
292    Trace,
293    Debug,
294    Info,
295    Warn,
296    Error,
297    Fatal,
298}
299
300impl Severity {
301    pub fn all() -> [Self; 6] {
302        [
303            Self::Trace,
304            Self::Debug,
305            Self::Info,
306            Self::Warn,
307            Self::Error,
308            Self::Fatal,
309        ]
310    }
311}
312
313impl fmt::Display for Severity {
314    fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
315        let value = match self {
316            Self::Trace => "trace",
317            Self::Debug => "debug",
318            Self::Info => "info",
319            Self::Warn => "warn",
320            Self::Error => "error",
321            Self::Fatal => "fatal",
322        };
323        formatter.write_str(value)
324    }
325}
326
327impl FromStr for Severity {
328    type Err = MasterLogError;
329
330    fn from_str(value: &str) -> Result<Self, Self::Err> {
331        match value.trim().to_ascii_lowercase().as_str() {
332            "trace" => Ok(Self::Trace),
333            "debug" => Ok(Self::Debug),
334            "info" => Ok(Self::Info),
335            "warn" | "warning" => Ok(Self::Warn),
336            "error" => Ok(Self::Error),
337            "fatal" => Ok(Self::Fatal),
338            other => Err(MasterLogError::InvalidSeverity(other.to_string())),
339        }
340    }
341}
342
/// A log message under construction; configure it with the builder methods and pass it to
/// `MasterLogClient::log_entry` or `try_log_entry`.
343#[derive(Clone, Debug)]
344pub struct LogEntry {
    /// Message text.
345    body: String,
    /// Explicit title; when absent, `default_title` chooses one from the body.
346    title: Option<String>,
    /// Severity level; `Info` unless changed.
347    severity: Severity,
    /// Tags as supplied by the caller; passed through `normalize_tags` when the payload is built.
348    tags: Vec<String>,
    /// Caller metadata; starts as an empty JSON object and is passed through `merge_metadata`
    /// when the payload is built.
349    metadata: Value,
    /// Optional time-to-live in seconds, forwarded unchanged.
350    ttl_seconds: Option<u64>,
    /// Optional expiry timestamp, forwarded unchanged.
351    expires_at: Option<DateTime<Utc>>,
    /// Optional creation timestamp, forwarded unchanged.
352    created_at: Option<DateTime<Utc>>,
353}
354
355impl LogEntry {
356    pub fn new(body: impl Into<String>) -> Self {
357        Self {
358            body: body.into(),
359            title: None,
360            severity: Severity::Info,
361            tags: Vec::new(),
362            metadata: Value::Object(Map::new()),
363            ttl_seconds: None,
364            expires_at: None,
365            created_at: None,
366        }
367    }
368
369    pub fn with_title(title: impl Into<String>, body: impl Into<String>) -> Self {
370        Self::new(body).title(title)
371    }
372
373    pub fn severity(mut self, value: Severity) -> Self {
374        self.severity = value;
375        self
376    }
377
378    pub fn severity_str(mut self, value: impl AsRef<str>) -> Result<Self, MasterLogError> {
379        self.severity = Severity::from_str(value.as_ref())?;
380        Ok(self)
381    }
382
383    pub fn title(mut self, value: impl Into<String>) -> Self {
384        self.title = Some(value.into());
385        self
386    }
387
388    pub fn tag(mut self, value: impl Into<String>) -> Self {
389        self.tags.push(value.into());
390        self
391    }
392
393    pub fn tags<I, S>(mut self, values: I) -> Self
394    where
395        I: IntoIterator<Item = S>,
396        S: Into<String>,
397    {
398        self.tags.extend(values.into_iter().map(Into::into));
399        self
400    }
401
402    pub fn metadata(mut self, value: Value) -> Self {
403        self.metadata = value;
404        self
405    }
406
407    pub fn metadata_field(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
408        let serialized = serde_json::to_value(value).unwrap_or_else(|error| {
409            Value::String(format!("failed to serialize metadata value: {error}"))
410        });
411
412        match &mut self.metadata {
413            Value::Object(object) => {
414                object.insert(key.into(), serialized);
415            }
416            existing => {
417                let mut object = Map::new();
418                object.insert("value".to_string(), existing.clone());
419                object.insert(key.into(), serialized);
420                self.metadata = Value::Object(object);
421            }
422        }
423
424        self
425    }
426
427    pub fn ttl_seconds(mut self, value: u64) -> Self {
428        self.ttl_seconds = Some(value);
429        self
430    }
431
432    pub fn expires_at(mut self, value: DateTime<Utc>) -> Self {
433        self.expires_at = Some(value);
434        self
435    }
436
437    pub fn created_at(mut self, value: DateTime<Utc>) -> Self {
438        self.created_at = Some(value);
439        self
440    }
441
442    fn into_payload(self) -> LogPayload {
443        let body = self.body;
444        let title = default_title(self.title.as_deref(), &body);
445        LogPayload {
446            severity: self.severity,
447            tags: normalize_tags(self.tags),
448            title,
449            body,
450            created_at: self.created_at,
451            expires_at: self.expires_at,
452            ttl_seconds: self.ttl_seconds,
453            metadata: merge_metadata(self.metadata),
454        }
455    }
456}
457
/// Handle to a Master Log client.
///
/// Cloning is cheap: all clones share the same `ClientInner` through an `Arc`.
458#[derive(Clone, Debug)]
459pub struct MasterLogClient {
    /// Shared state (configuration, worker channel, worker thread handle).
460    inner: Arc<ClientInner>,
461}
462
/// State shared by every clone of a `MasterLogClient`.
463#[derive(Debug)]
464struct ClientInner {
    /// Configuration the client was created with.
465    config: MasterLogConfig,
    /// Sending side of the worker channel; `None` until a worker is started and again after
    /// shutdown.
466    sender: Mutex<Option<SyncSender<WorkerMessage>>>,
    /// Join handle of the worker thread, when one has been started.
467    handle: Mutex<Option<thread::JoinHandle<()>>>,
    /// Per-log send-time estimate in seconds, shared with the worker thread and read to size
    /// the backpressure sleep.
468    send_seconds_per_log: Arc<Mutex<f64>>,
469}
470
471impl Drop for ClientInner {
472    fn drop(&mut self) {
473        if let Ok(sender_slot) = self.sender.get_mut() {
474            if let Some(sender) = sender_slot.take() {
475                let (ack_sender, ack_receiver) = mpsc::channel();
476                let timeout = duration_from_seconds(DEFAULT_SHUTDOWN_TIMEOUT_SECONDS);
477                if send_command_with_timeout(
478                    &sender,
479                    WorkerMessage::Shutdown(ack_sender),
480                    timeout,
481                    false,
482                )
483                .is_ok()
484                {
485                    let _ = ack_receiver.recv_timeout(timeout);
486                }
487            }
488        }
489
490        if let Ok(handle_slot) = self.handle.get_mut() {
491            if let Some(handle) = handle_slot.take() {
492                let _ = handle.join();
493            }
494        }
495    }
496}
497
498impl MasterLogClient {
499    pub fn new(config: MasterLogConfig) -> Self {
500        Self {
501            inner: Arc::new(ClientInner {
502                send_seconds_per_log: Arc::new(Mutex::new(config.initial_send_seconds_per_log)),
503                config,
504                sender: Mutex::new(None),
505                handle: Mutex::new(None),
506            }),
507        }
508    }
509
510    pub fn from_env() -> Self {
511        Self::new(MasterLogConfig::from_env())
512    }
513
514    pub fn log(&self, body: impl Into<String>) -> MasterLogResult {
515        self.log_entry(LogEntry::new(body))
516    }
517
518    pub fn try_log(&self, body: impl Into<String>) -> Result<MasterLogResult, MasterLogError> {
519        self.try_log_entry(LogEntry::new(body))
520    }
521
522    pub fn log_entry(&self, entry: LogEntry) -> MasterLogResult {
523        self.try_log_entry(entry).unwrap_or_else(Into::into)
524    }
525
526    pub fn try_log_entry(&self, entry: LogEntry) -> Result<MasterLogResult, MasterLogError> {
527        let payload = entry.into_payload();
528        if self.inner.config.echo {
529            println!("{}", payload.body);
530        }
531
532        if self.inner.config.async_mode {
533            self.enqueue(payload)
534        } else {
535            self.send(payload)
536        }
537    }
538
539    pub fn trace(&self, body: impl Into<String>) -> MasterLogResult {
540        self.log_entry(LogEntry::new(body).severity(Severity::Trace))
541    }
542
543    pub fn debug(&self, body: impl Into<String>) -> MasterLogResult {
544        self.log_entry(LogEntry::new(body).severity(Severity::Debug))
545    }
546
547    pub fn info(&self, body: impl Into<String>) -> MasterLogResult {
548        self.log_entry(LogEntry::new(body).severity(Severity::Info))
549    }
550
551    pub fn warn(&self, body: impl Into<String>) -> MasterLogResult {
552        self.log_entry(LogEntry::new(body).severity(Severity::Warn))
553    }
554
555    pub fn error(&self, body: impl Into<String>) -> MasterLogResult {
556        self.log_entry(LogEntry::new(body).severity(Severity::Error))
557    }
558
559    pub fn fatal(&self, body: impl Into<String>) -> MasterLogResult {
560        self.log_entry(LogEntry::new(body).severity(Severity::Fatal))
561    }
562
563    pub fn flush(&self, timeout: Duration) -> MasterLogResult {
564        self.try_flush(timeout).unwrap_or_else(Into::into)
565    }
566
567    pub fn try_flush(&self, timeout: Duration) -> Result<MasterLogResult, MasterLogError> {
568        if !self.inner.config.async_mode {
569            return Ok(MasterLogResult {
570                accepted: Some(0),
571                ..MasterLogResult::ok()
572            });
573        }
574
575        self.require_config()?;
576        let sender = self.ensure_worker()?;
577        let (ack_sender, ack_receiver) = mpsc::channel();
578        send_command_with_timeout(&sender, WorkerMessage::Flush(ack_sender), timeout, false)?;
579
580        let result = ack_receiver
581            .recv_timeout(timeout)
582            .map_err(|error| match error {
583                mpsc::RecvTimeoutError::Timeout => MasterLogError::Timeout(
584                    "timed out waiting for Master Log worker flush".to_string(),
585                ),
586                mpsc::RecvTimeoutError::Disconnected => MasterLogError::WorkerUnavailable,
587            })?;
588
589        Ok(result.into_result())
590    }
591
592    pub fn shutdown(&self, timeout: Duration) -> MasterLogResult {
593        self.try_shutdown(timeout).unwrap_or_else(Into::into)
594    }
595
596    pub fn try_shutdown(&self, timeout: Duration) -> Result<MasterLogResult, MasterLogError> {
597        if !self.inner.config.async_mode {
598            return Ok(MasterLogResult {
599                accepted: Some(0),
600                ..MasterLogResult::ok()
601            });
602        }
603
604        let sender = {
605            let sender_guard = self
606                .inner
607                .sender
608                .lock()
609                .map_err(|_| MasterLogError::WorkerUnavailable)?;
610            sender_guard.clone()
611        };
612
613        let Some(sender) = sender else {
614            return Ok(MasterLogResult {
615                accepted: Some(0),
616                ..MasterLogResult::ok()
617            });
618        };
619
620        let (ack_sender, ack_receiver) = mpsc::channel();
621        send_command_with_timeout(&sender, WorkerMessage::Shutdown(ack_sender), timeout, false)?;
622
623        let result = ack_receiver
624            .recv_timeout(timeout)
625            .map_err(|error| match error {
626                mpsc::RecvTimeoutError::Timeout => MasterLogError::Timeout(
627                    "timed out waiting for Master Log worker shutdown".to_string(),
628                ),
629                mpsc::RecvTimeoutError::Disconnected => MasterLogError::WorkerUnavailable,
630            })?;
631
632        if let Ok(mut sender_guard) = self.inner.sender.lock() {
633            sender_guard.take();
634        }
635
636        if let Ok(mut handle_guard) = self.inner.handle.lock() {
637            if let Some(handle) = handle_guard.take() {
638                let _ = handle.join();
639            }
640        }
641
642        Ok(result.into_result())
643    }
644
645    fn enqueue(&self, payload: LogPayload) -> Result<MasterLogResult, MasterLogError> {
646        self.require_config()?;
647        let sender = self.ensure_worker()?;
648        let command = WorkerMessage::Log(payload);
649
650        if self.inner.config.drop_when_full {
651            sender.try_send(command).map_err(|error| match error {
652                TrySendError::Full(_) => MasterLogError::QueueFull,
653                TrySendError::Disconnected(_) => MasterLogError::WorkerUnavailable,
654            })?;
655        } else {
656            send_command_with_timeout(
657                &sender,
658                command,
659                self.inner.config.queue_timeout,
660                self.inner.config.drop_when_full,
661            )?;
662        }
663
664        self.apply_enqueue_backpressure();
665        Ok(MasterLogResult::queued())
666    }
667
668    fn send(&self, payload: LogPayload) -> Result<MasterLogResult, MasterLogError> {
669        self.require_config()?;
670        let endpoint = self
671            .inner
672            .config
673            .endpoint
674            .as_deref()
675            .ok_or(MasterLogError::MissingConfig(MASTER_LOG_ENDPOINT_ENV))?;
676        let api_key = self
677            .inner
678            .config
679            .api_key
680            .as_deref()
681            .ok_or(MasterLogError::MissingConfig(MASTER_LOG_API_KEY_ENV))?;
682        let url = logs_url(endpoint)?;
683        let http = build_http_client(self.inner.config.timeout)?;
684        post_json(&http, &url, api_key, &payload)
685    }
686
687    fn ensure_worker(&self) -> Result<SyncSender<WorkerMessage>, MasterLogError> {
688        self.require_config()?;
689
690        let mut sender_guard = self
691            .inner
692            .sender
693            .lock()
694            .map_err(|_| MasterLogError::WorkerUnavailable)?;
695        let mut handle_guard = self
696            .inner
697            .handle
698            .lock()
699            .map_err(|_| MasterLogError::WorkerUnavailable)?;
700
701        if handle_guard
702            .as_ref()
703            .is_some_and(|handle| handle.is_finished())
704        {
705            sender_guard.take();
706            if let Some(handle) = handle_guard.take() {
707                let _ = handle.join();
708            }
709        }
710
711        if let Some(sender) = sender_guard.as_ref() {
712            return Ok(sender.clone());
713        }
714
715        let (sender, receiver) = mpsc::sync_channel(self.inner.config.max_queue_size);
716        let config = WorkerConfig::from_client_config(&self.inner.config)?;
717        let send_seconds_per_log = Arc::clone(&self.inner.send_seconds_per_log);
718
719        let handle = thread::Builder::new()
720            .name("master-log-client".to_string())
721            .spawn(move || batch_worker(config, receiver, send_seconds_per_log))
722            .map_err(|error| {
723                MasterLogError::Http(format!("failed to start Master Log worker: {error}"))
724            })?;
725
726        *sender_guard = Some(sender.clone());
727        *handle_guard = Some(handle);
728        Ok(sender)
729    }
730
731    fn require_config(&self) -> Result<(), MasterLogError> {
732        if self
733            .inner
734            .config
735            .api_key
736            .as_deref()
737            .unwrap_or("")
738            .is_empty()
739        {
740            return Err(MasterLogError::MissingConfig(MASTER_LOG_API_KEY_ENV));
741        }
742        if self
743            .inner
744            .config
745            .endpoint
746            .as_deref()
747            .unwrap_or("")
748            .is_empty()
749        {
750            return Err(MasterLogError::MissingConfig(MASTER_LOG_ENDPOINT_ENV));
751        }
752        Ok(())
753    }
754
755    fn apply_enqueue_backpressure(&self) {
756        if !self.inner.config.backpressure {
757            return;
758        }
759
760        let sleep_seconds = self
761            .inner
762            .send_seconds_per_log
763            .lock()
764            .map(|value| *value)
765            .unwrap_or(self.inner.config.initial_send_seconds_per_log)
766            .max(0.0);
767        let sleep = duration_from_seconds(sleep_seconds).min(self.inner.config.max_enqueue_sleep);
768
769        if !sleep.is_zero() {
770            thread::sleep(sleep);
771        }
772    }
773}
774
/// Process-wide default client used by the free logging functions; created lazily by
/// `get_default_client` and replaced by `configure_default_client`.
775static DEFAULT_CLIENT: OnceLock<Mutex<Option<MasterLogClient>>> = OnceLock::new();
776
777pub fn get_default_client() -> MasterLogClient {
778    let lock = DEFAULT_CLIENT.get_or_init(|| Mutex::new(None));
779    let mut guard = lock
780        .lock()
781        .expect("default Master Log client lock poisoned");
782    if let Some(client) = guard.as_ref() {
783        return client.clone();
784    }
785
786    let client = MasterLogClient::from_env();
787    *guard = Some(client.clone());
788    client
789}
790
791pub fn configure(config: MasterLogConfig) -> MasterLogResult {
792    configure_default_client(config)
793}
794
795pub fn configure_default_client(config: MasterLogConfig) -> MasterLogResult {
796    let lock = DEFAULT_CLIENT.get_or_init(|| Mutex::new(None));
797    let mut guard = lock
798        .lock()
799        .expect("default Master Log client lock poisoned");
800    if let Some(old_client) = guard.take() {
801        let _ = old_client.shutdown(duration_from_seconds(1.0));
802    }
803
804    *guard = Some(MasterLogClient::new(config));
805    MasterLogResult::ok()
806}
807
808pub fn log(body: impl Into<String>) -> MasterLogResult {
809    get_default_client().log(body)
810}
811
812pub fn mlog(body: impl Into<String>) -> MasterLogResult {
813    log(body)
814}
815
816pub fn log_entry(entry: LogEntry) -> MasterLogResult {
817    get_default_client().log_entry(entry)
818}
819
820pub fn trace(body: impl Into<String>) -> MasterLogResult {
821    get_default_client().trace(body)
822}
823
824pub fn debug(body: impl Into<String>) -> MasterLogResult {
825    get_default_client().debug(body)
826}
827
828pub fn info(body: impl Into<String>) -> MasterLogResult {
829    get_default_client().info(body)
830}
831
832pub fn warn(body: impl Into<String>) -> MasterLogResult {
833    get_default_client().warn(body)
834}
835
836pub fn error(body: impl Into<String>) -> MasterLogResult {
837    get_default_client().error(body)
838}
839
840pub fn fatal(body: impl Into<String>) -> MasterLogResult {
841    get_default_client().fatal(body)
842}
843
844pub fn flush(timeout: Duration) -> MasterLogResult {
845    get_default_client().flush(timeout)
846}
847
848pub fn shutdown(timeout: Duration) -> MasterLogResult {
849    get_default_client().shutdown(timeout)
850}
851
/// Formats its arguments like `format!` and passes the text to `mlog`.
#[macro_export]
macro_rules! mlogf {
    ($($fmt_args:tt)*) => {
        $crate::mlog(format!($($fmt_args)*))
    };
}
858
/// Formats its arguments like `format!` and passes the text to `trace`.
#[macro_export]
macro_rules! tracef {
    ($($fmt_args:tt)*) => {
        $crate::trace(format!($($fmt_args)*))
    };
}
865
/// Formats its arguments like `format!` and passes the text to `debug`.
#[macro_export]
macro_rules! debugf {
    ($($fmt_args:tt)*) => {
        $crate::debug(format!($($fmt_args)*))
    };
}
872
/// Formats its arguments like `format!` and passes the text to `info`.
#[macro_export]
macro_rules! infof {
    ($($fmt_args:tt)*) => {
        $crate::info(format!($($fmt_args)*))
    };
}
879
/// Formats its arguments like `format!` and passes the text to `warn`.
#[macro_export]
macro_rules! warnf {
    ($($fmt_args:tt)*) => {
        $crate::warn(format!($($fmt_args)*))
    };
}
886
/// Formats its arguments like `format!` and passes the text to `error`.
#[macro_export]
macro_rules! errorf {
    ($($fmt_args:tt)*) => {
        $crate::error(format!($($fmt_args)*))
    };
}
893
/// Formats its arguments like `format!` and passes the text to `fatal`.
#[macro_export]
macro_rules! fatalf {
    ($($fmt_args:tt)*) => {
        $crate::fatal(format!($($fmt_args)*))
    };
}
900
901pub fn logs_url(endpoint: &str) -> Result<String, MasterLogError> {
902    let endpoint = endpoint.trim().trim_end_matches('/');
903    if endpoint.is_empty() {
904        return Err(MasterLogError::EmptyEndpoint);
905    }
906
907    if endpoint.ends_with("/api/v1/logs/batch") {
908        Ok(endpoint.trim_end_matches("/batch").to_string())
909    } else if endpoint.ends_with("/api/v1/logs") {
910        Ok(endpoint.to_string())
911    } else if endpoint.ends_with("/api/v1") {
912        Ok(format!("{endpoint}/logs"))
913    } else {
914        Ok(format!("{endpoint}/api/v1/logs"))
915    }
916}
917
918pub fn batch_logs_url(endpoint: &str) -> Result<String, MasterLogError> {
919    let endpoint = endpoint.trim().trim_end_matches('/');
920    if endpoint.is_empty() {
921        return Err(MasterLogError::EmptyEndpoint);
922    }
923
924    if endpoint.ends_with("/api/v1/logs/batch") {
925        Ok(endpoint.to_string())
926    } else if endpoint.ends_with("/api/v1/logs") {
927        Ok(format!("{endpoint}/batch"))
928    } else if endpoint.ends_with("/api/v1") {
929        Ok(format!("{endpoint}/logs/batch"))
930    } else {
931        Ok(format!("{endpoint}/api/v1/logs/batch"))
932    }
933}
934
/// Serialized form of a log entry, produced by `LogEntry::into_payload`.
935#[derive(Clone, Debug, Serialize)]
936struct LogPayload {
    /// Severity level.
937    severity: Severity,
    /// Tags after `normalize_tags`.
938    tags: Vec<String>,
    /// Title chosen by `default_title` from the entry's title and body.
939    title: String,
    /// Message text.
940    body: String,
    /// Creation timestamp; left out of the serialized output when `None`.
941    #[serde(skip_serializing_if = "Option::is_none")]
942    created_at: Option<DateTime<Utc>>,
    /// Expiry timestamp; left out of the serialized output when `None`.
943    #[serde(skip_serializing_if = "Option::is_none")]
944    expires_at: Option<DateTime<Utc>>,
    /// Time-to-live in seconds; left out of the serialized output when `None`.
945    #[serde(skip_serializing_if = "Option::is_none")]
946    ttl_seconds: Option<u64>,
    /// Metadata after `merge_metadata`.
947    metadata: Value,
948}
949
/// Subset of `MasterLogConfig` that is moved into the worker thread.
950#[derive(Clone, Debug)]
951struct WorkerConfig {
    /// API key (guaranteed present by `from_client_config`).
952    api_key: String,
    /// Endpoint as configured (guaranteed present by `from_client_config`).
953    endpoint: String,
    /// Timeout used to build the worker's HTTP client.
954    timeout: Duration,
    /// Batch size, at least 1.
955    batch_size: usize,
    /// Flush interval copied from the client configuration.
956    flush_interval: Duration,
    /// Minimum request interval copied from the client configuration.
957    min_request_interval: Duration,
958}
959
960impl WorkerConfig {
961    fn from_client_config(config: &MasterLogConfig) -> Result<Self, MasterLogError> {
962        Ok(Self {
963            api_key: config
964                .api_key
965                .clone()
966                .ok_or(MasterLogError::MissingConfig(MASTER_LOG_API_KEY_ENV))?,
967            endpoint: config
968                .endpoint
969                .clone()
970                .ok_or(MasterLogError::MissingConfig(MASTER_LOG_ENDPOINT_ENV))?,
971            timeout: config.timeout,
972            batch_size: config.batch_size.max(1),
973            flush_interval: config.flush_interval,
974            min_request_interval: config.min_request_interval,
975        })
976    }
977}
978
/// Commands delivered to the batch worker over its channel.
enum WorkerMessage {
    /// A log event to append to the worker's pending batch.
    Log(LogPayload),
    /// Request to send whatever is pending now; the enclosed sender receives
    /// the worker's most recent result.
    Flush(mpsc::Sender<WorkerResult>),
    /// Request to send whatever is pending and then stop the worker; the
    /// enclosed sender receives the worker's most recent result.
    Shutdown(mpsc::Sender<WorkerResult>),
}
984
985#[derive(Clone, Debug)]
986struct WorkerResult {
987    ok: bool,
988    accepted: usize,
989    status_code: Option<u16>,
990    response: Option<Value>,
991    error: Option<String>,
992}
993
994impl WorkerResult {
995    fn ok(accepted: usize, status_code: Option<u16>, response: Option<Value>) -> Self {
996        Self {
997            ok: true,
998            accepted,
999            status_code,
1000            response,
1001            error: None,
1002        }
1003    }
1004
1005    fn failed(
1006        accepted: usize,
1007        status_code: Option<u16>,
1008        response: Option<Value>,
1009        error: impl Into<String>,
1010    ) -> Self {
1011        Self {
1012            ok: false,
1013            accepted,
1014            status_code,
1015            response,
1016            error: Some(error.into()),
1017        }
1018    }
1019
1020    fn into_result(self) -> MasterLogResult {
1021        MasterLogResult {
1022            ok: self.ok,
1023            status_code: self.status_code,
1024            event_id: None,
1025            error: self.error,
1026            response: self.response,
1027            queued: false,
1028            accepted: Some(self.accepted),
1029        }
1030    }
1031}
1032
/// Body of the background worker: collects log payloads from `receiver` and
/// sends them in batches.
///
/// * `config` - worker settings (HTTP timeout, batch size, flush interval, ...).
/// * `receiver` - channel delivering [`WorkerMessage`]s.
/// * `send_seconds_per_log` - shared value handed to `send_pending` for every
///   send.
///
/// The function returns when a `Shutdown` message has been handled, when the
/// channel is disconnected, or when the HTTP client cannot be built.
fn batch_worker(
    config: WorkerConfig,
    receiver: Receiver<WorkerMessage>,
    send_seconds_per_log: Arc<Mutex<f64>>,
) {
    // Without an HTTP client nothing can be sent: answer queued flush/shutdown
    // requests with the build error and stop.
    let http = match build_http_client(config.timeout) {
        Ok(client) => client,
        Err(error) => {
            fail_all_worker_messages(
                receiver,
                WorkerResult::failed(0, None, None, error.to_string()),
            );
            return;
        }
    };

    // Logs received but not yet sent.
    let mut pending = Vec::<LogPayload>::new();
    // Start of the current flush-interval window; `None` while nothing is pending.
    let mut first_pending_at: Option<Instant> = None;
    // Start time of the most recent request, used for request spacing.
    let mut last_request_started_at: Option<Instant> = None;
    // Result handed to flush/shutdown callers.
    let mut last_result = WorkerResult::ok(0, None, None);

    loop {
        // Wait at most until the current flush window expires.
        let timeout = worker_wait_timeout(first_pending_at, config.flush_interval);
        let mut flush_acks = Vec::new();
        let mut shutdown_ack = None;
        let mut should_shutdown = false;
        // Set when the wait timed out with logs still pending.
        let mut due = false;

        match receive_worker_message(&receiver, timeout) {
            Ok(Some(message)) => handle_worker_message(
                message,
                &mut pending,
                &mut first_pending_at,
                &mut flush_acks,
                &mut shutdown_ack,
                &mut should_shutdown,
            ),
            Ok(None) => {
                due = !pending.is_empty();
            }
            // Channel disconnected: send what is pending, then exit.
            Err(()) => {
                should_shutdown = true;
            }
        }

        // Pick up everything else that is already queued, without blocking.
        drain_worker_messages(
            &receiver,
            &mut pending,
            &mut first_pending_at,
            &mut flush_acks,
            &mut shutdown_ack,
            &mut should_shutdown,
        );

        // Send only when there is something to send and a trigger fired:
        // shutdown, an explicit flush, a full batch, or an expired flush window.
        let should_send = !pending.is_empty()
            && (should_shutdown
                || !flush_acks.is_empty()
                || shutdown_ack.is_some()
                || pending.len() >= config.batch_size
                || due);

        if should_send {
            last_result = send_pending(
                &config,
                &http,
                &mut pending,
                &send_seconds_per_log,
                &mut last_request_started_at,
            );
            // `send_pending` leaves unsent logs in `pending` when a batch fails;
            // restart the flush window so they are retried after another interval.
            first_pending_at = if pending.is_empty() {
                None
            } else {
                Some(Instant::now())
            };
        } else if pending.is_empty() {
            // Nothing pending: callers asking for a flush get an empty success.
            first_pending_at = None;
            last_result = WorkerResult::ok(0, None, None);
        }

        // A caller that stopped waiting has dropped its receiver; ignore that.
        for ack in flush_acks {
            let _ = ack.send(last_result.clone());
        }

        if let Some(ack) = shutdown_ack {
            let _ = ack.send(last_result.clone());
            break;
        }

        if should_shutdown {
            break;
        }
    }
}
1126
1127fn receive_worker_message(
1128    receiver: &Receiver<WorkerMessage>,
1129    timeout: Duration,
1130) -> Result<Option<WorkerMessage>, ()> {
1131    match receiver.recv_timeout(timeout) {
1132        Ok(message) => Ok(Some(message)),
1133        Err(RecvTimeoutError::Timeout) => Ok(None),
1134        Err(RecvTimeoutError::Disconnected) => Err(()),
1135    }
1136}
1137
1138fn drain_worker_messages(
1139    receiver: &Receiver<WorkerMessage>,
1140    pending: &mut Vec<LogPayload>,
1141    first_pending_at: &mut Option<Instant>,
1142    flush_acks: &mut Vec<mpsc::Sender<WorkerResult>>,
1143    shutdown_ack: &mut Option<mpsc::Sender<WorkerResult>>,
1144    should_shutdown: &mut bool,
1145) {
1146    loop {
1147        match receiver.try_recv() {
1148            Ok(message) => handle_worker_message(
1149                message,
1150                pending,
1151                first_pending_at,
1152                flush_acks,
1153                shutdown_ack,
1154                should_shutdown,
1155            ),
1156            Err(TryRecvError::Empty) => break,
1157            Err(TryRecvError::Disconnected) => {
1158                *should_shutdown = true;
1159                break;
1160            }
1161        }
1162    }
1163}
1164
1165fn handle_worker_message(
1166    message: WorkerMessage,
1167    pending: &mut Vec<LogPayload>,
1168    first_pending_at: &mut Option<Instant>,
1169    flush_acks: &mut Vec<mpsc::Sender<WorkerResult>>,
1170    shutdown_ack: &mut Option<mpsc::Sender<WorkerResult>>,
1171    should_shutdown: &mut bool,
1172) {
1173    match message {
1174        WorkerMessage::Log(payload) => {
1175            if pending.is_empty() {
1176                *first_pending_at = Some(Instant::now());
1177            }
1178            pending.push(payload);
1179        }
1180        WorkerMessage::Flush(ack) => flush_acks.push(ack),
1181        WorkerMessage::Shutdown(ack) => {
1182            *shutdown_ack = Some(ack);
1183            *should_shutdown = true;
1184        }
1185    }
1186}
1187
1188fn send_pending(
1189    config: &WorkerConfig,
1190    http: &HttpClient,
1191    pending: &mut Vec<LogPayload>,
1192    send_seconds_per_log: &Arc<Mutex<f64>>,
1193    last_request_started_at: &mut Option<Instant>,
1194) -> WorkerResult {
1195    let mut accepted_total = 0usize;
1196    let mut status_code = None;
1197    let mut response = None;
1198
1199    while !pending.is_empty() {
1200        let batch_len = config.batch_size.min(pending.len()).max(1);
1201        wait_for_request_interval(config.min_request_interval, *last_request_started_at);
1202        let started_at = Instant::now();
1203        *last_request_started_at = Some(started_at);
1204
1205        let result = send_batch(config, http, &pending[..batch_len]);
1206        let elapsed = started_at.elapsed();
1207        status_code = result.status_code;
1208        response = result.response.clone();
1209
1210        if !result.ok {
1211            return WorkerResult::failed(
1212                accepted_total,
1213                status_code,
1214                response,
1215                result
1216                    .error
1217                    .unwrap_or_else(|| "Master Log batch send failed".to_string()),
1218            );
1219        }
1220
1221        let accepted = result.accepted;
1222        update_send_seconds_per_log(send_seconds_per_log, elapsed, accepted);
1223        accepted_total += accepted;
1224        pending.drain(..batch_len);
1225    }
1226
1227    WorkerResult::ok(accepted_total, status_code, response)
1228}
1229
/// Request body for a batch send: the borrowed payloads serialized under the
/// `logs` key.
#[derive(Serialize)]
struct BatchRequest<'a> {
    logs: &'a [LogPayload],
}
1234
1235fn send_batch(config: &WorkerConfig, http: &HttpClient, payloads: &[LogPayload]) -> WorkerResult {
1236    let url = match batch_logs_url(&config.endpoint) {
1237        Ok(url) => url,
1238        Err(error) => return WorkerResult::failed(0, None, None, error.to_string()),
1239    };
1240
1241    match post_json(
1242        http,
1243        &url,
1244        &config.api_key,
1245        &BatchRequest { logs: payloads },
1246    ) {
1247        Ok(result) => WorkerResult::ok(
1248            result.accepted.unwrap_or(payloads.len()),
1249            result.status_code,
1250            result.response,
1251        ),
1252        Err(error) => WorkerResult::failed(0, None, None, error.to_string()),
1253    }
1254}
1255
1256fn fail_all_worker_messages(receiver: Receiver<WorkerMessage>, result: WorkerResult) {
1257    while let Ok(message) = receiver.recv_timeout(duration_from_seconds(0.05)) {
1258        match message {
1259            WorkerMessage::Flush(ack) | WorkerMessage::Shutdown(ack) => {
1260                let _ = ack.send(result.clone());
1261            }
1262            WorkerMessage::Log(_) => {}
1263        }
1264    }
1265}
1266
/// Computes how long the worker may block waiting for the next message.
///
/// With nothing pending (`first_pending_at` is `None`) this is the full
/// `flush_interval`. Otherwise it is whatever is left of the interval since
/// `first_pending_at`, or zero once the interval has elapsed.
fn worker_wait_timeout(first_pending_at: Option<Instant>, flush_interval: Duration) -> Duration {
    match first_pending_at {
        None => flush_interval,
        Some(window_start) => match flush_interval.checked_sub(window_start.elapsed()) {
            Some(remaining) => remaining,
            None => Duration::ZERO,
        },
    }
}
1276
1277fn wait_for_request_interval(
1278    min_request_interval: Duration,
1279    last_request_started_at: Option<Instant>,
1280) {
1281    let sleep = request_interval_sleep(
1282        min_request_interval,
1283        last_request_started_at,
1284        Instant::now(),
1285    );
1286    if !sleep.is_zero() {
1287        thread::sleep(sleep);
1288    }
1289}
1290
/// Computes how long to wait at `now` so that consecutive requests start at
/// least `min_request_interval` apart.
///
/// The result is zero when the interval is zero, when no request has been
/// started yet, or when enough time has already passed. A
/// `last_request_started_at` that lies after `now` counts as "just started",
/// so the full interval is returned.
fn request_interval_sleep(
    min_request_interval: Duration,
    last_request_started_at: Option<Instant>,
    now: Instant,
) -> Duration {
    let last_start = match last_request_started_at {
        Some(last_start) if !min_request_interval.is_zero() => last_start,
        _ => return Duration::ZERO,
    };

    let since_last_start = now.saturating_duration_since(last_start);
    if since_last_start >= min_request_interval {
        Duration::ZERO
    } else {
        min_request_interval - since_last_start
    }
}
1308
1309fn update_send_seconds_per_log(
1310    send_seconds_per_log: &Arc<Mutex<f64>>,
1311    elapsed: Duration,
1312    accepted_count: usize,
1313) {
1314    if accepted_count == 0 {
1315        return;
1316    }
1317
1318    let sample = elapsed.as_secs_f64().max(0.0) / accepted_count as f64;
1319    if let Ok(mut current) = send_seconds_per_log.lock() {
1320        if *current <= 0.0 {
1321            *current = sample;
1322        } else {
1323            *current = *current * (1.0 - DEFAULT_SEND_RATE_SMOOTHING)
1324                + sample * DEFAULT_SEND_RATE_SMOOTHING;
1325        }
1326    }
1327}
1328
1329fn send_command_with_timeout(
1330    sender: &SyncSender<WorkerMessage>,
1331    mut command: WorkerMessage,
1332    timeout: Duration,
1333    drop_when_full: bool,
1334) -> Result<(), MasterLogError> {
1335    let deadline = Instant::now() + timeout;
1336
1337    loop {
1338        match sender.try_send(command) {
1339            Ok(()) => return Ok(()),
1340            Err(TrySendError::Disconnected(_)) => return Err(MasterLogError::WorkerUnavailable),
1341            Err(TrySendError::Full(returned)) => {
1342                if drop_when_full || Instant::now() >= deadline {
1343                    return Err(MasterLogError::QueueFull);
1344                }
1345                command = returned;
1346                thread::sleep(duration_from_seconds(0.005).min(timeout));
1347            }
1348        }
1349    }
1350}
1351
1352fn build_http_client(timeout: Duration) -> Result<HttpClient, MasterLogError> {
1353    HttpClient::builder()
1354        .timeout(timeout)
1355        .build()
1356        .map_err(|error| MasterLogError::Http(error.to_string()))
1357}
1358
1359fn post_json<T: Serialize + ?Sized>(
1360    http: &HttpClient,
1361    url: &str,
1362    api_key: &str,
1363    payload: &T,
1364) -> Result<MasterLogResult, MasterLogError> {
1365    let response = http
1366        .post(url)
1367        .bearer_auth(api_key)
1368        .header("Accept", "application/json")
1369        .header(
1370            "User-Agent",
1371            format!("master-log-client-rust/{LIBRARY_VERSION}"),
1372        )
1373        .json(payload)
1374        .send()
1375        .map_err(|error| MasterLogError::Http(error.to_string()))?;
1376
1377    let status_code = response.status().as_u16();
1378    let success = response.status().is_success();
1379    let body = response
1380        .text()
1381        .map_err(|error| MasterLogError::Http(error.to_string()))?;
1382    let parsed = if body.trim().is_empty() {
1383        Value::Object(Map::new())
1384    } else {
1385        serde_json::from_str::<Value>(&body).unwrap_or_else(|_| json!({ "error": body }))
1386    };
1387
1388    if !success {
1389        return Err(MasterLogError::Http(format!(
1390            "Master Log returned HTTP {status_code}: {parsed}"
1391        )));
1392    }
1393
1394    let event_id = parsed
1395        .get("events")
1396        .and_then(Value::as_array)
1397        .and_then(|events| events.first())
1398        .and_then(|event| event.get("id"))
1399        .and_then(Value::as_str)
1400        .map(str::to_string);
1401    let accepted = parsed
1402        .get("accepted")
1403        .and_then(Value::as_u64)
1404        .and_then(|value| usize::try_from(value).ok());
1405
1406    Ok(MasterLogResult {
1407        ok: true,
1408        status_code: Some(status_code),
1409        event_id,
1410        error: None,
1411        response: Some(parsed),
1412        queued: false,
1413        accepted,
1414    })
1415}
1416
/// Chooses the title for a log event.
///
/// A non-blank explicit `title` is used, trimmed. Otherwise the title is
/// derived from the first non-blank line of `body`, limited to 120 characters,
/// with `"Rust log event"` as the fallback for a blank body.
///
/// The derived title never carries leading or trailing whitespace, matching
/// the trimming applied to an explicit title.
fn default_title(title: Option<&str>, body: &str) -> String {
    if let Some(title) = title.map(str::trim).filter(|title| !title.is_empty()) {
        return title.to_string();
    }

    // Trimming the body removes leading blank lines; the first line can still
    // end in spaces, tabs or a stray `\r`, so trim its end separately.
    let first_line = body
        .trim()
        .lines()
        .next()
        .map(str::trim_end)
        .filter(|line| !line.is_empty())
        .unwrap_or("Rust log event");

    let mut derived: String = first_line.chars().take(120).collect();
    // Cutting at 120 characters can land right after whitespace.
    let kept = derived.trim_end().len();
    derived.truncate(kept);
    derived
}
1430
/// Normalizes a tag list: every entry is split on commas, each piece is
/// trimmed, blank pieces are dropped, and the remaining tags are returned
/// de-duplicated in sorted order.
fn normalize_tags(tags: Vec<String>) -> Vec<String> {
    let mut unique = BTreeSet::new();

    for entry in &tags {
        for piece in entry.split(',') {
            let tag = piece.trim();
            if !tag.is_empty() {
                unique.insert(tag.to_string());
            }
        }
    }

    unique.into_iter().collect()
}
1444
1445fn merge_metadata(metadata: Value) -> Value {
1446    let mut object = match metadata {
1447        Value::Object(object) => object,
1448        value => {
1449            let mut object = Map::new();
1450            object.insert("value".to_string(), value);
1451            object
1452        }
1453    };
1454
1455    let client_metadata = rust_client_metadata();
1456    if object.contains_key("rust_client") {
1457        object.insert("_rust_client".to_string(), client_metadata);
1458    } else {
1459        object.insert("rust_client".to_string(), client_metadata);
1460    }
1461
1462    Value::Object(object)
1463}
1464
1465fn rust_client_metadata() -> Value {
1466    let argv0 = env::args().next().unwrap_or_default();
1467    let process_name = process_name(&argv0);
1468    let hostname = hostname::get()
1469        .ok()
1470        .and_then(|value| value.into_string().ok())
1471        .unwrap_or_else(|| "unknown".to_string());
1472    let cwd = env::current_dir()
1473        .unwrap_or_else(|_| PathBuf::from(""))
1474        .display()
1475        .to_string();
1476
1477    json!({
1478        "hostname": hostname,
1479        "pid": std::process::id(),
1480        "process_name": process_name,
1481        "argv0": argv0,
1482        "cwd": cwd,
1483        "library": "master-log-client",
1484        "library_version": LIBRARY_VERSION,
1485    })
1486}
1487
/// Derives a process name from `argv0`.
///
/// For a non-blank `argv0` this is its final path component. For a blank
/// `argv0` the file name of the current executable is used instead. In both
/// cases `"rust"` is the fallback when no name can be obtained as valid
/// Unicode.
fn process_name(argv0: &str) -> String {
    const FALLBACK: &str = "rust";

    if argv0.trim().is_empty() {
        let exe_name = env::current_exe().ok().and_then(|path| {
            path.file_name()
                .and_then(|name| name.to_str())
                .map(str::to_string)
        });
        return exe_name.unwrap_or_else(|| FALLBACK.to_string());
    }

    match PathBuf::from(argv0)
        .file_name()
        .and_then(|name| name.to_str())
    {
        Some(name) => name.to_string(),
        None => FALLBACK.to_string(),
    }
}
1503
/// Reads environment variable `name` and returns its trimmed value.
///
/// Returns `None` when the variable is unset, is not valid Unicode, or is
/// blank after trimming.
fn env_non_empty(name: &str) -> Option<String> {
    let raw = env::var(name).ok()?;
    let trimmed = raw.trim();
    if trimmed.is_empty() {
        None
    } else {
        Some(trimmed.to_string())
    }
}
1510
/// Reads environment variable `name` as a boolean flag.
///
/// After trimming and ASCII-lowercasing, `1`, `true`, `yes` and `on` mean
/// `true`, while `0`, `false`, `no` and `off` mean `false`. An unset variable
/// or any other value yields `None`.
fn env_bool(name: &str) -> Option<bool> {
    let raw = env::var(name).ok()?;
    let flag = raw.trim().to_ascii_lowercase();

    match flag.as_str() {
        "0" | "false" | "no" | "off" => Some(false),
        "1" | "true" | "yes" | "on" => Some(true),
        _ => None,
    }
}
1518
/// Reads environment variable `name` and parses its trimmed value as `f64`.
///
/// Returns `None` when the variable is unset or does not parse.
fn env_f64(name: &str) -> Option<f64> {
    let raw = env::var(name).ok()?;
    raw.trim().parse().ok()
}
1522
/// Reads environment variable `name` and parses its trimmed value as `usize`.
///
/// Returns `None` when the variable is unset or does not parse (negative and
/// fractional values included).
fn env_usize(name: &str) -> Option<usize> {
    let raw = env::var(name).ok()?;
    raw.trim().parse().ok()
}
1526
1527fn env_duration(name: &str) -> Option<Duration> {
1528    env_f64(name).map(duration_from_seconds)
1529}
1530
/// Returns `value` when it is present and greater than zero, otherwise
/// `default`.
fn positive_usize(value: Option<usize>, default: usize) -> usize {
    match value {
        Some(count) if count > 0 => count,
        _ => default,
    }
}
1534
/// Returns `value` when it is present (zero included), otherwise `default`.
fn non_negative_usize(value: Option<usize>, default: usize) -> usize {
    match value {
        Some(count) => count,
        None => default,
    }
}
1538
/// Returns `value` when it is present and non-zero, otherwise `default`.
fn positive_duration(value: Option<Duration>, default: Duration) -> Duration {
    match value {
        Some(duration) if !duration.is_zero() => duration,
        _ => default,
    }
}
1542
/// Returns `value` when it is present (zero included), otherwise `default`.
fn non_negative_duration(value: Option<Duration>, default: Duration) -> Duration {
    match value {
        Some(duration) => duration,
        None => default,
    }
}
1546
/// Returns `value` when it is present and not below zero, otherwise
/// `default`. A NaN value fails the comparison and also yields `default`.
fn non_negative_f64(value: Option<f64>, default: f64) -> f64 {
    match value {
        Some(number) if number >= 0.0 => number,
        _ => default,
    }
}
1550
/// Converts a number of seconds into a `Duration` without ever panicking.
///
/// Negative values and NaN become `Duration::ZERO`; positive infinity and
/// values too large to represent saturate to `Duration::MAX`.
///
/// This matters because the input can come straight from an environment
/// variable: strings such as `inf` or `1e300` parse as valid `f64` values, and
/// `Duration::from_secs_f64` panics on them.
fn duration_from_seconds(seconds: f64) -> Duration {
    // `f64::max` returns the non-NaN operand, so NaN is clamped to 0.0 here
    // together with negative values. What can still fail afterwards is only
    // overflow (including +inf), which saturates.
    Duration::try_from_secs_f64(seconds.max(0.0)).unwrap_or(Duration::MAX)
}
1554
// Unit tests for URL resolution, payload construction and request pacing.
#[cfg(test)]
mod tests {
    use super::*;

    // `logs_url` must yield the single-log route for every accepted endpoint
    // shape, including an endpoint that already points at the batch route.
    #[test]
    fn logs_url_accepts_root_api_and_logs_endpoint() {
        assert_eq!(
            logs_url("http://localhost:8000").unwrap(),
            "http://localhost:8000/api/v1/logs"
        );
        assert_eq!(
            logs_url("http://localhost:8000/api/v1").unwrap(),
            "http://localhost:8000/api/v1/logs"
        );
        assert_eq!(
            logs_url("http://localhost:8000/api/v1/logs").unwrap(),
            "http://localhost:8000/api/v1/logs"
        );
        assert_eq!(
            logs_url("http://localhost:8000/api/v1/logs/batch").unwrap(),
            "http://localhost:8000/api/v1/logs"
        );
    }

    // `batch_logs_url` must yield the batch route for every accepted endpoint
    // shape.
    #[test]
    fn batch_logs_url_accepts_root_api_logs_and_batch_endpoint() {
        assert_eq!(
            batch_logs_url("http://localhost:8000").unwrap(),
            "http://localhost:8000/api/v1/logs/batch"
        );
        assert_eq!(
            batch_logs_url("http://localhost:8000/api/v1").unwrap(),
            "http://localhost:8000/api/v1/logs/batch"
        );
        assert_eq!(
            batch_logs_url("http://localhost:8000/api/v1/logs").unwrap(),
            "http://localhost:8000/api/v1/logs/batch"
        );
        assert_eq!(
            batch_logs_url("http://localhost:8000/api/v1/logs/batch").unwrap(),
            "http://localhost:8000/api/v1/logs/batch"
        );
    }

    // Building a payload defaults the title to the body, splits and
    // de-duplicates comma-separated tags, keeps caller metadata and adds the
    // `rust_client` metadata entry.
    #[test]
    fn log_payload_is_enriched_and_normalizes_tags() {
        let payload = LogEntry::new("hello M31")
            .severity(Severity::Warn)
            .tags(["ccd, telescope", "ccd"])
            .metadata(json!({ "frame": 42 }))
            .into_payload();

        assert_eq!(payload.severity, Severity::Warn);
        assert_eq!(payload.body, "hello M31");
        assert_eq!(payload.title, "hello M31");
        assert_eq!(payload.tags, vec!["ccd", "telescope"]);
        assert_eq!(payload.metadata["frame"], 42);
        assert!(payload.metadata.get("rust_client").is_some());
    }

    // The pacing sleep is measured from the start of the previous request: no
    // wait without a previous request or with a zero interval, the remainder
    // of the interval otherwise, and zero once the interval has passed.
    #[test]
    fn request_interval_sleep_uses_last_request_start() {
        let now = Instant::now();
        assert_eq!(
            request_interval_sleep(Duration::from_millis(250), None, now),
            Duration::ZERO
        );
        assert_eq!(
            request_interval_sleep(Duration::ZERO, Some(now), now),
            Duration::ZERO
        );
        assert_eq!(
            request_interval_sleep(
                Duration::from_millis(250),
                Some(now - Duration::from_millis(100)),
                now
            ),
            Duration::from_millis(150)
        );
        assert_eq!(
            request_interval_sleep(
                Duration::from_millis(250),
                Some(now - Duration::from_millis(1000)),
                now
            ),
            Duration::ZERO
        );
    }
}
1643}