1#![expect(
7 clippy::missing_errors_doc,
8 reason = "public facade methods already have behavior documented centrally in the workspace docs, and repeating per-method Errors sections here would add low-signal boilerplate"
9)]
10#![expect(
11 clippy::must_use_candidate,
12 reason = "the facade intentionally avoids pervasive must_use boilerplate on constructors and lightweight accessors where the return types are already obvious from call sites"
13)]
14#![expect(
15 clippy::return_self_not_must_use,
16 reason = "builder-style chaining methods in this facade predate pedantic lint adoption and remain intentionally lightweight"
17)]
18
19pub mod constants;
20pub mod error_codes;
21
22mod builder;
23mod follow;
24mod health;
25mod jsonl_reader;
26mod query;
27
28use std::fs::{self, OpenOptions};
29use std::io::Write;
30use std::path::{Path, PathBuf};
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32use std::sync::{Arc, Mutex};
33use std::time::{Duration, SystemTime};
34
35use crate::health::QueryHealthTracker;
36#[doc(inline)]
37pub use builder::LoggerBuilder;
38#[doc(inline)]
39pub use follow::LogFollowSession;
40#[doc(inline)]
41pub use jsonl_reader::JsonlLogReader;
42#[doc(inline)]
43pub use sc_observability_types::{
44 ActionName, ErrorCode, EventError, Level, LogEvent, LogQuery, LogSnapshot, LoggingHealthReport,
45 LoggingHealthState, OBSERVATION_ENVELOPE_VERSION, OutcomeLabel, ProcessIdentity, SchemaVersion,
46 ServiceName, SinkHealth, SinkHealthState, TargetCategory, Timestamp,
47};
48use sc_observability_types::{
49 Diagnostic, DiagnosticInfo, DiagnosticSummary, ErrorContext, FlushError, InitError,
50 LevelFilter, LogSinkError, ProcessIdentityPolicy, QueryError, QueryHealthState, Remediation,
51 ShutdownError, SinkName,
52};
53use serde_json::Value;
54
55#[derive(Debug, Clone, Copy, PartialEq, Eq)]
57pub struct RotationPolicy {
58 pub max_bytes: u64,
60 pub max_files: u32,
62}
63
64impl Default for RotationPolicy {
65 fn default() -> Self {
66 Self {
67 max_bytes: constants::DEFAULT_ROTATION_MAX_BYTES,
68 max_files: constants::DEFAULT_ROTATION_MAX_FILES,
69 }
70 }
71}
72
73#[derive(Debug, Clone, Copy, PartialEq, Eq)]
75pub struct RetentionPolicy {
76 pub max_age_days: u32,
78}
79
80impl Default for RetentionPolicy {
81 fn default() -> Self {
82 Self {
83 max_age_days: constants::DEFAULT_RETENTION_MAX_AGE_DAYS,
84 }
85 }
86}
87
/// Hook for caller-supplied redaction of individual event fields.
pub trait Redactor: Send + Sync {
    /// Inspect the field named `key` and rewrite `value` in place if it must be redacted.
    fn redact(&self, key: &str, value: &mut Value);
}
93
94#[derive(Default)]
96pub struct RedactionPolicy {
97 pub denylist_keys: Vec<String>,
99 pub redact_bearer_tokens: bool,
101 pub custom_redactors: Vec<Box<dyn Redactor>>,
103}
104
105impl std::fmt::Debug for RedactionPolicy {
106 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107 f.debug_struct("RedactionPolicy")
108 .field("denylist_keys", &self.denylist_keys)
109 .field("redact_bearer_tokens", &self.redact_bearer_tokens)
110 .field("custom_redactors", &self.custom_redactors.len())
111 .finish()
112 }
113}
114
/// Per-sink predicate deciding whether a given event is forwarded to that sink.
pub trait LogFilter: Send + Sync {
    /// Return `true` when `event` should be written to the associated sink.
    fn accepts(&self, event: &LogEvent) -> bool;
}

/// Destination for structured log events (file, console, or custom).
pub trait LogSink: Send + Sync {
    /// Persist a single event; failures are recorded by the logger runtime rather
    /// than returned to the emitter.
    fn write(&self, event: &LogEvent) -> Result<(), LogSinkError>;

    /// Flush any buffered data; the default is a no-op for unbuffered sinks.
    fn flush(&self) -> Result<(), LogSinkError> {
        Ok(())
    }

    /// Report this sink's current health for aggregation into the logging report.
    fn health(&self) -> SinkHealth;
}
134
135#[derive(Clone)]
137#[expect(
138 missing_debug_implementations,
139 reason = "registration stores trait-object sinks and filters, so derived Debug would not provide a meaningful stable contract"
140)]
141pub struct SinkRegistration {
142 pub(crate) sink: Arc<dyn LogSink>,
144 pub(crate) filter: Option<Arc<dyn LogFilter>>,
146}
147
148impl SinkRegistration {
149 pub fn new(sink: Arc<dyn LogSink>) -> Self {
151 Self { sink, filter: None }
152 }
153
154 pub fn with_filter(mut self, filter: Arc<dyn LogFilter>) -> Self {
156 self.filter = Some(filter);
157 self
158 }
159}
160
/// Validation-only helper that forces wrapped sinks into failure states.
#[cfg(feature = "fault-injection")]
#[derive(Clone, Default)]
#[expect(
    missing_debug_implementations,
    reason = "fault injector state is a small validation-only mutex wrapper without a useful stable Debug contract"
)]
pub struct RetainedSinkFaultInjector {
    // Shared with every wrapped sink; `None` means no fault is currently forced.
    forced_state: Arc<Mutex<Option<SinkHealthState>>>,
}

#[cfg(feature = "fault-injection")]
impl RetainedSinkFaultInjector {
    /// Create an injector with no fault forced.
    pub fn new() -> Self {
        Self::default()
    }

    /// Wrap `sink` so its writes, flushes, and health observe this injector's state.
    pub fn wrap(&self, sink: Arc<dyn LogSink>) -> Arc<dyn LogSink> {
        let forced_state = Arc::clone(&self.forced_state);
        Arc::new(FaultInjectingSink {
            inner: sink,
            forced_state,
        })
    }

    /// Force wrapped sinks into the degraded-dropping state.
    pub fn force_degraded(&self) {
        self.set_state(SinkHealthState::DegradedDropping);
    }

    /// Force wrapped sinks into the unavailable state.
    pub fn force_unavailable(&self) {
        self.set_state(SinkHealthState::Unavailable);
    }

    /// Remove any forced fault so wrapped sinks behave normally again.
    pub fn clear(&self) {
        let mut guard = self
            .forced_state
            .lock()
            .expect("retained sink fault state poisoned");
        *guard = None;
    }

    // Record `state` as the fault every wrapped sink must report.
    fn set_state(&self, state: SinkHealthState) {
        let mut guard = self
            .forced_state
            .lock()
            .expect("retained sink fault state poisoned");
        *guard = Some(state);
    }
}
226
/// Sink decorator that consults the shared forced state before delegating to `inner`.
#[cfg(feature = "fault-injection")]
struct FaultInjectingSink {
    inner: Arc<dyn LogSink>,
    forced_state: Arc<Mutex<Option<SinkHealthState>>>,
}

#[cfg(feature = "fault-injection")]
impl FaultInjectingSink {
    /// Snapshot the currently forced state, if any.
    fn current_state(&self) -> Option<SinkHealthState> {
        let guard = self
            .forced_state
            .lock()
            .expect("retained sink fault state poisoned");
        *guard
    }
}
242
/// Complete configuration consumed when constructing a [`Logger`].
#[derive(Debug)]
pub struct LoggerConfig {
    /// Service identity; `emit` rejects events carrying a different service name.
    pub service_name: ServiceName,
    /// Root directory for log output; see `default_log_path` for the layout.
    pub log_root: PathBuf,
    /// Severity filter — not read in this file; presumably applied by the builder (confirm there).
    pub level: LevelFilter,
    /// Event queue size — not read in this file; presumably consumed by the builder (confirm there).
    pub queue_capacity: usize,
    /// Size-based rotation settings for the file sink.
    pub rotation: RotationPolicy,
    /// Age-based retention settings for rotated files.
    pub retention: RetentionPolicy,
    /// Redaction rules applied to every event before it reaches any sink.
    pub redaction: RedactionPolicy,
    /// How the emitting process identity is resolved.
    pub process_identity: ProcessIdentityPolicy,
    /// Whether the built-in JSONL file sink is enabled; `query`/`follow` require it.
    pub enable_file_sink: bool,
    /// Whether the built-in console sink is enabled — not read in this file.
    pub enable_console_sink: bool,
}
267
268impl LoggerConfig {
269 pub fn default_for(service_name: ServiceName, log_root: PathBuf) -> Self {
277 let resolved_log_root = if log_root.as_os_str().is_empty() {
278 std::env::var(constants::SC_LOG_ROOT_ENV_VAR)
279 .ok()
280 .map(PathBuf::from)
281 .filter(|path| !path.as_os_str().is_empty())
282 .unwrap_or(log_root)
283 } else {
284 log_root
285 };
286 Self {
287 service_name,
288 log_root: resolved_log_root,
289 level: LevelFilter::Info,
290 queue_capacity: constants::DEFAULT_LOG_QUEUE_CAPACITY,
291 rotation: RotationPolicy::default(),
292 retention: RetentionPolicy::default(),
293 redaction: RedactionPolicy {
294 redact_bearer_tokens: true,
295 ..RedactionPolicy::default()
296 },
297 process_identity: ProcessIdentityPolicy::Auto,
298 enable_file_sink: constants::DEFAULT_ENABLE_FILE_SINK,
299 enable_console_sink: constants::DEFAULT_ENABLE_CONSOLE_SINK,
300 }
301 }
302}
303
304struct LoggerRuntime {
305 dropped_events_total: AtomicU64,
306 flush_errors_total: AtomicU64,
307 last_error: Mutex<Option<DiagnosticSummary>>,
310 query_health: Arc<QueryHealthTracker>,
311}
312
313impl LoggerRuntime {
314 fn new(query_available: bool) -> Self {
315 Self {
316 dropped_events_total: AtomicU64::new(0),
317 flush_errors_total: AtomicU64::new(0),
318 last_error: Mutex::new(None),
319 query_health: Arc::new(QueryHealthTracker::new(if query_available {
320 QueryHealthState::Healthy
321 } else {
322 QueryHealthState::Unavailable
323 })),
324 }
325 }
326}
327
/// Facade that validates, redacts, and fans events out to registered sinks.
#[expect(
    missing_debug_implementations,
    reason = "logger owns runtime handles and trait-object sinks whose internal state is not a stable public debug contract"
)]
pub struct Logger {
    // Immutable configuration captured at construction.
    config: LoggerConfig,
    // Sinks (with optional filters) that receive every accepted event.
    sinks: Vec<SinkRegistration>,
    // Set once by `shutdown`; also handed to follow sessions so they stop too.
    shutdown: Arc<AtomicBool>,
    // Counters and health state reported via `health()`.
    runtime: LoggerRuntime,
}
339
impl Logger {
    /// Start building a logger from `config`; fails when the config is invalid.
    pub fn builder(config: LoggerConfig) -> Result<LoggerBuilder, InitError> {
        LoggerBuilder::new(config)
    }

    /// Construct a logger directly using the builder's default behavior.
    pub fn new(config: LoggerConfig) -> Result<Self, InitError> {
        Ok(LoggerBuilder::new(config)?.build())
    }

    /// Validate, redact, and deliver one event to every registered sink.
    ///
    /// Individual sink write failures are counted and recorded in runtime
    /// health rather than returned; `Err` is produced only for a shut-down
    /// logger or an invalid event.
    pub fn emit(&self, event: LogEvent) -> Result<(), EventError> {
        if self.shutdown.load(Ordering::SeqCst) {
            return Err(EventError(Box::new(ErrorContext::new(
                error_codes::LOGGER_SHUTDOWN,
                "logger is shut down",
                Remediation::not_recoverable("create a new logger before emitting"),
            ))));
        }

        validate_event(&event, &self.config.service_name)?;
        let redacted = self.redact_event(event);

        for registration in &self.sinks {
            // A filter that rejects the event skips this sink only; a sink
            // with no filter always receives the event.
            if registration
                .filter
                .as_ref()
                .is_some_and(|filter| !filter.accepts(&redacted))
            {
                continue;
            }

            if let Err(err) = registration.sink.write(&redacted) {
                self.record_sink_failure(&err);
            }
        }

        Ok(())
    }

    /// Flush every registered sink; a shut-down logger does nothing and reports success.
    pub fn flush(&self) -> Result<(), FlushError> {
        if self.shutdown.load(Ordering::SeqCst) {
            return Ok(());
        }

        self.flush_registered_sinks();
        Ok(())
    }

    /// Run `query` against the active JSONL log, recording the outcome in query health.
    pub fn query(&self, query: &LogQuery) -> Result<LogSnapshot, QueryError> {
        let reader = self.query_reader()?;
        let result = reader.query(query);
        self.runtime.query_health.record_result(&result);
        result
    }

    /// Open a follow (tail) session on the active JSONL log.
    ///
    /// The session shares this logger's query-health tracker and shutdown flag,
    /// so shutting the logger down also stops the session.
    pub fn follow(&self, query: LogQuery) -> Result<LogFollowSession, QueryError> {
        let active_log_path = self.ensure_query_available()?;
        let result = LogFollowSession::with_health(
            active_log_path,
            query,
            self.runtime.query_health.clone(),
            Some(self.shutdown.clone()),
        );
        self.runtime.query_health.record_result(&result);
        result
    }

    // Flush all sinks, recording (not propagating) per-sink flush failures.
    fn flush_registered_sinks(&self) {
        for registration in &self.sinks {
            if let Err(err) = registration.sink.flush() {
                self.record_flush_failure(&err);
            }
        }
    }

    /// Flush sinks and mark query health unavailable; idempotent.
    ///
    /// The atomic swap guarantees the flush/mark sequence runs at most once
    /// even with concurrent callers.
    pub fn shutdown(&self) -> Result<(), ShutdownError> {
        if self.shutdown.swap(true, Ordering::SeqCst) {
            return Ok(());
        }

        self.flush_registered_sinks();
        self.runtime.query_health.mark_unavailable(None);
        Ok(())
    }

    /// Snapshot aggregate logging health: per-sink states, counters, and last error.
    pub fn health(&self) -> LoggingHealthReport {
        let sink_statuses: Vec<SinkHealth> =
            self.sinks.iter().map(|entry| entry.sink.health()).collect();
        LoggingHealthReport {
            state: aggregate_logging_health_state(&sink_statuses),
            dropped_events_total: self.runtime.dropped_events_total.load(Ordering::SeqCst),
            flush_errors_total: self.runtime.flush_errors_total.load(Ordering::SeqCst),
            active_log_path: default_log_path(&self.config.log_root, &self.config.service_name),
            sink_statuses,
            query: Some(self.runtime.query_health.snapshot()),
            last_error: self
                .runtime
                .last_error
                .lock()
                .expect("logger last_error poisoned")
                .clone(),
        }
    }

    // Apply the configured redaction pipeline: bearer-token masking on the
    // message, then per-field denylist replacement, bearer masking of string
    // values, and finally any custom redactors (in that order).
    fn redact_event(&self, mut event: LogEvent) -> LogEvent {
        if self.config.redaction.redact_bearer_tokens
            && let Some(message) = event.message.as_mut()
        {
            *message = redact_bearer_token_text(message);
        }

        for (key, value) in &mut event.fields {
            if self
                .config
                .redaction
                .denylist_keys
                .iter()
                .any(|deny| deny == key)
            {
                *value = Value::String(constants::REDACTED_VALUE.to_string());
            }
            if self.config.redaction.redact_bearer_tokens {
                redact_string_value(value);
            }
            // Custom redactors run last, so they can override built-in redaction.
            for redactor in &self.config.redaction.custom_redactors {
                redactor.redact(key, value);
            }
        }

        event
    }

    // Count a dropped event and remember the failure for health reporting.
    fn record_sink_failure(&self, error: &LogSinkError) {
        self.runtime
            .dropped_events_total
            .fetch_add(1, Ordering::SeqCst);
        *self
            .runtime
            .last_error
            .lock()
            .expect("logger last_error poisoned") =
            Some(DiagnosticSummary::from(error.diagnostic()));
    }

    // Count a failed flush and remember the failure for health reporting.
    fn record_flush_failure(&self, error: &LogSinkError) {
        self.runtime
            .flush_errors_total
            .fetch_add(1, Ordering::SeqCst);
        *self
            .runtime
            .last_error
            .lock()
            .expect("logger last_error poisoned") =
            Some(DiagnosticSummary::from(error.diagnostic()));
    }

    // Build a reader over the active log path, if querying is currently possible.
    fn query_reader(&self) -> Result<JsonlLogReader, QueryError> {
        self.ensure_query_available().map(JsonlLogReader::new)
    }

    // Resolve the active log path, failing (and recording the failure in query
    // health) when the logger is shut down or the file sink is disabled.
    fn ensure_query_available(&self) -> Result<PathBuf, QueryError> {
        if self.shutdown.load(Ordering::SeqCst) {
            let error = query::shutdown_error();
            self.runtime.query_health.record_error(&error);
            return Err(error);
        }

        if !self.config.enable_file_sink {
            let error = query::unavailable_error(
                "logger query/follow requires the built-in JSONL file sink to be enabled",
            );
            self.runtime.query_health.record_error(&error);
            return Err(error);
        }

        Ok(default_log_path(
            &self.config.log_root,
            &self.config.service_name,
        ))
    }
}
557
/// File sink that appends events as JSON lines, with rotation and retention.
#[expect(
    missing_debug_implementations,
    reason = "file-sink internals include mutex-protected runtime state that is intentionally not exposed through a public Debug contract"
)]
pub struct JsonlFileSink {
    // Path of the active log file; rotated files live alongside it.
    path: PathBuf,
    // Size-based rotation settings.
    rotation: RotationPolicy,
    // Age-based pruning settings for rotated files.
    retention: RetentionPolicy,
    // Current health, updated on every write and read by `health()`.
    health: Mutex<SinkHealth>,
}
571
impl JsonlFileSink {
    /// Create a sink writing to `path`; health starts as healthy with no error.
    pub fn new(path: PathBuf, rotation: RotationPolicy, retention: RetentionPolicy) -> Self {
        Self {
            path,
            rotation,
            retention,
            health: Mutex::new(SinkHealth {
                name: SinkName::new(constants::JSONL_FILE_SINK_NAME)
                    .expect("jsonl sink constant is valid"),
                state: SinkHealthState::Healthy,
                last_error: None,
            }),
        }
    }

    /// Path of the active log file.
    pub fn path(&self) -> &Path {
        &self.path
    }

    #[expect(
        clippy::unnecessary_wraps,
        reason = "the helper preserves a Result-shaped internal API so rotation checks can grow I/O failure propagation without reshaping caller control flow"
    )]
    // Rotate when appending `incoming_len` bytes would exceed `max_bytes`:
    // shift `.1` -> `.2` -> ... upward, then move the active file to `.1`.
    // Rename failures are deliberately ignored (best-effort rotation); old
    // files are pruned afterwards regardless of whether rotation happened.
    fn rotate_if_needed(&self, incoming_len: u64) -> Result<(), LogSinkError> {
        if let Ok(metadata) = fs::metadata(&self.path)
            && metadata.len().saturating_add(incoming_len) > self.rotation.max_bytes
        {
            // Shift highest indices first so no rotated file is overwritten.
            for idx in (1..self.rotation.max_files).rev() {
                let src = self.rotated_path(idx);
                let dest = self.rotated_path(idx + 1);
                let _ = rename_if_present(&src, &dest);
            }
            let rotated = self.rotated_path(1);
            let _ = rename_if_present(&self.path, &rotated);
        }

        self.prune_old_files();
        Ok(())
    }

    // Path of the rotated file at `index` next to the active file.
    fn rotated_path(&self, index: u32) -> PathBuf {
        rotated_log_path(&self.path, index)
    }

    // Best-effort removal of rotated files older than the retention cutoff.
    // Any I/O failure (missing parent, unreadable dir, failed remove) is ignored.
    fn prune_old_files(&self) {
        let Some(parent) = self.path.parent() else {
            return;
        };

        let Ok(entries) = fs::read_dir(parent) else {
            return;
        };
        let retention_cutoff = SystemTime::now()
            - Duration::from_secs(u64::from(self.retention.max_age_days) * constants::SECS_PER_DAY);

        for entry in entries.flatten() {
            let path = entry.path();
            let Some(file_name) = path.file_name().and_then(|value| value.to_str()) else {
                continue;
            };

            let active_name = self
                .path
                .file_name()
                .and_then(|value| value.to_str())
                .unwrap_or_default();

            // Only consider rotated siblings (`<active>.N`); never the active file itself.
            if !file_name.starts_with(active_name) || file_name == active_name {
                continue;
            }

            if let Ok(metadata) = entry.metadata()
                && let Ok(modified) = metadata.modified()
                && modified < retention_cutoff
            {
                let _ = fs::remove_file(path);
            }
        }
    }

    // Mark this sink degraded, remember the diagnostic, and build the error
    // returned to the logger runtime.
    fn mark_failure<E>(&self, error: E) -> LogSinkError
    where
        E: std::error::Error + Send + Sync + 'static,
    {
        let message = error.to_string();
        let diagnostic = diagnostic_for_sink_failure(message.clone());
        let mut health = self.health.lock().expect("file sink health poisoned");
        health.state = SinkHealthState::DegradedDropping;
        health.last_error = Some(DiagnosticSummary::from(&diagnostic));
        LogSinkError(Box::new(
            ErrorContext::new(
                error_codes::LOGGER_SINK_WRITE_FAILED,
                "jsonl file sink write failed",
                Remediation::not_recoverable(
                    "file sink write failure handling is owned by the logger runtime",
                ),
            )
            .cause(message)
            .source(Box::new(error)),
        ))
    }
}
687
impl LogSink for JsonlFileSink {
    // Serialize the event as one JSON line, rotate if needed, then append and
    // flush. Any failure marks the sink degraded; a successful write resets it
    // to healthy.
    fn write(&self, event: &LogEvent) -> Result<(), LogSinkError> {
        if let Some(parent) = self.path.parent() {
            fs::create_dir_all(parent).map_err(|err| self.mark_failure(err))?;
        }

        let mut line = serde_json::to_vec(event).map_err(|err| self.mark_failure(err))?;
        line.push(b'\n');
        // Rotation is checked against the size the file would have after this write.
        self.rotate_if_needed(line.len() as u64)?;

        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)
            .map_err(|err| self.mark_failure(err))?;
        file.write_all(&line)
            .and_then(|()| file.flush())
            .map_err(|err| self.mark_failure(err))?;

        let mut health = self.health.lock().expect("file sink health poisoned");
        health.state = SinkHealthState::Healthy;
        Ok(())
    }

    // Snapshot of the current health state.
    fn health(&self) -> SinkHealth {
        self.health
            .lock()
            .expect("file sink health poisoned")
            .clone()
    }
}
719
/// Minimal line-oriented output abstraction so console sinks can be tested.
trait ConsoleWriter: Send + Sync {
    fn write_line(&self, line: &str) -> std::io::Result<()>;
}

/// Writer targeting the process's stdout, flushing after every line.
struct StdoutConsoleWriter;

impl ConsoleWriter for StdoutConsoleWriter {
    fn write_line(&self, line: &str) -> std::io::Result<()> {
        // Lock once so the line and its terminator stay contiguous.
        let mut handle = std::io::stdout().lock();
        handle.write_all(line.as_bytes())?;
        handle.write_all(b"\n")?;
        handle.flush()
    }
}

/// Writer targeting the process's stderr, flushing after every line.
struct StderrConsoleWriter;

impl ConsoleWriter for StderrConsoleWriter {
    fn write_line(&self, line: &str) -> std::io::Result<()> {
        // Lock once so the line and its terminator stay contiguous.
        let mut handle = std::io::stderr().lock();
        handle.write_all(line.as_bytes())?;
        handle.write_all(b"\n")?;
        handle.flush()
    }
}
745
/// Sink that renders events as single human-readable lines on a console writer.
#[expect(
    missing_debug_implementations,
    reason = "console sink owns a trait-object writer and sink health state, so a derived Debug impl would not be stable or useful"
)]
pub struct ConsoleSink {
    // Destination for formatted lines (stdout, stderr, or a test double).
    writer: Box<dyn ConsoleWriter>,
    // Current health, updated on every write and read by `health()`.
    health: Mutex<SinkHealth>,
}
758
759impl ConsoleSink {
760 pub fn stdout() -> Self {
762 Self::from_writer(Box::new(StdoutConsoleWriter))
763 }
764
765 pub fn stderr() -> Self {
767 Self::from_writer(Box::new(StderrConsoleWriter))
768 }
769
770 pub(crate) fn from_writer(writer: Box<dyn ConsoleWriter>) -> Self {
771 Self {
772 writer,
773 health: Mutex::new(SinkHealth {
774 name: SinkName::new(constants::CONSOLE_SINK_NAME)
775 .expect("console sink constant is valid"),
776 state: SinkHealthState::Healthy,
777 last_error: None,
778 }),
779 }
780 }
781
782 fn format_line(event: &LogEvent) -> String {
783 let level = match event.level {
784 Level::Trace => "TRACE",
785 Level::Debug => "DEBUG",
786 Level::Info => "INFO",
787 Level::Warn => "WARN",
788 Level::Error => "ERROR",
789 };
790 let message = event.message.as_deref().unwrap_or("");
791 format!(
792 "{} {} {} {} {}",
793 event.timestamp,
794 level,
795 event.target.as_str(),
796 event.action.as_str(),
797 message
798 )
799 }
800
801 fn mark_failure<E>(&self, error: E) -> LogSinkError
802 where
803 E: std::error::Error + Send + Sync + 'static,
804 {
805 let message = error.to_string();
806 let diagnostic = diagnostic_for_sink_failure(message.clone());
807 let mut health = self.health.lock().expect("console sink health poisoned");
808 health.state = SinkHealthState::DegradedDropping;
809 health.last_error = Some(DiagnosticSummary::from(&diagnostic));
810 LogSinkError(Box::new(
811 ErrorContext::new(
812 error_codes::LOGGER_SINK_WRITE_FAILED,
813 "console sink write failed",
814 Remediation::not_recoverable(
815 "console sink write failure handling is owned by the logger runtime",
816 ),
817 )
818 .cause(message)
819 .source(Box::new(error)),
820 ))
821 }
822}
823
824impl LogSink for ConsoleSink {
825 fn write(&self, event: &LogEvent) -> Result<(), LogSinkError> {
826 let line = Self::format_line(event);
827 self.writer
828 .write_line(&line)
829 .map_err(|err| self.mark_failure(err))?;
830 let mut health = self.health.lock().expect("console sink health poisoned");
831 health.state = SinkHealthState::Healthy;
832 Ok(())
833 }
834
835 fn health(&self) -> SinkHealth {
836 self.health
837 .lock()
838 .expect("console sink health poisoned")
839 .clone()
840 }
841}
842
#[cfg(feature = "fault-injection")]
impl LogSink for FaultInjectingSink {
    /// While a fault is forced, writes fail with an injected error; otherwise delegate.
    fn write(&self, event: &LogEvent) -> Result<(), LogSinkError> {
        match self.current_state() {
            Some(state) => Err(LogSinkError(Box::new(fault_injection_error_context(state)))),
            None => self.inner.write(event),
        }
    }

    /// While a fault is forced, flushes fail with an injected error; otherwise delegate.
    fn flush(&self) -> Result<(), LogSinkError> {
        match self.current_state() {
            Some(state) => Err(LogSinkError(Box::new(fault_injection_error_context(state)))),
            None => self.inner.flush(),
        }
    }

    /// Report the inner sink's health, overridden by any forced state.
    fn health(&self) -> SinkHealth {
        let mut health = self.inner.health();
        if let Some(state) = self.current_state() {
            let context = fault_injection_error_context(state);
            health.last_error = Some(DiagnosticSummary::from(context.diagnostic()));
            health.state = state;
        }
        health
    }
}
869
// Private sealing module: downstream code cannot name `Sealed`, so it cannot
// implement `LogEmitter` outside this crate.
mod sealed_emitters {
    pub trait Sealed {}
}

/// Crate-local abstraction over something that can emit log events.
#[expect(
    dead_code,
    reason = "crate-local emitter trait is intentionally available for logging-only injection"
)]
pub(crate) trait LogEmitter: sealed_emitters::Sealed + Send + Sync {
    // Emit a single event; mirrors `Logger::emit`.
    fn emit_log(&self, event: LogEvent) -> Result<(), EventError>;
}

impl sealed_emitters::Sealed for Logger {}

impl LogEmitter for Logger {
    fn emit_log(&self, event: LogEvent) -> Result<(), EventError> {
        self.emit(event)
    }
}
889
890pub(crate) fn diagnostic_for_sink_failure(message: impl Into<String>) -> Diagnostic {
891 Diagnostic {
892 timestamp: Timestamp::now_utc(),
893 code: error_codes::LOGGER_SINK_WRITE_FAILED,
894 message: message.into(),
895 cause: None,
896 remediation: Remediation::not_recoverable(
897 "sink failure handling is owned by the logger runtime",
898 ),
899 docs: None,
900 details: serde_json::Map::new(),
901 }
902}
903
904fn validate_event(event: &LogEvent, expected_service: &ServiceName) -> Result<(), EventError> {
905 if event.version.as_str() != sc_observability_types::constants::OBSERVATION_ENVELOPE_VERSION {
906 return Err(EventError(Box::new(ErrorContext::new(
907 error_codes::LOGGER_INVALID_EVENT,
908 "log event version is invalid",
909 Remediation::recoverable(
910 "emit an observation v1 log event",
911 ["recreate the event with the current contract"],
912 ),
913 ))));
914 }
915
916 if &event.service != expected_service {
917 return Err(EventError(Box::new(ErrorContext::new(
918 error_codes::LOGGER_INVALID_EVENT,
919 "log event service does not match logger service",
920 Remediation::recoverable(
921 "emit the event with the logger service name",
922 ["rebuild the event before emitting"],
923 ),
924 ))));
925 }
926
927 Ok(())
928}
929
930pub(crate) fn default_log_file_name(service_name: &ServiceName) -> String {
931 format!(
932 "{}{}",
933 service_name.as_str(),
934 constants::DEFAULT_LOG_FILE_SUFFIX
935 )
936}
937
938pub(crate) fn default_log_path(log_root: &Path, service_name: &ServiceName) -> PathBuf {
939 log_root
940 .join(constants::DEFAULT_LOG_DIR_NAME)
941 .join(default_log_file_name(service_name))
942}
943
944pub(crate) fn rotated_log_path(active_path: &Path, index: u32) -> PathBuf {
945 let parent = active_path.parent().unwrap_or_else(|| Path::new("."));
946 let file_name = active_path
947 .file_name()
948 .and_then(|value| value.to_str())
949 .unwrap_or("active.log.jsonl");
950 parent.join(format!("{file_name}.{index}"))
951}
952
953fn aggregate_logging_health_state(sink_statuses: &[SinkHealth]) -> LoggingHealthState {
954 if sink_statuses
955 .iter()
956 .any(|sink| sink.state == SinkHealthState::Unavailable)
957 {
958 LoggingHealthState::Unavailable
959 } else if sink_statuses
960 .iter()
961 .any(|sink| sink.state != SinkHealthState::Healthy)
962 {
963 LoggingHealthState::DegradedDropping
964 } else {
965 LoggingHealthState::Healthy
966 }
967}
968
/// Error context describing an injected fault carrying the given forced state.
#[cfg(feature = "fault-injection")]
fn fault_injection_error_context(state: SinkHealthState) -> ErrorContext {
    let forced_state = match state {
        SinkHealthState::Healthy => "healthy",
        SinkHealthState::DegradedDropping => "degraded_dropping",
        SinkHealthState::Unavailable => "unavailable",
    };
    let context = ErrorContext::new(
        error_codes::LOGGER_SINK_FAULT_INJECTED,
        format!("retained sink fault injection forced {forced_state} state"),
        Remediation::recoverable(
            "clear the retained sink fault injector before resuming normal validation traffic",
            ["clear the injector state"],
        ),
    );
    // Also expose the forced state as structured detail for machine consumers.
    context.detail("forced_state", Value::String(forced_state.to_string()))
}
986
/// Rename `src` to `dest`, treating a missing `src` as success.
fn rename_if_present(src: &Path, dest: &Path) -> std::io::Result<()> {
    if let Err(err) = fs::rename(src, dest) {
        // A missing source simply means there is nothing to rotate.
        if err.kind() == std::io::ErrorKind::NotFound {
            return Ok(());
        }
        return Err(err);
    }
    Ok(())
}
994
995fn redact_string_value(value: &mut Value) {
996 if let Value::String(text) = value {
997 *text = redact_bearer_token_text(text);
998 }
999}
1000
1001fn redact_bearer_token_text(input: &str) -> String {
1002 const PREFIX: &str = "Bearer ";
1003 let mut result = String::with_capacity(input.len());
1004 let mut remaining = input;
1005
1006 while let Some(index) = remaining.find(PREFIX) {
1007 result.push_str(&remaining[..index + PREFIX.len()]);
1008 let token_start = index + PREFIX.len();
1009 let token_end = remaining[token_start..]
1010 .find(char::is_whitespace)
1011 .map_or(remaining.len(), |value| token_start + value);
1012 result.push_str(constants::REDACTED_VALUE);
1013 remaining = &remaining[token_end..];
1014 }
1015
1016 result.push_str(remaining);
1017 result
1018}
1019
1020#[cfg(test)]
1021mod tests {
1022 use super::*;
1023 use sc_observability_types::{
1024 ActionName, Diagnostic, ErrorCode, Level, LogEvent, LogOrder, LogQuery, LogSnapshot,
1025 ProcessIdentity, QueryError, QueryHealthState, TargetCategory, Timestamp,
1026 };
1027 use serde_json::{Map, json};
1028 use std::sync::Arc;
1029 use temp_env::{with_var, with_var_unset};
1030
    // Console-writer double that captures lines into a shared Vec for assertions.
    struct SharedBuffer {
        lines: Arc<Mutex<Vec<String>>>,
    }

    impl ConsoleWriter for SharedBuffer {
        fn write_line(&self, line: &str) -> std::io::Result<()> {
            self.lines
                .lock()
                .expect("buffer poisoned")
                .push(line.to_string());
            Ok(())
        }
    }

    // Custom-redactor double: rewrites only the field keyed "secret".
    struct PrefixRedactor;

    impl Redactor for PrefixRedactor {
        fn redact(&self, key: &str, value: &mut Value) {
            if key == "secret" {
                *value = Value::String("custom-redacted".to_string());
            }
        }
    }

    // Sink double whose writes always fail, for exercising failure accounting.
    struct FailSink;

    impl LogSink for FailSink {
        fn write(&self, _event: &LogEvent) -> Result<(), LogSinkError> {
            Err(LogSinkError(Box::new(ErrorContext::new(
                error_codes::LOGGER_SINK_WRITE_FAILED,
                "fail sink write failed",
                Remediation::not_recoverable("test sink intentionally fails"),
            ))))
        }

        fn health(&self) -> SinkHealth {
            SinkHealth {
                name: sink_name("fail"),
                state: SinkHealthState::DegradedDropping,
                last_error: None,
            }
        }
    }
1074
    // Sink double that counts flush calls so flush propagation can be asserted.
    #[derive(Default)]
    struct RecordingFlushSink {
        flush_calls: AtomicU64,
    }

    impl LogSink for RecordingFlushSink {
        fn write(&self, _event: &LogEvent) -> Result<(), LogSinkError> {
            Ok(())
        }

        fn flush(&self) -> Result<(), LogSinkError> {
            self.flush_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        fn health(&self) -> SinkHealth {
            SinkHealth {
                name: sink_name("recording-flush"),
                state: SinkHealthState::Healthy,
                last_error: None,
            }
        }
    }

    // Shared fixture service name used across the tests below.
    fn service_name() -> ServiceName {
        ServiceName::new("sc-observability").expect("valid service name")
    }

    // Schema version matching the current observation envelope contract.
    fn schema_version() -> sc_observability_types::SchemaVersion {
        sc_observability_types::SchemaVersion::new(
            sc_observability_types::constants::OBSERVATION_ENVELOPE_VERSION,
        )
        .expect("valid schema version")
    }

    fn outcome_label(value: &str) -> sc_observability_types::OutcomeLabel {
        sc_observability_types::OutcomeLabel::new(value).expect("valid outcome label")
    }

    fn correlation_id(value: &str) -> sc_observability_types::CorrelationId {
        sc_observability_types::CorrelationId::new(value).expect("valid correlation id")
    }

    fn sink_name(value: &str) -> SinkName {
        SinkName::new(value).expect("valid sink name")
    }

    // Unique per-process/per-call temp directory path; any stale directory from a
    // previous run is removed first.
    fn temp_path(name: &str) -> PathBuf {
        let path = std::env::temp_dir().join(format!(
            "sc-observability-{name}-{}-{}",
            std::process::id(),
            SystemTime::now()
                .duration_since(SystemTime::UNIX_EPOCH)
                .expect("system time before unix epoch")
                .as_nanos()
        ));
        let _ = fs::remove_dir_all(&path);
        path
    }
1134
    // File identity as computed by the crate's query module (Unix only).
    #[cfg(unix)]
    fn unix_file_identity(path: &Path) -> crate::query::FileIdentity {
        crate::query::file_identity_for_path(path)
    }

    // Replace the active log with a new file whose identity (per crate::query)
    // differs from the original, so rotation detection can be exercised
    // deterministically on Unix.
    #[cfg(unix)]
    fn recreate_with_distinct_unix_identity(active_path: &Path) {
        let previous_identity = unix_file_identity(active_path);
        fs::remove_file(active_path).expect("remove active log");

        let active_name = active_path
            .file_name()
            .and_then(|value| value.to_str())
            .expect("active log file name");

        // The OS may hand back the same identity immediately; keep creating
        // fresh files until the identity actually changes.
        for attempt in 0..256 {
            let replacement =
                active_path.with_file_name(format!("{active_name}.replacement-{attempt}"));
            fs::File::create(&replacement).expect("create replacement file");
            if unix_file_identity(&replacement) != previous_identity {
                fs::rename(&replacement, active_path).expect("install replacement active log");
                return;
            }
            fs::remove_file(&replacement).expect("remove reused replacement inode");
        }

        panic!("failed to create replacement active log with distinct Unix identity");
    }

    // Non-Unix fallback: simply recreate the file (no identity check available).
    #[cfg(not(unix))]
    fn recreate_with_distinct_unix_identity(active_path: &Path) {
        fs::remove_file(active_path).expect("remove active log");
        fs::File::create(active_path).expect("recreate active log");
    }

    // Run `f` with the SC log-root env var set to `value`, or unset for None.
    fn with_sc_log_root<T>(value: Option<&Path>, f: impl FnOnce() -> T) -> T {
        match value {
            Some(path) => with_var(constants::SC_LOG_ROOT_ENV_VAR, Some(path), f),
            None => with_var_unset(constants::SC_LOG_ROOT_ENV_VAR, f),
        }
    }
1180
    // Fully-populated fixture event; the message and fields deliberately contain
    // bearer tokens and a "secret" key so redaction paths are exercised.
    fn log_event(service_name: ServiceName) -> LogEvent {
        LogEvent {
            version: schema_version(),
            timestamp: Timestamp::UNIX_EPOCH,
            level: Level::Info,
            service: service_name,
            target: TargetCategory::new("logger.core").expect("valid target"),
            action: ActionName::new("emit").expect("valid action"),
            message: Some("Authorization: Bearer abc123".to_string()),
            identity: ProcessIdentity::default(),
            trace: None,
            request_id: None,
            correlation_id: None,
            outcome: Some(outcome_label("ok")),
            diagnostic: Some(Diagnostic {
                timestamp: Timestamp::UNIX_EPOCH,
                code: ErrorCode::new_static("SC_TEST"),
                message: "diagnostic".to_string(),
                cause: None,
                remediation: Remediation::recoverable("retry", ["inspect logs"]),
                docs: None,
                details: Map::default(),
            }),
            state_transition: None,
            fields: serde_json::Map::from_iter([
                ("token".to_string(), json!("Bearer secret")),
                ("secret".to_string(), json!("raw")),
            ]),
        }
    }

    // Fixture event tagged with `request_id` and padded to `message_padding`
    // extra bytes, for rotation/follow tests that need distinguishable events.
    fn log_event_with_request(
        service_name: ServiceName,
        request_id: &str,
        message_padding: usize,
    ) -> LogEvent {
        let mut event = log_event(service_name);
        event.message = Some(format!("{request_id} {}", "x".repeat(message_padding)));
        event.request_id = Some(correlation_id(request_id));
        event
            .fields
            .insert("sequence".to_string(), json!(request_id.to_string()));
        event
    }

    // Query matching everything, in the given order.
    fn query_all(order: LogOrder) -> LogQuery {
        LogQuery {
            order,
            ..LogQuery::default()
        }
    }

    // Extract request ids from a snapshot, panicking if any event lacks one.
    fn request_ids(snapshot: &LogSnapshot) -> Vec<String> {
        snapshot
            .events
            .iter()
            .map(|event| event.request_id.clone().expect("request_id").to_string())
            .collect()
    }

    // Poll `follow` (bounded retries) until `expected_request_id` appears,
    // returning every request id drained along the way.
    fn drain_follow_until_request_id(
        follow: &mut LogFollowSession,
        expected_request_id: &str,
    ) -> Vec<String> {
        let mut drained = Vec::new();
        for _ in 0..10 {
            let snapshot = follow.poll().expect("follow poll");
            drained.extend(request_ids(&snapshot));
            if drained
                .iter()
                .any(|request_id| request_id == expected_request_id)
            {
                return drained;
            }
        }

        panic!("follow session never yielded {expected_request_id}");
    }
1259
1260 #[test]
1261 fn logger_config_default_for_sets_documented_defaults() {
1262 let root = temp_path("defaults");
1263 let config = LoggerConfig::default_for(service_name(), root.clone());
1264 assert_eq!(config.level, LevelFilter::Info);
1265 assert_eq!(config.queue_capacity, constants::DEFAULT_LOG_QUEUE_CAPACITY);
1266 assert_eq!(
1267 config.rotation.max_bytes,
1268 constants::DEFAULT_ROTATION_MAX_BYTES
1269 );
1270 assert_eq!(
1271 config.rotation.max_files,
1272 constants::DEFAULT_ROTATION_MAX_FILES
1273 );
1274 assert_eq!(
1275 config.retention.max_age_days,
1276 constants::DEFAULT_RETENTION_MAX_AGE_DAYS
1277 );
1278 assert!(config.enable_file_sink);
1279 assert!(!config.enable_console_sink);
1280 assert_eq!(
1281 default_log_path(&root, &config.service_name),
1282 root.join(constants::DEFAULT_LOG_DIR_NAME)
1283 .join(default_log_file_name(&config.service_name))
1284 );
1285 }
1286
1287 #[test]
1288 fn logger_config_default_for_uses_sc_log_root_when_log_root_is_empty() {
1289 let env_root = temp_path("env-root");
1290
1291 with_sc_log_root(Some(&env_root), || {
1292 let config = LoggerConfig::default_for(service_name(), PathBuf::new());
1293 assert_eq!(config.log_root, env_root);
1294 });
1295 }
1296
1297 #[test]
1298 fn logger_config_default_for_prefers_explicit_log_root_over_env() {
1299 let env_root = temp_path("env-root-override");
1300 let explicit_root = temp_path("explicit-root");
1301
1302 with_sc_log_root(Some(&env_root), || {
1303 let config = LoggerConfig::default_for(service_name(), explicit_root.clone());
1304 assert_eq!(config.log_root, explicit_root);
1305 });
1306 }
1307
1308 #[test]
1309 fn logger_config_debug_renders_redaction_summary() {
1310 let root = temp_path("debug");
1311 let config = LoggerConfig::default_for(service_name(), root);
1312
1313 let rendered = format!("{config:?}");
1314
1315 assert!(rendered.contains("LoggerConfig"));
1316 assert!(rendered.contains("RedactionPolicy"));
1317 assert!(rendered.contains("custom_redactors: 0"));
1318 }
1319
1320 #[test]
1321 fn file_only_logging_writes_jsonl_to_default_path() {
1322 let root = temp_path("file-only");
1323 let config = LoggerConfig::default_for(service_name(), root.clone());
1324 let logger = Logger::new(config).expect("logger");
1325 logger.emit(log_event(service_name())).expect("emit");
1326
1327 let path = default_log_path(&root, &service_name());
1328 let contents = fs::read_to_string(&path).expect("read log file");
1329 assert!(contents.contains("\"level\":\"Info\""));
1330 assert!(contents.contains("[REDACTED]"));
1331 }
1332
    #[test]
    fn file_and_console_fan_out_both_receive_event() {
        let root = temp_path("fanout");
        let mut config = LoggerConfig::default_for(service_name(), root.clone());
        // Disable the stock console sink; the test registers its own console
        // sink backed by an in-memory buffer so output can be inspected.
        config.enable_console_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");

        let lines = Arc::new(Mutex::new(Vec::<String>::new()));
        builder.register_sink(SinkRegistration::new(Arc::new(ConsoleSink::from_writer(
            Box::new(SharedBuffer {
                lines: lines.clone(),
            }),
        ))));
        let logger = builder.build();

        logger.emit(log_event(service_name())).expect("emit");

        // One emit must reach both sinks: the file sink creates the default path...
        let path = default_log_path(&root, &service_name());
        assert!(path.exists());
        // ...and the buffered console sink records exactly one rendered line.
        let lines = lines.lock().expect("lines poisoned");
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("logger.core"));
    }
1356
    #[test]
    fn redaction_runs_before_sink_fan_out() {
        let root = temp_path("redaction");
        let mut config = LoggerConfig::default_for(service_name(), root.clone());
        // Replace the stock console sink with a buffered one so both sink
        // outputs can be inspected for redacted content.
        config.enable_console_sink = false;
        // Denylist the `token` field and add a custom redactor; both must run
        // before any sink observes the event.
        config.redaction.denylist_keys.push("token".to_string());
        config
            .redaction
            .custom_redactors
            .push(Box::new(PrefixRedactor));
        let mut builder = Logger::builder(config).expect("logger builder");

        let lines = Arc::new(Mutex::new(Vec::<String>::new()));
        builder.register_sink(SinkRegistration::new(Arc::new(ConsoleSink::from_writer(
            Box::new(SharedBuffer {
                lines: lines.clone(),
            }),
        ))));
        let logger = builder.build();

        logger.emit(log_event(service_name())).expect("emit");

        // Both the file sink and the console sink must see the already
        // redacted event, proving redaction runs once before fan-out.
        let file_path = default_log_path(&root, &service_name());
        let file_contents = fs::read_to_string(file_path).expect("read file");
        let console_line = lines.lock().expect("lines poisoned")[0].clone();
        assert!(file_contents.contains("[REDACTED]"));
        assert!(file_contents.contains("custom-redacted"));
        assert!(console_line.contains("[REDACTED]"));
    }
1386
1387 #[test]
1388 fn invalid_event_returns_event_error() {
1389 let root = temp_path("invalid");
1390 let config = LoggerConfig::default_for(service_name(), root);
1391 let logger = Logger::new(config).expect("logger");
1392 let mut event = log_event(service_name());
1393 event.version = sc_observability_types::SchemaVersion::new("v0").expect("valid version");
1394 assert!(logger.emit(event).is_err());
1395 }
1396
1397 #[test]
1398 fn sink_failures_are_fail_open_and_counted_in_health() {
1399 let root = temp_path("fail-open");
1400 let mut config = LoggerConfig::default_for(service_name(), root);
1401 config.enable_file_sink = false;
1402 let mut builder = Logger::builder(config).expect("logger builder");
1403 builder.register_sink(SinkRegistration::new(Arc::new(FailSink)));
1404 let logger = builder.build();
1405
1406 logger
1407 .emit(log_event(service_name()))
1408 .expect("emit still succeeds");
1409
1410 let health = logger.health();
1411 assert_eq!(health.state, LoggingHealthState::DegradedDropping);
1412 assert_eq!(health.dropped_events_total, 1);
1413 assert!(health.last_error.is_some());
1414 }
1415
    #[test]
    fn flush_failures_are_fail_open_and_counted_in_health() {
        // Sink whose writes succeed but whose flush always fails, so flush
        // error accounting can be observed in isolation from write errors.
        struct FlushFailSink;

        impl LogSink for FlushFailSink {
            fn write(&self, _event: &LogEvent) -> Result<(), LogSinkError> {
                Ok(())
            }

            fn flush(&self) -> Result<(), LogSinkError> {
                Err(LogSinkError(Box::new(ErrorContext::new(
                    error_codes::LOGGER_FLUSH_FAILED,
                    "flush failed",
                    Remediation::not_recoverable("test sink intentionally fails flush"),
                ))))
            }

            fn health(&self) -> SinkHealth {
                SinkHealth {
                    name: sink_name("flush-fail"),
                    state: SinkHealthState::DegradedDropping,
                    last_error: None,
                }
            }
        }

        let root = temp_path("flush-fail");
        let mut config = LoggerConfig::default_for(service_name(), root);
        config.enable_file_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");
        builder.register_sink(SinkRegistration::new(Arc::new(FlushFailSink)));
        let logger = builder.build();

        // Flush errors must not propagate to the caller (fail-open)...
        logger.flush().expect("flush remains fail-open");

        // ...and are counted separately from dropped events in health.
        let health = logger.health();
        assert_eq!(health.dropped_events_total, 0);
        assert_eq!(health.flush_errors_total, 1);
        assert!(health.last_error.is_some());
    }
1456
1457 #[test]
1458 fn default_log_path_uses_service_scoped_layout() {
1459 let service = ServiceName::new("custom-service").expect("valid service");
1460 let log_root = PathBuf::from("observability-root");
1461
1462 let path = default_log_path(&log_root, &service);
1463
1464 assert_eq!(
1465 path,
1466 PathBuf::from("observability-root/logs/custom-service.log.jsonl")
1467 );
1468 }
1469
1470 #[test]
1471 fn rotated_log_paths_keep_the_active_filename_prefix() {
1472 let sink = JsonlFileSink::new(
1473 PathBuf::from("observability-root/logs/custom-service.log.jsonl"),
1474 RotationPolicy::default(),
1475 RetentionPolicy::default(),
1476 );
1477
1478 assert_eq!(
1479 sink.rotated_path(1),
1480 PathBuf::from("observability-root/logs/custom-service.log.jsonl.1")
1481 );
1482 assert_eq!(
1483 sink.rotated_path(2),
1484 PathBuf::from("observability-root/logs/custom-service.log.jsonl.2")
1485 );
1486 }
1487
    #[test]
    fn console_sink_stderr_constructor_is_operational() {
        let sink = ConsoleSink::stderr();

        // Writing a valid event to the real stderr must succeed...
        sink.write(&log_event(service_name()))
            .expect("stderr write");

        // ...and leave the sink reporting healthy.
        assert_eq!(sink.health().state, SinkHealthState::Healthy);
    }
1497
    #[cfg(feature = "fault-injection")]
    #[test]
    fn retained_sink_fault_injector_forces_degraded_logging_health() {
        let root = temp_path("fault-degraded");
        let mut config = LoggerConfig::default_for(service_name(), root);
        config.enable_file_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");
        // Wrap a healthy recording sink in the injector so faults can be
        // forced without a bespoke sink implementation.
        let injector = RetainedSinkFaultInjector::new();
        builder.register_sink(SinkRegistration::new(
            injector.wrap(Arc::new(RecordingFlushSink::default())),
        ));
        let logger = builder.build();

        injector.force_degraded();
        // Emit stays fail-open even while the only sink is degraded...
        logger
            .emit(log_event(service_name()))
            .expect("emit remains fail-open");

        // ...and the degraded state propagates to aggregate and per-sink health.
        let health = logger.health();
        assert_eq!(health.state, LoggingHealthState::DegradedDropping);
        assert_eq!(
            health.sink_statuses[0].state,
            SinkHealthState::DegradedDropping
        );
        assert_eq!(health.dropped_events_total, 1);
        assert!(health.last_error.is_some());
    }
1525
    #[cfg(feature = "fault-injection")]
    #[test]
    fn retained_sink_fault_injector_forces_unavailable_logging_health() {
        let root = temp_path("fault-unavailable");
        let mut config = LoggerConfig::default_for(service_name(), root);
        config.enable_file_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");
        // Wrap a healthy recording sink in the injector so the unavailable
        // state can be forced without a bespoke sink implementation.
        let injector = RetainedSinkFaultInjector::new();
        builder.register_sink(SinkRegistration::new(
            injector.wrap(Arc::new(RecordingFlushSink::default())),
        ));
        let logger = builder.build();

        injector.force_unavailable();
        // Emit stays fail-open even when the only sink is unavailable...
        logger
            .emit(log_event(service_name()))
            .expect("emit remains fail-open");

        // ...and the unavailable state propagates to aggregate and per-sink health.
        let health = logger.health();
        assert_eq!(health.state, LoggingHealthState::Unavailable);
        assert_eq!(health.sink_statuses[0].state, SinkHealthState::Unavailable);
        assert_eq!(health.dropped_events_total, 1);
        assert!(health.last_error.is_some());
    }
1550
    #[test]
    fn sink_filter_blocks_event_delivery() {
        // Filter that rejects every event, regardless of content.
        struct DenyAll;

        impl LogFilter for DenyAll {
            fn accepts(&self, _event: &LogEvent) -> bool {
                false
            }
        }

        let root = temp_path("filter");
        let mut config = LoggerConfig::default_for(service_name(), root);
        config.enable_file_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");

        let lines = Arc::new(Mutex::new(Vec::<String>::new()));
        // Attach the deny-all filter to a buffered console sink so delivery
        // (or its absence) can be observed.
        builder.register_sink(
            SinkRegistration::new(Arc::new(ConsoleSink::from_writer(Box::new(SharedBuffer {
                lines: lines.clone(),
            }))))
            .with_filter(Arc::new(DenyAll)),
        );
        let logger = builder.build();

        logger.emit(log_event(service_name())).expect("emit");

        // The emit succeeds, but the filtered sink never receives the event.
        assert!(lines.lock().expect("lines poisoned").is_empty());
    }
1579
1580 #[test]
1581 fn shutdown_blocks_future_emits() {
1582 let root = temp_path("shutdown");
1583 let config = LoggerConfig::default_for(service_name(), root);
1584 let logger = Logger::new(config).expect("logger");
1585 logger.shutdown().expect("shutdown");
1586 assert!(logger.emit(log_event(service_name())).is_err());
1587 assert!(logger.flush().is_ok());
1588 assert!(logger.shutdown().is_ok());
1589 }
1590
1591 #[test]
1592 fn shutdown_flushes_registered_sinks_before_marking_shutdown() {
1593 let root = temp_path("shutdown-flush");
1594 let mut config = LoggerConfig::default_for(service_name(), root);
1595 config.enable_file_sink = false;
1596 let mut builder = Logger::builder(config).expect("logger builder");
1597 let sink = Arc::new(RecordingFlushSink::default());
1598 builder.register_sink(SinkRegistration::new(sink.clone()));
1599 let logger = builder.build();
1600
1601 logger.shutdown().expect("shutdown");
1602
1603 assert_eq!(sink.flush_calls.load(Ordering::SeqCst), 1);
1604 }
1605
    #[test]
    fn historical_query_reads_active_and_rotated_files() {
        let root = temp_path("query-rotated");
        let mut config = LoggerConfig::default_for(service_name(), root.clone());
        // A tiny rotation threshold (350 bytes) combined with ~240 bytes of
        // message padding forces rotation between emits.
        config.rotation.max_bytes = 350;
        config.rotation.max_files = 4;
        let logger = Logger::new(config).expect("logger");

        for request_id in ["req-1", "req-2", "req-3"] {
            logger
                .emit(log_event_with_request(service_name(), request_id, 240))
                .expect("emit");
        }

        // Sanity-check that the resolved query path set contains both at least
        // one rotated file and the active file.
        let active_path = default_log_path(&root, &service_name());
        let resolved_paths = crate::query::query_active_and_rotated_paths(&active_path, 4);
        assert!(
            resolved_paths
                .iter()
                .any(|path| path.ends_with("sc-observability.log.jsonl.1"))
        );
        assert!(
            resolved_paths
                .iter()
                .any(|path| path.ends_with("sc-observability.log.jsonl"))
        );

        // Oldest-first stitches rotated + active files in emission order.
        let asc = logger
            .query(&query_all(LogOrder::OldestFirst))
            .expect("asc query");
        assert_eq!(request_ids(&asc), ["req-1", "req-2", "req-3"]);

        // Newest-first with a limit returns the most recent events and marks
        // the snapshot as truncated.
        let desc = logger
            .query(&LogQuery {
                order: LogOrder::NewestFirst,
                limit: Some(2),
                ..LogQuery::default()
            })
            .expect("desc query");
        assert_eq!(request_ids(&desc), ["req-3", "req-2"]);
        assert!(desc.truncated);
    }
1648
    #[test]
    fn historical_query_preserves_order_across_multiple_rotated_files() {
        let root = temp_path("query-multi-rotation-order");
        let mut config = LoggerConfig::default_for(service_name(), root.clone());
        // Tiny rotation threshold plus padded messages forces several
        // rotations across the five emits below.
        config.rotation.max_bytes = 350;
        config.rotation.max_files = 6;
        let logger = Logger::new(config).expect("logger");

        for request_id in ["req-1", "req-2", "req-3", "req-4", "req-5"] {
            logger
                .emit(log_event_with_request(service_name(), request_id, 220))
                .expect("emit");
        }

        // Oldest-first stitches every rotated file plus the active file in
        // emission order.
        let oldest_first = logger
            .query(&query_all(LogOrder::OldestFirst))
            .expect("oldest-first query");
        assert_eq!(
            request_ids(&oldest_first),
            ["req-1", "req-2", "req-3", "req-4", "req-5"]
        );

        // Newest-first returns the exact reverse.
        let newest_first = logger
            .query(&LogQuery {
                order: LogOrder::NewestFirst,
                ..LogQuery::default()
            })
            .expect("newest-first query");
        assert_eq!(
            request_ids(&newest_first),
            ["req-5", "req-4", "req-3", "req-2", "req-1"]
        );
    }
1682
1683 #[test]
1684 fn logger_and_jsonl_reader_query_have_parity() {
1685 let root = temp_path("query-parity");
1686 let mut config = LoggerConfig::default_for(service_name(), root.clone());
1687 config.rotation.max_bytes = 350;
1688 config.rotation.max_files = 4;
1689 let logger = Logger::new(config).expect("logger");
1690
1691 for request_id in ["req-a", "req-b", "req-c"] {
1692 logger
1693 .emit(log_event_with_request(service_name(), request_id, 220))
1694 .expect("emit");
1695 }
1696
1697 let query = LogQuery {
1698 order: LogOrder::NewestFirst,
1699 limit: Some(2),
1700 ..LogQuery::default()
1701 };
1702 let logger_snapshot = logger.query(&query).expect("logger query");
1703 let reader = JsonlLogReader::new(default_log_path(&root, &service_name()));
1704 let reader_snapshot = reader.query(&query).expect("reader query");
1705
1706 assert_eq!(reader_snapshot, logger_snapshot);
1707 }
1708
    #[test]
    fn follow_starts_at_tail_and_survives_multiple_rotations() {
        let root = temp_path("follow-rotation");
        let mut config = LoggerConfig::default_for(service_name(), root);
        // Small rotation threshold so the fresh emits below rotate the file
        // while the follow session is active.
        config.rotation.max_bytes = 350;
        config.rotation.max_files = 6;
        let logger = Logger::new(config).expect("logger");

        logger
            .emit(log_event_with_request(service_name(), "backlog", 220))
            .expect("emit backlog");

        // A new follow session starts at the tail: the pre-existing backlog is
        // not replayed on the first poll.
        let mut follow = logger
            .follow(query_all(LogOrder::OldestFirst))
            .expect("follow");
        assert!(follow.poll().expect("initial poll").events.is_empty());

        for request_id in ["fresh-1", "fresh-2", "fresh-3"] {
            logger
                .emit(log_event_with_request(service_name(), request_id, 220))
                .expect("emit fresh");
        }

        // All post-subscription events arrive in order despite rotations, and
        // the session stays healthy.
        let snapshot = follow.poll().expect("follow poll");
        assert_eq!(request_ids(&snapshot), ["fresh-1", "fresh-2", "fresh-3"]);
        assert_eq!(follow.health().state, QueryHealthState::Healthy);
    }
1736
    #[test]
    fn logger_and_jsonl_reader_follow_have_parity() {
        let root = temp_path("follow-parity");
        let mut config = LoggerConfig::default_for(service_name(), root.clone());
        // Small rotation threshold so parity holds across rotations too.
        config.rotation.max_bytes = 350;
        config.rotation.max_files = 6;
        let logger = Logger::new(config).expect("logger");

        logger
            .emit(log_event_with_request(service_name(), "backlog", 220))
            .expect("emit backlog");

        // Open one follow session through the logger facade and one through
        // the standalone JSONL reader on the same file, before fresh emits.
        let query = query_all(LogOrder::OldestFirst);
        let mut logger_follow = logger.follow(query.clone()).expect("logger follow");
        let reader = JsonlLogReader::new(default_log_path(&root, &service_name()));
        let mut reader_follow = reader.follow(query).expect("reader follow");

        for request_id in ["reader-1", "reader-2"] {
            logger
                .emit(log_event_with_request(service_name(), request_id, 220))
                .expect("emit fresh");
        }

        // Both sessions must observe the identical snapshot of fresh events.
        assert_eq!(
            logger_follow.poll().expect("logger follow poll"),
            reader_follow.poll().expect("reader follow poll")
        );
    }
1765
    #[test]
    fn query_health_tracks_decode_and_shutdown_failures() {
        use std::io::Write as _;

        let root = temp_path("query-health");
        let config = LoggerConfig::default_for(service_name(), root.clone());
        let logger = Logger::new(config).expect("logger");

        logger
            .emit(log_event_with_request(service_name(), "healthy", 20))
            .expect("emit");

        // Corrupt the active log file by appending a malformed JSON line.
        let active_path = default_log_path(&root, &service_name());
        let mut file = OpenOptions::new()
            .append(true)
            .open(&active_path)
            .expect("open active log");
        writeln!(file, "{{not-json").expect("append malformed json");

        // The malformed line surfaces as a decode error and degrades query health.
        let decode_error = logger
            .query(&query_all(LogOrder::OldestFirst))
            .expect_err("decode error");
        assert!(matches!(decode_error, QueryError::Decode(_)));
        let degraded_health = logger.health().query.expect("query health");
        assert_eq!(degraded_health.state, QueryHealthState::Degraded);
        assert!(degraded_health.last_error.is_some());

        // After shutdown, query/follow report the Shutdown variant and query
        // health escalates from Degraded to Unavailable.
        logger.shutdown().expect("shutdown");
        let shutdown_error = logger
            .query(&query_all(LogOrder::OldestFirst))
            .expect_err("shutdown error");
        assert!(matches!(shutdown_error, QueryError::Shutdown));
        assert_eq!(
            logger.health().query.expect("query health").state,
            QueryHealthState::Unavailable
        );
        assert!(matches!(
            logger.follow(query_all(LogOrder::OldestFirst)),
            Err(QueryError::Shutdown)
        ));
    }
1807
1808 #[test]
1809 fn logger_query_returns_shutdown_variant_after_shutdown() {
1810 let root = temp_path("query-shutdown-variant");
1811 let config = LoggerConfig::default_for(service_name(), root);
1812 let logger = Logger::new(config).expect("logger");
1813
1814 logger.shutdown().expect("shutdown");
1815
1816 let error = logger
1817 .query(&query_all(LogOrder::OldestFirst))
1818 .expect_err("shutdown error");
1819 assert!(matches!(error, QueryError::Shutdown));
1820 }
1821
1822 #[test]
1823 fn logger_follow_session_becomes_unavailable_after_shutdown() {
1824 let root = temp_path("follow-shutdown");
1825 let config = LoggerConfig::default_for(service_name(), root);
1826 let logger = Logger::new(config).expect("logger");
1827
1828 let mut follow = logger
1829 .follow(query_all(LogOrder::OldestFirst))
1830 .expect("follow");
1831 assert!(follow.poll().expect("initial poll").events.is_empty());
1832
1833 logger.shutdown().expect("shutdown");
1834
1835 assert!(matches!(follow.poll(), Err(QueryError::Shutdown)));
1836 assert_eq!(follow.health().state, QueryHealthState::Unavailable);
1837 }
1838
1839 #[test]
1840 fn logger_query_and_follow_reject_invalid_queries() {
1841 let root = temp_path("invalid-query");
1842 let config = LoggerConfig::default_for(service_name(), root);
1843 let logger = Logger::new(config).expect("logger");
1844
1845 let invalid_limit = LogQuery {
1846 limit: Some(0),
1847 ..LogQuery::default()
1848 };
1849 let invalid_range = LogQuery {
1850 since: Some(Timestamp::now_utc()),
1851 until: Some(Timestamp::UNIX_EPOCH),
1852 ..LogQuery::default()
1853 };
1854
1855 assert!(matches!(
1856 logger.query(&invalid_limit),
1857 Err(QueryError::InvalidQuery(_))
1858 ));
1859 assert!(matches!(
1860 logger.follow(invalid_limit),
1861 Err(QueryError::InvalidQuery(_))
1862 ));
1863 assert!(matches!(
1864 logger.query(&invalid_range),
1865 Err(QueryError::InvalidQuery(_))
1866 ));
1867 assert!(matches!(
1868 logger.follow(invalid_range),
1869 Err(QueryError::InvalidQuery(_))
1870 ));
1871 }
1872
1873 #[test]
1874 fn query_and_follow_are_unavailable_without_file_sink() {
1875 let root = temp_path("query-unavailable");
1876 let mut config = LoggerConfig::default_for(service_name(), root);
1877 config.enable_file_sink = false;
1878 let logger = Logger::new(config).expect("logger");
1879
1880 assert!(matches!(
1881 logger.query(&query_all(LogOrder::OldestFirst)),
1882 Err(QueryError::Unavailable(_))
1883 ));
1884 assert!(matches!(
1885 logger.follow(query_all(LogOrder::OldestFirst)),
1886 Err(QueryError::Unavailable(_))
1887 ));
1888 assert_eq!(
1889 logger.health().query.expect("query health").state,
1890 QueryHealthState::Unavailable
1891 );
1892 }
1893
    // NOTE(review): ignored on Windows — presumably because the scenario relies
    // on Unix file-identity semantics (see `recreate_with_distinct_unix_identity`
    // below); confirm before changing the attribute.
    #[cfg_attr(windows, ignore)]
    #[test]
    fn follow_recovers_after_active_file_truncate_and_recreate() {
        let root = temp_path("follow-truncate-recreate");
        let config = LoggerConfig::default_for(service_name(), root.clone());
        let logger = Logger::new(config).expect("logger");

        logger
            .emit(log_event_with_request(service_name(), "backlog", 20))
            .expect("emit backlog");

        // Follow starts at the tail: the backlog event is not replayed.
        let mut follow = logger
            .follow(query_all(LogOrder::OldestFirst))
            .expect("follow");
        assert!(follow.poll().expect("initial poll").events.is_empty());

        logger
            .emit(log_event_with_request(
                service_name(),
                "before-truncate",
                20,
            ))
            .expect("emit before truncate");
        assert_eq!(
            request_ids(&follow.poll().expect("poll before truncate")),
            ["before-truncate"]
        );

        // Truncate the active file out from under the follow session.
        let active_path = default_log_path(&root, &service_name());
        fs::File::create(&active_path).expect("truncate active log");

        logger
            .emit(log_event_with_request(service_name(), "after-truncate", 20))
            .expect("emit after truncate");
        let after_truncate = drain_follow_until_request_id(&mut follow, "after-truncate");
        // Depending on when truncation is detected, the session either resumes
        // from the new content only or replays from the start of the stream.
        assert!(
            after_truncate == vec!["after-truncate"]
                || after_truncate == vec!["backlog", "before-truncate", "after-truncate"]
        );
        // Truncation surfaces as degraded health with an explanatory error.
        let truncate_health = follow.health();
        assert_eq!(truncate_health.state, QueryHealthState::Degraded);
        assert!(
            truncate_health
                .last_error
                .expect("truncate health summary")
                .message
                .contains("truncation")
        );

        // Recreate the file with a distinct identity; the session must detect
        // the change, recover, and keep delivering only the new events.
        recreate_with_distinct_unix_identity(&active_path);
        logger
            .emit(log_event_with_request(service_name(), "after-recreate", 20))
            .expect("emit after recreate");
        let after_recreate = drain_follow_until_request_id(&mut follow, "after-recreate");
        assert_eq!(after_recreate, vec!["after-recreate"]);
        let recreate_health = follow.health();
        assert_eq!(recreate_health.state, QueryHealthState::Degraded);
        assert!(
            recreate_health
                .last_error
                .expect("recreate health summary")
                .message
                .contains("identity changed")
        );
    }
1964}