1use std::{
2 collections::BTreeSet,
3 env,
4 error::Error,
5 fmt,
6 path::PathBuf,
7 str::FromStr,
8 sync::{
9 mpsc::{self, Receiver, RecvTimeoutError, SyncSender, TryRecvError, TrySendError},
10 Arc, Mutex, OnceLock,
11 },
12 thread,
13 time::{Duration, Instant},
14};
15
16use chrono::{DateTime, Utc};
17use reqwest::blocking::Client as HttpClient;
18use serde::{Deserialize, Serialize};
19use serde_json::{json, Map, Value};
20
21pub const MASTER_LOG_API_KEY_ENV: &str = "MASTER_LOG_API_KEY";
22pub const MASTER_LOG_ENDPOINT_ENV: &str = "MASTER_LOG_ENDPOINT";
23pub const MASTER_LOG_TIMEOUT_ENV: &str = "MASTER_LOG_TIMEOUT_SECONDS";
24pub const MASTER_LOG_ECHO_ENV: &str = "MASTER_LOG_ECHO";
25pub const MASTER_LOG_ASYNC_ENV: &str = "MASTER_LOG_ASYNC";
26pub const MASTER_LOG_BATCH_SIZE_ENV: &str = "MASTER_LOG_BATCH_SIZE";
27pub const MASTER_LOG_FLUSH_INTERVAL_ENV: &str = "MASTER_LOG_FLUSH_INTERVAL_SECONDS";
28pub const MASTER_LOG_MAX_QUEUE_SIZE_ENV: &str = "MASTER_LOG_MAX_QUEUE_SIZE";
29pub const MASTER_LOG_DROP_WHEN_FULL_ENV: &str = "MASTER_LOG_DROP_WHEN_FULL";
30pub const MASTER_LOG_QUEUE_TIMEOUT_ENV: &str = "MASTER_LOG_QUEUE_TIMEOUT_SECONDS";
31pub const MASTER_LOG_BACKPRESSURE_ENV: &str = "MASTER_LOG_BACKPRESSURE";
32pub const MASTER_LOG_INITIAL_SEND_SECONDS_PER_LOG_ENV: &str =
33 "MASTER_LOG_INITIAL_SEND_SECONDS_PER_LOG";
34pub const MASTER_LOG_MAX_ENQUEUE_SLEEP_ENV: &str = "MASTER_LOG_MAX_ENQUEUE_SLEEP_SECONDS";
35pub const MASTER_LOG_MIN_REQUEST_INTERVAL_ENV: &str = "MASTER_LOG_MIN_REQUEST_INTERVAL_SECONDS";
36
37pub const DEFAULT_TIMEOUT_SECONDS: f64 = 2.0;
38pub const DEFAULT_ASYNC_MODE: bool = true;
39pub const DEFAULT_BATCH_SIZE: usize = 100;
40pub const DEFAULT_FLUSH_INTERVAL_SECONDS: f64 = 1.0;
41pub const DEFAULT_MAX_QUEUE_SIZE: usize = 10_000;
42pub const DEFAULT_DROP_WHEN_FULL: bool = true;
43pub const DEFAULT_QUEUE_TIMEOUT_SECONDS: f64 = 0.25;
44pub const DEFAULT_BACKPRESSURE: bool = true;
45pub const DEFAULT_INITIAL_SEND_SECONDS_PER_LOG: f64 = 0.005;
46pub const DEFAULT_MAX_ENQUEUE_SLEEP_SECONDS: f64 = 0.25;
47pub const DEFAULT_MIN_REQUEST_INTERVAL_SECONDS: f64 = 0.0;
48pub const DEFAULT_SEND_RATE_SMOOTHING: f64 = 0.2;
49pub const DEFAULT_SHUTDOWN_TIMEOUT_SECONDS: f64 = 5.0;
50pub const LIBRARY_VERSION: &str = env!("CARGO_PKG_VERSION");
51
52#[derive(Clone, Debug)]
53pub struct MasterLogConfig {
54 pub api_key: Option<String>,
55 pub endpoint: Option<String>,
56 pub timeout: Duration,
57 pub echo: bool,
58 pub async_mode: bool,
59 pub batch_size: usize,
60 pub flush_interval: Duration,
61 pub max_queue_size: usize,
62 pub drop_when_full: bool,
63 pub queue_timeout: Duration,
64 pub backpressure: bool,
65 pub initial_send_seconds_per_log: f64,
66 pub max_enqueue_sleep: Duration,
67 pub min_request_interval: Duration,
68}
69
70pub type Config = MasterLogConfig;
71
72impl Default for MasterLogConfig {
73 fn default() -> Self {
74 Self {
75 api_key: None,
76 endpoint: None,
77 timeout: duration_from_seconds(DEFAULT_TIMEOUT_SECONDS),
78 echo: false,
79 async_mode: DEFAULT_ASYNC_MODE,
80 batch_size: DEFAULT_BATCH_SIZE,
81 flush_interval: duration_from_seconds(DEFAULT_FLUSH_INTERVAL_SECONDS),
82 max_queue_size: DEFAULT_MAX_QUEUE_SIZE,
83 drop_when_full: DEFAULT_DROP_WHEN_FULL,
84 queue_timeout: duration_from_seconds(DEFAULT_QUEUE_TIMEOUT_SECONDS),
85 backpressure: DEFAULT_BACKPRESSURE,
86 initial_send_seconds_per_log: DEFAULT_INITIAL_SEND_SECONDS_PER_LOG,
87 max_enqueue_sleep: duration_from_seconds(DEFAULT_MAX_ENQUEUE_SLEEP_SECONDS),
88 min_request_interval: duration_from_seconds(DEFAULT_MIN_REQUEST_INTERVAL_SECONDS),
89 }
90 }
91}
92
93impl MasterLogConfig {
94 pub fn from_env() -> Self {
95 let mut config = Self::default();
96 config.api_key = env_non_empty(MASTER_LOG_API_KEY_ENV);
97 config.endpoint = env_non_empty(MASTER_LOG_ENDPOINT_ENV);
98 config.timeout = env_duration(MASTER_LOG_TIMEOUT_ENV).unwrap_or(config.timeout);
99 config.echo = env_bool(MASTER_LOG_ECHO_ENV).unwrap_or(config.echo);
100 config.async_mode = env_bool(MASTER_LOG_ASYNC_ENV).unwrap_or(config.async_mode);
101 config.batch_size = positive_usize(env_usize(MASTER_LOG_BATCH_SIZE_ENV), config.batch_size);
102 config.flush_interval = positive_duration(
103 env_duration(MASTER_LOG_FLUSH_INTERVAL_ENV),
104 config.flush_interval,
105 );
106 config.max_queue_size = non_negative_usize(
107 env_usize(MASTER_LOG_MAX_QUEUE_SIZE_ENV),
108 config.max_queue_size,
109 );
110 config.drop_when_full =
111 env_bool(MASTER_LOG_DROP_WHEN_FULL_ENV).unwrap_or(config.drop_when_full);
112 config.queue_timeout = non_negative_duration(
113 env_duration(MASTER_LOG_QUEUE_TIMEOUT_ENV),
114 config.queue_timeout,
115 );
116 config.backpressure = env_bool(MASTER_LOG_BACKPRESSURE_ENV).unwrap_or(config.backpressure);
117 config.initial_send_seconds_per_log = non_negative_f64(
118 env_f64(MASTER_LOG_INITIAL_SEND_SECONDS_PER_LOG_ENV),
119 config.initial_send_seconds_per_log,
120 );
121 config.max_enqueue_sleep = non_negative_duration(
122 env_duration(MASTER_LOG_MAX_ENQUEUE_SLEEP_ENV),
123 config.max_enqueue_sleep,
124 );
125 config.min_request_interval = non_negative_duration(
126 env_duration(MASTER_LOG_MIN_REQUEST_INTERVAL_ENV),
127 config.min_request_interval,
128 );
129 config
130 }
131
132 pub fn api_key(mut self, value: impl Into<String>) -> Self {
133 self.api_key = Some(value.into());
134 self
135 }
136
137 pub fn endpoint(mut self, value: impl Into<String>) -> Self {
138 self.endpoint = Some(value.into());
139 self
140 }
141
142 pub fn timeout(mut self, value: Duration) -> Self {
143 self.timeout = value;
144 self
145 }
146
147 pub fn echo(mut self, value: bool) -> Self {
148 self.echo = value;
149 self
150 }
151
152 pub fn async_mode(mut self, value: bool) -> Self {
153 self.async_mode = value;
154 self
155 }
156
157 pub fn batch_size(mut self, value: usize) -> Self {
158 self.batch_size = value.max(1);
159 self
160 }
161
162 pub fn flush_interval(mut self, value: Duration) -> Self {
163 self.flush_interval = if value.is_zero() {
164 duration_from_seconds(DEFAULT_FLUSH_INTERVAL_SECONDS)
165 } else {
166 value
167 };
168 self
169 }
170
171 pub fn max_queue_size(mut self, value: usize) -> Self {
172 self.max_queue_size = value;
173 self
174 }
175
176 pub fn drop_when_full(mut self, value: bool) -> Self {
177 self.drop_when_full = value;
178 self
179 }
180
181 pub fn queue_timeout(mut self, value: Duration) -> Self {
182 self.queue_timeout = value;
183 self
184 }
185
186 pub fn backpressure(mut self, value: bool) -> Self {
187 self.backpressure = value;
188 self
189 }
190
191 pub fn initial_send_seconds_per_log(mut self, value: f64) -> Self {
192 self.initial_send_seconds_per_log = value.max(0.0);
193 self
194 }
195
196 pub fn max_enqueue_sleep(mut self, value: Duration) -> Self {
197 self.max_enqueue_sleep = value;
198 self
199 }
200
201 pub fn min_request_interval(mut self, value: Duration) -> Self {
202 self.min_request_interval = value;
203 self
204 }
205}
206
207#[derive(Clone, Debug)]
208pub struct MasterLogResult {
209 pub ok: bool,
210 pub status_code: Option<u16>,
211 pub event_id: Option<String>,
212 pub error: Option<String>,
213 pub response: Option<Value>,
214 pub queued: bool,
215 pub accepted: Option<usize>,
216}
217
218impl MasterLogResult {
219 pub fn ok() -> Self {
220 Self {
221 ok: true,
222 status_code: None,
223 event_id: None,
224 error: None,
225 response: None,
226 queued: false,
227 accepted: None,
228 }
229 }
230
231 pub fn queued() -> Self {
232 Self {
233 queued: true,
234 ..Self::ok()
235 }
236 }
237
238 pub fn failed(error: impl Into<String>) -> Self {
239 Self {
240 ok: false,
241 status_code: None,
242 event_id: None,
243 error: Some(error.into()),
244 response: None,
245 queued: false,
246 accepted: None,
247 }
248 }
249}
250
251#[derive(Debug)]
252pub enum MasterLogError {
253 MissingConfig(&'static str),
254 InvalidSeverity(String),
255 EmptyEndpoint,
256 Http(String),
257 Json(String),
258 QueueFull,
259 WorkerUnavailable,
260 Timeout(String),
261}
262
263impl fmt::Display for MasterLogError {
264 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
265 match self {
266 Self::MissingConfig(name) => write!(formatter, "{name} is required"),
267 Self::InvalidSeverity(value) => write!(
268 formatter,
269 "severity must be one of trace, debug, info, warn, error, fatal; got {value}"
270 ),
271 Self::EmptyEndpoint => write!(formatter, "{MASTER_LOG_ENDPOINT_ENV} is empty"),
272 Self::Http(message) => formatter.write_str(message),
273 Self::Json(message) => formatter.write_str(message),
274 Self::QueueFull => formatter.write_str("Master Log queue is full"),
275 Self::WorkerUnavailable => formatter.write_str("Master Log worker is unavailable"),
276 Self::Timeout(message) => formatter.write_str(message),
277 }
278 }
279}
280
281impl Error for MasterLogError {}
282
283impl From<MasterLogError> for MasterLogResult {
284 fn from(error: MasterLogError) -> Self {
285 Self::failed(error.to_string())
286 }
287}
288
289#[derive(Clone, Copy, Debug, Deserialize, Eq, Hash, Ord, PartialEq, PartialOrd, Serialize)]
290#[serde(rename_all = "lowercase")]
291pub enum Severity {
292 Trace,
293 Debug,
294 Info,
295 Warn,
296 Error,
297 Fatal,
298}
299
300impl Severity {
301 pub fn all() -> [Self; 6] {
302 [
303 Self::Trace,
304 Self::Debug,
305 Self::Info,
306 Self::Warn,
307 Self::Error,
308 Self::Fatal,
309 ]
310 }
311}
312
313impl fmt::Display for Severity {
314 fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result {
315 let value = match self {
316 Self::Trace => "trace",
317 Self::Debug => "debug",
318 Self::Info => "info",
319 Self::Warn => "warn",
320 Self::Error => "error",
321 Self::Fatal => "fatal",
322 };
323 formatter.write_str(value)
324 }
325}
326
327impl FromStr for Severity {
328 type Err = MasterLogError;
329
330 fn from_str(value: &str) -> Result<Self, Self::Err> {
331 match value.trim().to_ascii_lowercase().as_str() {
332 "trace" => Ok(Self::Trace),
333 "debug" => Ok(Self::Debug),
334 "info" => Ok(Self::Info),
335 "warn" | "warning" => Ok(Self::Warn),
336 "error" => Ok(Self::Error),
337 "fatal" => Ok(Self::Fatal),
338 other => Err(MasterLogError::InvalidSeverity(other.to_string())),
339 }
340 }
341}
342
343#[derive(Clone, Debug)]
344pub struct LogEntry {
345 body: String,
346 title: Option<String>,
347 severity: Severity,
348 tags: Vec<String>,
349 metadata: Value,
350 ttl_seconds: Option<u64>,
351 expires_at: Option<DateTime<Utc>>,
352 created_at: Option<DateTime<Utc>>,
353}
354
355impl LogEntry {
356 pub fn new(body: impl Into<String>) -> Self {
357 Self {
358 body: body.into(),
359 title: None,
360 severity: Severity::Info,
361 tags: Vec::new(),
362 metadata: Value::Object(Map::new()),
363 ttl_seconds: None,
364 expires_at: None,
365 created_at: None,
366 }
367 }
368
369 pub fn with_title(title: impl Into<String>, body: impl Into<String>) -> Self {
370 Self::new(body).title(title)
371 }
372
373 pub fn severity(mut self, value: Severity) -> Self {
374 self.severity = value;
375 self
376 }
377
378 pub fn severity_str(mut self, value: impl AsRef<str>) -> Result<Self, MasterLogError> {
379 self.severity = Severity::from_str(value.as_ref())?;
380 Ok(self)
381 }
382
383 pub fn title(mut self, value: impl Into<String>) -> Self {
384 self.title = Some(value.into());
385 self
386 }
387
388 pub fn tag(mut self, value: impl Into<String>) -> Self {
389 self.tags.push(value.into());
390 self
391 }
392
393 pub fn tags<I, S>(mut self, values: I) -> Self
394 where
395 I: IntoIterator<Item = S>,
396 S: Into<String>,
397 {
398 self.tags.extend(values.into_iter().map(Into::into));
399 self
400 }
401
402 pub fn metadata(mut self, value: Value) -> Self {
403 self.metadata = value;
404 self
405 }
406
407 pub fn metadata_field(mut self, key: impl Into<String>, value: impl Serialize) -> Self {
408 let serialized = serde_json::to_value(value).unwrap_or_else(|error| {
409 Value::String(format!("failed to serialize metadata value: {error}"))
410 });
411
412 match &mut self.metadata {
413 Value::Object(object) => {
414 object.insert(key.into(), serialized);
415 }
416 existing => {
417 let mut object = Map::new();
418 object.insert("value".to_string(), existing.clone());
419 object.insert(key.into(), serialized);
420 self.metadata = Value::Object(object);
421 }
422 }
423
424 self
425 }
426
427 pub fn ttl_seconds(mut self, value: u64) -> Self {
428 self.ttl_seconds = Some(value);
429 self
430 }
431
432 pub fn expires_at(mut self, value: DateTime<Utc>) -> Self {
433 self.expires_at = Some(value);
434 self
435 }
436
437 pub fn created_at(mut self, value: DateTime<Utc>) -> Self {
438 self.created_at = Some(value);
439 self
440 }
441
442 fn into_payload(self) -> LogPayload {
443 let body = self.body;
444 let title = default_title(self.title.as_deref(), &body);
445 LogPayload {
446 severity: self.severity,
447 tags: normalize_tags(self.tags),
448 title,
449 body,
450 created_at: self.created_at,
451 expires_at: self.expires_at,
452 ttl_seconds: self.ttl_seconds,
453 metadata: merge_metadata(self.metadata),
454 }
455 }
456}
457
458#[derive(Clone, Debug)]
459pub struct MasterLogClient {
460 inner: Arc<ClientInner>,
461}
462
463#[derive(Debug)]
464struct ClientInner {
465 config: MasterLogConfig,
466 sender: Mutex<Option<SyncSender<WorkerMessage>>>,
467 handle: Mutex<Option<thread::JoinHandle<()>>>,
468 send_seconds_per_log: Arc<Mutex<f64>>,
469}
470
471impl Drop for ClientInner {
472 fn drop(&mut self) {
473 if let Ok(sender_slot) = self.sender.get_mut() {
474 if let Some(sender) = sender_slot.take() {
475 let (ack_sender, ack_receiver) = mpsc::channel();
476 let timeout = duration_from_seconds(DEFAULT_SHUTDOWN_TIMEOUT_SECONDS);
477 if send_command_with_timeout(
478 &sender,
479 WorkerMessage::Shutdown(ack_sender),
480 timeout,
481 false,
482 )
483 .is_ok()
484 {
485 let _ = ack_receiver.recv_timeout(timeout);
486 }
487 }
488 }
489
490 if let Ok(handle_slot) = self.handle.get_mut() {
491 if let Some(handle) = handle_slot.take() {
492 let _ = handle.join();
493 }
494 }
495 }
496}
497
498impl MasterLogClient {
499 pub fn new(config: MasterLogConfig) -> Self {
500 Self {
501 inner: Arc::new(ClientInner {
502 send_seconds_per_log: Arc::new(Mutex::new(config.initial_send_seconds_per_log)),
503 config,
504 sender: Mutex::new(None),
505 handle: Mutex::new(None),
506 }),
507 }
508 }
509
510 pub fn from_env() -> Self {
511 Self::new(MasterLogConfig::from_env())
512 }
513
514 pub fn log(&self, body: impl Into<String>) -> MasterLogResult {
515 self.log_entry(LogEntry::new(body))
516 }
517
518 pub fn try_log(&self, body: impl Into<String>) -> Result<MasterLogResult, MasterLogError> {
519 self.try_log_entry(LogEntry::new(body))
520 }
521
522 pub fn log_entry(&self, entry: LogEntry) -> MasterLogResult {
523 self.try_log_entry(entry).unwrap_or_else(Into::into)
524 }
525
526 pub fn try_log_entry(&self, entry: LogEntry) -> Result<MasterLogResult, MasterLogError> {
527 let payload = entry.into_payload();
528 if self.inner.config.echo {
529 println!("{}", payload.body);
530 }
531
532 if self.inner.config.async_mode {
533 self.enqueue(payload)
534 } else {
535 self.send(payload)
536 }
537 }
538
539 pub fn trace(&self, body: impl Into<String>) -> MasterLogResult {
540 self.log_entry(LogEntry::new(body).severity(Severity::Trace))
541 }
542
543 pub fn debug(&self, body: impl Into<String>) -> MasterLogResult {
544 self.log_entry(LogEntry::new(body).severity(Severity::Debug))
545 }
546
547 pub fn info(&self, body: impl Into<String>) -> MasterLogResult {
548 self.log_entry(LogEntry::new(body).severity(Severity::Info))
549 }
550
551 pub fn warn(&self, body: impl Into<String>) -> MasterLogResult {
552 self.log_entry(LogEntry::new(body).severity(Severity::Warn))
553 }
554
555 pub fn error(&self, body: impl Into<String>) -> MasterLogResult {
556 self.log_entry(LogEntry::new(body).severity(Severity::Error))
557 }
558
559 pub fn fatal(&self, body: impl Into<String>) -> MasterLogResult {
560 self.log_entry(LogEntry::new(body).severity(Severity::Fatal))
561 }
562
563 pub fn flush(&self, timeout: Duration) -> MasterLogResult {
564 self.try_flush(timeout).unwrap_or_else(Into::into)
565 }
566
567 pub fn try_flush(&self, timeout: Duration) -> Result<MasterLogResult, MasterLogError> {
568 if !self.inner.config.async_mode {
569 return Ok(MasterLogResult {
570 accepted: Some(0),
571 ..MasterLogResult::ok()
572 });
573 }
574
575 self.require_config()?;
576 let sender = self.ensure_worker()?;
577 let (ack_sender, ack_receiver) = mpsc::channel();
578 send_command_with_timeout(&sender, WorkerMessage::Flush(ack_sender), timeout, false)?;
579
580 let result = ack_receiver
581 .recv_timeout(timeout)
582 .map_err(|error| match error {
583 mpsc::RecvTimeoutError::Timeout => MasterLogError::Timeout(
584 "timed out waiting for Master Log worker flush".to_string(),
585 ),
586 mpsc::RecvTimeoutError::Disconnected => MasterLogError::WorkerUnavailable,
587 })?;
588
589 Ok(result.into_result())
590 }
591
592 pub fn shutdown(&self, timeout: Duration) -> MasterLogResult {
593 self.try_shutdown(timeout).unwrap_or_else(Into::into)
594 }
595
596 pub fn try_shutdown(&self, timeout: Duration) -> Result<MasterLogResult, MasterLogError> {
597 if !self.inner.config.async_mode {
598 return Ok(MasterLogResult {
599 accepted: Some(0),
600 ..MasterLogResult::ok()
601 });
602 }
603
604 let sender = {
605 let sender_guard = self
606 .inner
607 .sender
608 .lock()
609 .map_err(|_| MasterLogError::WorkerUnavailable)?;
610 sender_guard.clone()
611 };
612
613 let Some(sender) = sender else {
614 return Ok(MasterLogResult {
615 accepted: Some(0),
616 ..MasterLogResult::ok()
617 });
618 };
619
620 let (ack_sender, ack_receiver) = mpsc::channel();
621 send_command_with_timeout(&sender, WorkerMessage::Shutdown(ack_sender), timeout, false)?;
622
623 let result = ack_receiver
624 .recv_timeout(timeout)
625 .map_err(|error| match error {
626 mpsc::RecvTimeoutError::Timeout => MasterLogError::Timeout(
627 "timed out waiting for Master Log worker shutdown".to_string(),
628 ),
629 mpsc::RecvTimeoutError::Disconnected => MasterLogError::WorkerUnavailable,
630 })?;
631
632 if let Ok(mut sender_guard) = self.inner.sender.lock() {
633 sender_guard.take();
634 }
635
636 if let Ok(mut handle_guard) = self.inner.handle.lock() {
637 if let Some(handle) = handle_guard.take() {
638 let _ = handle.join();
639 }
640 }
641
642 Ok(result.into_result())
643 }
644
645 fn enqueue(&self, payload: LogPayload) -> Result<MasterLogResult, MasterLogError> {
646 self.require_config()?;
647 let sender = self.ensure_worker()?;
648 let command = WorkerMessage::Log(payload);
649
650 if self.inner.config.drop_when_full {
651 sender.try_send(command).map_err(|error| match error {
652 TrySendError::Full(_) => MasterLogError::QueueFull,
653 TrySendError::Disconnected(_) => MasterLogError::WorkerUnavailable,
654 })?;
655 } else {
656 send_command_with_timeout(
657 &sender,
658 command,
659 self.inner.config.queue_timeout,
660 self.inner.config.drop_when_full,
661 )?;
662 }
663
664 self.apply_enqueue_backpressure();
665 Ok(MasterLogResult::queued())
666 }
667
668 fn send(&self, payload: LogPayload) -> Result<MasterLogResult, MasterLogError> {
669 self.require_config()?;
670 let endpoint = self
671 .inner
672 .config
673 .endpoint
674 .as_deref()
675 .ok_or(MasterLogError::MissingConfig(MASTER_LOG_ENDPOINT_ENV))?;
676 let api_key = self
677 .inner
678 .config
679 .api_key
680 .as_deref()
681 .ok_or(MasterLogError::MissingConfig(MASTER_LOG_API_KEY_ENV))?;
682 let url = logs_url(endpoint)?;
683 let http = build_http_client(self.inner.config.timeout)?;
684 post_json(&http, &url, api_key, &payload)
685 }
686
687 fn ensure_worker(&self) -> Result<SyncSender<WorkerMessage>, MasterLogError> {
688 self.require_config()?;
689
690 let mut sender_guard = self
691 .inner
692 .sender
693 .lock()
694 .map_err(|_| MasterLogError::WorkerUnavailable)?;
695 let mut handle_guard = self
696 .inner
697 .handle
698 .lock()
699 .map_err(|_| MasterLogError::WorkerUnavailable)?;
700
701 if handle_guard
702 .as_ref()
703 .is_some_and(|handle| handle.is_finished())
704 {
705 sender_guard.take();
706 if let Some(handle) = handle_guard.take() {
707 let _ = handle.join();
708 }
709 }
710
711 if let Some(sender) = sender_guard.as_ref() {
712 return Ok(sender.clone());
713 }
714
715 let (sender, receiver) = mpsc::sync_channel(self.inner.config.max_queue_size);
716 let config = WorkerConfig::from_client_config(&self.inner.config)?;
717 let send_seconds_per_log = Arc::clone(&self.inner.send_seconds_per_log);
718
719 let handle = thread::Builder::new()
720 .name("master-log-client".to_string())
721 .spawn(move || batch_worker(config, receiver, send_seconds_per_log))
722 .map_err(|error| {
723 MasterLogError::Http(format!("failed to start Master Log worker: {error}"))
724 })?;
725
726 *sender_guard = Some(sender.clone());
727 *handle_guard = Some(handle);
728 Ok(sender)
729 }
730
731 fn require_config(&self) -> Result<(), MasterLogError> {
732 if self
733 .inner
734 .config
735 .api_key
736 .as_deref()
737 .unwrap_or("")
738 .is_empty()
739 {
740 return Err(MasterLogError::MissingConfig(MASTER_LOG_API_KEY_ENV));
741 }
742 if self
743 .inner
744 .config
745 .endpoint
746 .as_deref()
747 .unwrap_or("")
748 .is_empty()
749 {
750 return Err(MasterLogError::MissingConfig(MASTER_LOG_ENDPOINT_ENV));
751 }
752 Ok(())
753 }
754
755 fn apply_enqueue_backpressure(&self) {
756 if !self.inner.config.backpressure {
757 return;
758 }
759
760 let sleep_seconds = self
761 .inner
762 .send_seconds_per_log
763 .lock()
764 .map(|value| *value)
765 .unwrap_or(self.inner.config.initial_send_seconds_per_log)
766 .max(0.0);
767 let sleep = duration_from_seconds(sleep_seconds).min(self.inner.config.max_enqueue_sleep);
768
769 if !sleep.is_zero() {
770 thread::sleep(sleep);
771 }
772 }
773}
774
775static DEFAULT_CLIENT: OnceLock<Mutex<Option<MasterLogClient>>> = OnceLock::new();
776
777pub fn get_default_client() -> MasterLogClient {
778 let lock = DEFAULT_CLIENT.get_or_init(|| Mutex::new(None));
779 let mut guard = lock
780 .lock()
781 .expect("default Master Log client lock poisoned");
782 if let Some(client) = guard.as_ref() {
783 return client.clone();
784 }
785
786 let client = MasterLogClient::from_env();
787 *guard = Some(client.clone());
788 client
789}
790
791pub fn configure(config: MasterLogConfig) -> MasterLogResult {
792 configure_default_client(config)
793}
794
795pub fn configure_default_client(config: MasterLogConfig) -> MasterLogResult {
796 let lock = DEFAULT_CLIENT.get_or_init(|| Mutex::new(None));
797 let mut guard = lock
798 .lock()
799 .expect("default Master Log client lock poisoned");
800 if let Some(old_client) = guard.take() {
801 let _ = old_client.shutdown(duration_from_seconds(1.0));
802 }
803
804 *guard = Some(MasterLogClient::new(config));
805 MasterLogResult::ok()
806}
807
808pub fn log(body: impl Into<String>) -> MasterLogResult {
809 get_default_client().log(body)
810}
811
812pub fn mlog(body: impl Into<String>) -> MasterLogResult {
813 log(body)
814}
815
816pub fn log_entry(entry: LogEntry) -> MasterLogResult {
817 get_default_client().log_entry(entry)
818}
819
820pub fn trace(body: impl Into<String>) -> MasterLogResult {
821 get_default_client().trace(body)
822}
823
824pub fn debug(body: impl Into<String>) -> MasterLogResult {
825 get_default_client().debug(body)
826}
827
828pub fn info(body: impl Into<String>) -> MasterLogResult {
829 get_default_client().info(body)
830}
831
832pub fn warn(body: impl Into<String>) -> MasterLogResult {
833 get_default_client().warn(body)
834}
835
836pub fn error(body: impl Into<String>) -> MasterLogResult {
837 get_default_client().error(body)
838}
839
840pub fn fatal(body: impl Into<String>) -> MasterLogResult {
841 get_default_client().fatal(body)
842}
843
844pub fn flush(timeout: Duration) -> MasterLogResult {
845 get_default_client().flush(timeout)
846}
847
848pub fn shutdown(timeout: Duration) -> MasterLogResult {
849 get_default_client().shutdown(timeout)
850}
851
852#[macro_export]
853macro_rules! mlogf {
854 ($($arg:tt)*) => {
855 $crate::mlog(format!($($arg)*))
856 };
857}
858
859#[macro_export]
860macro_rules! tracef {
861 ($($arg:tt)*) => {
862 $crate::trace(format!($($arg)*))
863 };
864}
865
866#[macro_export]
867macro_rules! debugf {
868 ($($arg:tt)*) => {
869 $crate::debug(format!($($arg)*))
870 };
871}
872
873#[macro_export]
874macro_rules! infof {
875 ($($arg:tt)*) => {
876 $crate::info(format!($($arg)*))
877 };
878}
879
880#[macro_export]
881macro_rules! warnf {
882 ($($arg:tt)*) => {
883 $crate::warn(format!($($arg)*))
884 };
885}
886
887#[macro_export]
888macro_rules! errorf {
889 ($($arg:tt)*) => {
890 $crate::error(format!($($arg)*))
891 };
892}
893
894#[macro_export]
895macro_rules! fatalf {
896 ($($arg:tt)*) => {
897 $crate::fatal(format!($($arg)*))
898 };
899}
900
901pub fn logs_url(endpoint: &str) -> Result<String, MasterLogError> {
902 let endpoint = endpoint.trim().trim_end_matches('/');
903 if endpoint.is_empty() {
904 return Err(MasterLogError::EmptyEndpoint);
905 }
906
907 if endpoint.ends_with("/api/v1/logs/batch") {
908 Ok(endpoint.trim_end_matches("/batch").to_string())
909 } else if endpoint.ends_with("/api/v1/logs") {
910 Ok(endpoint.to_string())
911 } else if endpoint.ends_with("/api/v1") {
912 Ok(format!("{endpoint}/logs"))
913 } else {
914 Ok(format!("{endpoint}/api/v1/logs"))
915 }
916}
917
918pub fn batch_logs_url(endpoint: &str) -> Result<String, MasterLogError> {
919 let endpoint = endpoint.trim().trim_end_matches('/');
920 if endpoint.is_empty() {
921 return Err(MasterLogError::EmptyEndpoint);
922 }
923
924 if endpoint.ends_with("/api/v1/logs/batch") {
925 Ok(endpoint.to_string())
926 } else if endpoint.ends_with("/api/v1/logs") {
927 Ok(format!("{endpoint}/batch"))
928 } else if endpoint.ends_with("/api/v1") {
929 Ok(format!("{endpoint}/logs/batch"))
930 } else {
931 Ok(format!("{endpoint}/api/v1/logs/batch"))
932 }
933}
934
935#[derive(Clone, Debug, Serialize)]
936struct LogPayload {
937 severity: Severity,
938 tags: Vec<String>,
939 title: String,
940 body: String,
941 #[serde(skip_serializing_if = "Option::is_none")]
942 created_at: Option<DateTime<Utc>>,
943 #[serde(skip_serializing_if = "Option::is_none")]
944 expires_at: Option<DateTime<Utc>>,
945 #[serde(skip_serializing_if = "Option::is_none")]
946 ttl_seconds: Option<u64>,
947 metadata: Value,
948}
949
950#[derive(Clone, Debug)]
951struct WorkerConfig {
952 api_key: String,
953 endpoint: String,
954 timeout: Duration,
955 batch_size: usize,
956 flush_interval: Duration,
957 min_request_interval: Duration,
958}
959
960impl WorkerConfig {
961 fn from_client_config(config: &MasterLogConfig) -> Result<Self, MasterLogError> {
962 Ok(Self {
963 api_key: config
964 .api_key
965 .clone()
966 .ok_or(MasterLogError::MissingConfig(MASTER_LOG_API_KEY_ENV))?,
967 endpoint: config
968 .endpoint
969 .clone()
970 .ok_or(MasterLogError::MissingConfig(MASTER_LOG_ENDPOINT_ENV))?,
971 timeout: config.timeout,
972 batch_size: config.batch_size.max(1),
973 flush_interval: config.flush_interval,
974 min_request_interval: config.min_request_interval,
975 })
976 }
977}
978
979enum WorkerMessage {
980 Log(LogPayload),
981 Flush(mpsc::Sender<WorkerResult>),
982 Shutdown(mpsc::Sender<WorkerResult>),
983}
984
985#[derive(Clone, Debug)]
986struct WorkerResult {
987 ok: bool,
988 accepted: usize,
989 status_code: Option<u16>,
990 response: Option<Value>,
991 error: Option<String>,
992}
993
994impl WorkerResult {
995 fn ok(accepted: usize, status_code: Option<u16>, response: Option<Value>) -> Self {
996 Self {
997 ok: true,
998 accepted,
999 status_code,
1000 response,
1001 error: None,
1002 }
1003 }
1004
1005 fn failed(
1006 accepted: usize,
1007 status_code: Option<u16>,
1008 response: Option<Value>,
1009 error: impl Into<String>,
1010 ) -> Self {
1011 Self {
1012 ok: false,
1013 accepted,
1014 status_code,
1015 response,
1016 error: Some(error.into()),
1017 }
1018 }
1019
1020 fn into_result(self) -> MasterLogResult {
1021 MasterLogResult {
1022 ok: self.ok,
1023 status_code: self.status_code,
1024 event_id: None,
1025 error: self.error,
1026 response: self.response,
1027 queued: false,
1028 accepted: Some(self.accepted),
1029 }
1030 }
1031}
1032
1033fn batch_worker(
1034 config: WorkerConfig,
1035 receiver: Receiver<WorkerMessage>,
1036 send_seconds_per_log: Arc<Mutex<f64>>,
1037) {
1038 let http = match build_http_client(config.timeout) {
1039 Ok(client) => client,
1040 Err(error) => {
1041 fail_all_worker_messages(
1042 receiver,
1043 WorkerResult::failed(0, None, None, error.to_string()),
1044 );
1045 return;
1046 }
1047 };
1048
1049 let mut pending = Vec::<LogPayload>::new();
1050 let mut first_pending_at: Option<Instant> = None;
1051 let mut last_request_started_at: Option<Instant> = None;
1052 let mut last_result = WorkerResult::ok(0, None, None);
1053
1054 loop {
1055 let timeout = worker_wait_timeout(first_pending_at, config.flush_interval);
1056 let mut flush_acks = Vec::new();
1057 let mut shutdown_ack = None;
1058 let mut should_shutdown = false;
1059 let mut due = false;
1060
1061 match receive_worker_message(&receiver, timeout) {
1062 Ok(Some(message)) => handle_worker_message(
1063 message,
1064 &mut pending,
1065 &mut first_pending_at,
1066 &mut flush_acks,
1067 &mut shutdown_ack,
1068 &mut should_shutdown,
1069 ),
1070 Ok(None) => {
1071 due = !pending.is_empty();
1072 }
1073 Err(()) => {
1074 should_shutdown = true;
1075 }
1076 }
1077
1078 drain_worker_messages(
1079 &receiver,
1080 &mut pending,
1081 &mut first_pending_at,
1082 &mut flush_acks,
1083 &mut shutdown_ack,
1084 &mut should_shutdown,
1085 );
1086
1087 let should_send = !pending.is_empty()
1088 && (should_shutdown
1089 || !flush_acks.is_empty()
1090 || shutdown_ack.is_some()
1091 || pending.len() >= config.batch_size
1092 || due);
1093
1094 if should_send {
1095 last_result = send_pending(
1096 &config,
1097 &http,
1098 &mut pending,
1099 &send_seconds_per_log,
1100 &mut last_request_started_at,
1101 );
1102 first_pending_at = if pending.is_empty() {
1103 None
1104 } else {
1105 Some(Instant::now())
1106 };
1107 } else if pending.is_empty() {
1108 first_pending_at = None;
1109 last_result = WorkerResult::ok(0, None, None);
1110 }
1111
1112 for ack in flush_acks {
1113 let _ = ack.send(last_result.clone());
1114 }
1115
1116 if let Some(ack) = shutdown_ack {
1117 let _ = ack.send(last_result.clone());
1118 break;
1119 }
1120
1121 if should_shutdown {
1122 break;
1123 }
1124 }
1125}
1126
1127fn receive_worker_message(
1128 receiver: &Receiver<WorkerMessage>,
1129 timeout: Duration,
1130) -> Result<Option<WorkerMessage>, ()> {
1131 match receiver.recv_timeout(timeout) {
1132 Ok(message) => Ok(Some(message)),
1133 Err(RecvTimeoutError::Timeout) => Ok(None),
1134 Err(RecvTimeoutError::Disconnected) => Err(()),
1135 }
1136}
1137
1138fn drain_worker_messages(
1139 receiver: &Receiver<WorkerMessage>,
1140 pending: &mut Vec<LogPayload>,
1141 first_pending_at: &mut Option<Instant>,
1142 flush_acks: &mut Vec<mpsc::Sender<WorkerResult>>,
1143 shutdown_ack: &mut Option<mpsc::Sender<WorkerResult>>,
1144 should_shutdown: &mut bool,
1145) {
1146 loop {
1147 match receiver.try_recv() {
1148 Ok(message) => handle_worker_message(
1149 message,
1150 pending,
1151 first_pending_at,
1152 flush_acks,
1153 shutdown_ack,
1154 should_shutdown,
1155 ),
1156 Err(TryRecvError::Empty) => break,
1157 Err(TryRecvError::Disconnected) => {
1158 *should_shutdown = true;
1159 break;
1160 }
1161 }
1162 }
1163}
1164
1165fn handle_worker_message(
1166 message: WorkerMessage,
1167 pending: &mut Vec<LogPayload>,
1168 first_pending_at: &mut Option<Instant>,
1169 flush_acks: &mut Vec<mpsc::Sender<WorkerResult>>,
1170 shutdown_ack: &mut Option<mpsc::Sender<WorkerResult>>,
1171 should_shutdown: &mut bool,
1172) {
1173 match message {
1174 WorkerMessage::Log(payload) => {
1175 if pending.is_empty() {
1176 *first_pending_at = Some(Instant::now());
1177 }
1178 pending.push(payload);
1179 }
1180 WorkerMessage::Flush(ack) => flush_acks.push(ack),
1181 WorkerMessage::Shutdown(ack) => {
1182 *shutdown_ack = Some(ack);
1183 *should_shutdown = true;
1184 }
1185 }
1186}
1187
1188fn send_pending(
1189 config: &WorkerConfig,
1190 http: &HttpClient,
1191 pending: &mut Vec<LogPayload>,
1192 send_seconds_per_log: &Arc<Mutex<f64>>,
1193 last_request_started_at: &mut Option<Instant>,
1194) -> WorkerResult {
1195 let mut accepted_total = 0usize;
1196 let mut status_code = None;
1197 let mut response = None;
1198
1199 while !pending.is_empty() {
1200 let batch_len = config.batch_size.min(pending.len()).max(1);
1201 wait_for_request_interval(config.min_request_interval, *last_request_started_at);
1202 let started_at = Instant::now();
1203 *last_request_started_at = Some(started_at);
1204
1205 let result = send_batch(config, http, &pending[..batch_len]);
1206 let elapsed = started_at.elapsed();
1207 status_code = result.status_code;
1208 response = result.response.clone();
1209
1210 if !result.ok {
1211 return WorkerResult::failed(
1212 accepted_total,
1213 status_code,
1214 response,
1215 result
1216 .error
1217 .unwrap_or_else(|| "Master Log batch send failed".to_string()),
1218 );
1219 }
1220
1221 let accepted = result.accepted;
1222 update_send_seconds_per_log(send_seconds_per_log, elapsed, accepted);
1223 accepted_total += accepted;
1224 pending.drain(..batch_len);
1225 }
1226
1227 WorkerResult::ok(accepted_total, status_code, response)
1228}
1229
1230#[derive(Serialize)]
1231struct BatchRequest<'a> {
1232 logs: &'a [LogPayload],
1233}
1234
1235fn send_batch(config: &WorkerConfig, http: &HttpClient, payloads: &[LogPayload]) -> WorkerResult {
1236 let url = match batch_logs_url(&config.endpoint) {
1237 Ok(url) => url,
1238 Err(error) => return WorkerResult::failed(0, None, None, error.to_string()),
1239 };
1240
1241 match post_json(
1242 http,
1243 &url,
1244 &config.api_key,
1245 &BatchRequest { logs: payloads },
1246 ) {
1247 Ok(result) => WorkerResult::ok(
1248 result.accepted.unwrap_or(payloads.len()),
1249 result.status_code,
1250 result.response,
1251 ),
1252 Err(error) => WorkerResult::failed(0, None, None, error.to_string()),
1253 }
1254}
1255
1256fn fail_all_worker_messages(receiver: Receiver<WorkerMessage>, result: WorkerResult) {
1257 while let Ok(message) = receiver.recv_timeout(duration_from_seconds(0.05)) {
1258 match message {
1259 WorkerMessage::Flush(ack) | WorkerMessage::Shutdown(ack) => {
1260 let _ = ack.send(result.clone());
1261 }
1262 WorkerMessage::Log(_) => {}
1263 }
1264 }
1265}
1266
1267fn worker_wait_timeout(first_pending_at: Option<Instant>, flush_interval: Duration) -> Duration {
1268 let Some(first_pending_at) = first_pending_at else {
1269 return flush_interval;
1270 };
1271
1272 flush_interval
1273 .checked_sub(first_pending_at.elapsed())
1274 .unwrap_or(Duration::ZERO)
1275}
1276
1277fn wait_for_request_interval(
1278 min_request_interval: Duration,
1279 last_request_started_at: Option<Instant>,
1280) {
1281 let sleep = request_interval_sleep(
1282 min_request_interval,
1283 last_request_started_at,
1284 Instant::now(),
1285 );
1286 if !sleep.is_zero() {
1287 thread::sleep(sleep);
1288 }
1289}
1290
1291fn request_interval_sleep(
1292 min_request_interval: Duration,
1293 last_request_started_at: Option<Instant>,
1294 now: Instant,
1295) -> Duration {
1296 if min_request_interval.is_zero() {
1297 return Duration::ZERO;
1298 }
1299
1300 let Some(last_request_started_at) = last_request_started_at else {
1301 return Duration::ZERO;
1302 };
1303
1304 min_request_interval
1305 .checked_sub(now.saturating_duration_since(last_request_started_at))
1306 .unwrap_or(Duration::ZERO)
1307}
1308
1309fn update_send_seconds_per_log(
1310 send_seconds_per_log: &Arc<Mutex<f64>>,
1311 elapsed: Duration,
1312 accepted_count: usize,
1313) {
1314 if accepted_count == 0 {
1315 return;
1316 }
1317
1318 let sample = elapsed.as_secs_f64().max(0.0) / accepted_count as f64;
1319 if let Ok(mut current) = send_seconds_per_log.lock() {
1320 if *current <= 0.0 {
1321 *current = sample;
1322 } else {
1323 *current = *current * (1.0 - DEFAULT_SEND_RATE_SMOOTHING)
1324 + sample * DEFAULT_SEND_RATE_SMOOTHING;
1325 }
1326 }
1327}
1328
1329fn send_command_with_timeout(
1330 sender: &SyncSender<WorkerMessage>,
1331 mut command: WorkerMessage,
1332 timeout: Duration,
1333 drop_when_full: bool,
1334) -> Result<(), MasterLogError> {
1335 let deadline = Instant::now() + timeout;
1336
1337 loop {
1338 match sender.try_send(command) {
1339 Ok(()) => return Ok(()),
1340 Err(TrySendError::Disconnected(_)) => return Err(MasterLogError::WorkerUnavailable),
1341 Err(TrySendError::Full(returned)) => {
1342 if drop_when_full || Instant::now() >= deadline {
1343 return Err(MasterLogError::QueueFull);
1344 }
1345 command = returned;
1346 thread::sleep(duration_from_seconds(0.005).min(timeout));
1347 }
1348 }
1349 }
1350}
1351
1352fn build_http_client(timeout: Duration) -> Result<HttpClient, MasterLogError> {
1353 HttpClient::builder()
1354 .timeout(timeout)
1355 .build()
1356 .map_err(|error| MasterLogError::Http(error.to_string()))
1357}
1358
1359fn post_json<T: Serialize + ?Sized>(
1360 http: &HttpClient,
1361 url: &str,
1362 api_key: &str,
1363 payload: &T,
1364) -> Result<MasterLogResult, MasterLogError> {
1365 let response = http
1366 .post(url)
1367 .bearer_auth(api_key)
1368 .header("Accept", "application/json")
1369 .header(
1370 "User-Agent",
1371 format!("master-log-client-rust/{LIBRARY_VERSION}"),
1372 )
1373 .json(payload)
1374 .send()
1375 .map_err(|error| MasterLogError::Http(error.to_string()))?;
1376
1377 let status_code = response.status().as_u16();
1378 let success = response.status().is_success();
1379 let body = response
1380 .text()
1381 .map_err(|error| MasterLogError::Http(error.to_string()))?;
1382 let parsed = if body.trim().is_empty() {
1383 Value::Object(Map::new())
1384 } else {
1385 serde_json::from_str::<Value>(&body).unwrap_or_else(|_| json!({ "error": body }))
1386 };
1387
1388 if !success {
1389 return Err(MasterLogError::Http(format!(
1390 "Master Log returned HTTP {status_code}: {parsed}"
1391 )));
1392 }
1393
1394 let event_id = parsed
1395 .get("events")
1396 .and_then(Value::as_array)
1397 .and_then(|events| events.first())
1398 .and_then(|event| event.get("id"))
1399 .and_then(Value::as_str)
1400 .map(str::to_string);
1401 let accepted = parsed
1402 .get("accepted")
1403 .and_then(Value::as_u64)
1404 .and_then(|value| usize::try_from(value).ok());
1405
1406 Ok(MasterLogResult {
1407 ok: true,
1408 status_code: Some(status_code),
1409 event_id,
1410 error: None,
1411 response: Some(parsed),
1412 queued: false,
1413 accepted,
1414 })
1415}
1416
1417fn default_title(title: Option<&str>, body: &str) -> String {
1418 if let Some(title) = title.map(str::trim).filter(|title| !title.is_empty()) {
1419 return title.to_string();
1420 }
1421
1422 let first_line = body
1423 .trim()
1424 .lines()
1425 .next()
1426 .filter(|line| !line.is_empty())
1427 .unwrap_or("Rust log event");
1428 first_line.chars().take(120).collect()
1429}
1430
1431fn normalize_tags(tags: Vec<String>) -> Vec<String> {
1432 tags.into_iter()
1433 .flat_map(|tag| {
1434 tag.split(',')
1435 .map(str::trim)
1436 .filter(|tag| !tag.is_empty())
1437 .map(str::to_string)
1438 .collect::<Vec<_>>()
1439 })
1440 .collect::<BTreeSet<_>>()
1441 .into_iter()
1442 .collect()
1443}
1444
1445fn merge_metadata(metadata: Value) -> Value {
1446 let mut object = match metadata {
1447 Value::Object(object) => object,
1448 value => {
1449 let mut object = Map::new();
1450 object.insert("value".to_string(), value);
1451 object
1452 }
1453 };
1454
1455 let client_metadata = rust_client_metadata();
1456 if object.contains_key("rust_client") {
1457 object.insert("_rust_client".to_string(), client_metadata);
1458 } else {
1459 object.insert("rust_client".to_string(), client_metadata);
1460 }
1461
1462 Value::Object(object)
1463}
1464
1465fn rust_client_metadata() -> Value {
1466 let argv0 = env::args().next().unwrap_or_default();
1467 let process_name = process_name(&argv0);
1468 let hostname = hostname::get()
1469 .ok()
1470 .and_then(|value| value.into_string().ok())
1471 .unwrap_or_else(|| "unknown".to_string());
1472 let cwd = env::current_dir()
1473 .unwrap_or_else(|_| PathBuf::from(""))
1474 .display()
1475 .to_string();
1476
1477 json!({
1478 "hostname": hostname,
1479 "pid": std::process::id(),
1480 "process_name": process_name,
1481 "argv0": argv0,
1482 "cwd": cwd,
1483 "library": "master-log-client",
1484 "library_version": LIBRARY_VERSION,
1485 })
1486}
1487
1488fn process_name(argv0: &str) -> String {
1489 if !argv0.trim().is_empty() {
1490 return PathBuf::from(argv0)
1491 .file_name()
1492 .and_then(|value| value.to_str())
1493 .unwrap_or("rust")
1494 .to_string();
1495 }
1496
1497 env::current_exe()
1498 .ok()
1499 .and_then(|path| path.file_name().map(|value| value.to_owned()))
1500 .and_then(|value| value.into_string().ok())
1501 .unwrap_or_else(|| "rust".to_string())
1502}
1503
1504fn env_non_empty(name: &str) -> Option<String> {
1505 env::var(name)
1506 .ok()
1507 .map(|value| value.trim().to_string())
1508 .filter(|value| !value.is_empty())
1509}
1510
1511fn env_bool(name: &str) -> Option<bool> {
1512 match env::var(name).ok()?.trim().to_ascii_lowercase().as_str() {
1513 "1" | "true" | "yes" | "on" => Some(true),
1514 "0" | "false" | "no" | "off" => Some(false),
1515 _ => None,
1516 }
1517}
1518
1519fn env_f64(name: &str) -> Option<f64> {
1520 env::var(name).ok()?.trim().parse::<f64>().ok()
1521}
1522
1523fn env_usize(name: &str) -> Option<usize> {
1524 env::var(name).ok()?.trim().parse::<usize>().ok()
1525}
1526
1527fn env_duration(name: &str) -> Option<Duration> {
1528 env_f64(name).map(duration_from_seconds)
1529}
1530
1531fn positive_usize(value: Option<usize>, default: usize) -> usize {
1532 value.filter(|value| *value > 0).unwrap_or(default)
1533}
1534
1535fn non_negative_usize(value: Option<usize>, default: usize) -> usize {
1536 value.unwrap_or(default)
1537}
1538
1539fn positive_duration(value: Option<Duration>, default: Duration) -> Duration {
1540 value.filter(|value| !value.is_zero()).unwrap_or(default)
1541}
1542
1543fn non_negative_duration(value: Option<Duration>, default: Duration) -> Duration {
1544 value.unwrap_or(default)
1545}
1546
1547fn non_negative_f64(value: Option<f64>, default: f64) -> f64 {
1548 value.filter(|value| *value >= 0.0).unwrap_or(default)
1549}
1550
1551fn duration_from_seconds(seconds: f64) -> Duration {
1552 Duration::from_secs_f64(seconds.max(0.0))
1553}
1554
1555#[cfg(test)]
1556mod tests {
1557 use super::*;
1558
1559 #[test]
1560 fn logs_url_accepts_root_api_and_logs_endpoint() {
1561 assert_eq!(
1562 logs_url("http://localhost:8000").unwrap(),
1563 "http://localhost:8000/api/v1/logs"
1564 );
1565 assert_eq!(
1566 logs_url("http://localhost:8000/api/v1").unwrap(),
1567 "http://localhost:8000/api/v1/logs"
1568 );
1569 assert_eq!(
1570 logs_url("http://localhost:8000/api/v1/logs").unwrap(),
1571 "http://localhost:8000/api/v1/logs"
1572 );
1573 assert_eq!(
1574 logs_url("http://localhost:8000/api/v1/logs/batch").unwrap(),
1575 "http://localhost:8000/api/v1/logs"
1576 );
1577 }
1578
1579 #[test]
1580 fn batch_logs_url_accepts_root_api_logs_and_batch_endpoint() {
1581 assert_eq!(
1582 batch_logs_url("http://localhost:8000").unwrap(),
1583 "http://localhost:8000/api/v1/logs/batch"
1584 );
1585 assert_eq!(
1586 batch_logs_url("http://localhost:8000/api/v1").unwrap(),
1587 "http://localhost:8000/api/v1/logs/batch"
1588 );
1589 assert_eq!(
1590 batch_logs_url("http://localhost:8000/api/v1/logs").unwrap(),
1591 "http://localhost:8000/api/v1/logs/batch"
1592 );
1593 assert_eq!(
1594 batch_logs_url("http://localhost:8000/api/v1/logs/batch").unwrap(),
1595 "http://localhost:8000/api/v1/logs/batch"
1596 );
1597 }
1598
1599 #[test]
1600 fn log_payload_is_enriched_and_normalizes_tags() {
1601 let payload = LogEntry::new("hello M31")
1602 .severity(Severity::Warn)
1603 .tags(["ccd, telescope", "ccd"])
1604 .metadata(json!({ "frame": 42 }))
1605 .into_payload();
1606
1607 assert_eq!(payload.severity, Severity::Warn);
1608 assert_eq!(payload.body, "hello M31");
1609 assert_eq!(payload.title, "hello M31");
1610 assert_eq!(payload.tags, vec!["ccd", "telescope"]);
1611 assert_eq!(payload.metadata["frame"], 42);
1612 assert!(payload.metadata.get("rust_client").is_some());
1613 }
1614
1615 #[test]
1616 fn request_interval_sleep_uses_last_request_start() {
1617 let now = Instant::now();
1618 assert_eq!(
1619 request_interval_sleep(Duration::from_millis(250), None, now),
1620 Duration::ZERO
1621 );
1622 assert_eq!(
1623 request_interval_sleep(Duration::ZERO, Some(now), now),
1624 Duration::ZERO
1625 );
1626 assert_eq!(
1627 request_interval_sleep(
1628 Duration::from_millis(250),
1629 Some(now - Duration::from_millis(100)),
1630 now
1631 ),
1632 Duration::from_millis(150)
1633 );
1634 assert_eq!(
1635 request_interval_sleep(
1636 Duration::from_millis(250),
1637 Some(now - Duration::from_millis(1000)),
1638 now
1639 ),
1640 Duration::ZERO
1641 );
1642 }
1643}