Skip to main content

sc_observability/
lib.rs

1//! Lightweight structured logging for the `sc-observability` workspace.
2//!
3//! This crate owns logging-only concerns: logger configuration, built-in file
4//! and console sinks, sink fan-out, filtering, redaction, and logging health.
5//! It intentionally avoids typed observation routing and OTLP transport logic.
6#![expect(
7    clippy::missing_errors_doc,
8    reason = "public facade methods already have behavior documented centrally in the workspace docs, and repeating per-method Errors sections here would add low-signal boilerplate"
9)]
10#![expect(
11    clippy::must_use_candidate,
12    reason = "the facade intentionally avoids pervasive must_use boilerplate on constructors and lightweight accessors where the return types are already obvious from call sites"
13)]
14#![expect(
15    clippy::return_self_not_must_use,
16    reason = "builder-style chaining methods in this facade predate pedantic lint adoption and remain intentionally lightweight"
17)]
18
19pub mod constants;
20pub mod error_codes;
21
22mod builder;
23mod follow;
24mod health;
25mod jsonl_reader;
26mod query;
27
28use std::fs::{self, OpenOptions};
29use std::io::Write;
30use std::path::{Path, PathBuf};
31use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
32use std::sync::{Arc, Mutex};
33use std::time::{Duration, SystemTime};
34
35use crate::health::QueryHealthTracker;
36#[doc(inline)]
37pub use builder::LoggerBuilder;
38#[doc(inline)]
39pub use follow::LogFollowSession;
40#[doc(inline)]
41pub use jsonl_reader::JsonlLogReader;
42#[doc(inline)]
43pub use sc_observability_types::{
44    ActionName, ErrorCode, EventError, Level, LogEvent, LogQuery, LogSnapshot, LoggingHealthReport,
45    LoggingHealthState, OBSERVATION_ENVELOPE_VERSION, OutcomeLabel, ProcessIdentity, SchemaVersion,
46    ServiceName, SinkHealth, SinkHealthState, TargetCategory, Timestamp,
47};
48use sc_observability_types::{
49    Diagnostic, DiagnosticInfo, DiagnosticSummary, ErrorContext, FlushError, InitError,
50    LevelFilter, LogSinkError, ProcessIdentityPolicy, QueryError, QueryHealthState, Remediation,
51    ShutdownError, SinkName,
52};
53use serde_json::Value;
54
/// Rotation limits for the built-in JSONL file sink.
///
/// Both limits are enforced best-effort by `JsonlFileSink::rotate_if_needed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RotationPolicy {
    /// Maximum size of the active JSONL file before rotation.
    pub max_bytes: u64,
    /// Maximum number of rotated files to retain.
    pub max_files: u32,
}
63
64impl Default for RotationPolicy {
65    fn default() -> Self {
66        Self {
67            max_bytes: constants::DEFAULT_ROTATION_MAX_BYTES,
68            max_files: constants::DEFAULT_ROTATION_MAX_FILES,
69        }
70    }
71}
72
/// Retention limits for rotated JSONL files owned by the built-in file sink.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct RetentionPolicy {
    /// Maximum age in days for rotated JSONL files; older rotated files are
    /// pruned best-effort during writes.
    pub max_age_days: u32,
}
79
80impl Default for RetentionPolicy {
81    fn default() -> Self {
82        Self {
83            max_age_days: constants::DEFAULT_RETENTION_MAX_AGE_DAYS,
84        }
85    }
86}
87
/// Redacts one key/value pair before an event reaches registered sinks.
///
/// Implementations are invoked once per event field during redaction fan-out.
pub trait Redactor: Send + Sync {
    /// Redacts one event field in place; `key` is the field name and `value`
    /// may be rewritten by the implementation.
    fn redact(&self, key: &str, value: &mut Value);
}
93
/// Redaction settings applied to log events before sink fan-out.
///
/// Denylist redaction, bearer-token redaction, and custom redactors are all
/// applied per field, in that order.
#[derive(Default)]
pub struct RedactionPolicy {
    /// Exact field names that must always be redacted.
    pub denylist_keys: Vec<String>,
    /// Whether bearer-token shaped values should be redacted.
    pub redact_bearer_tokens: bool,
    /// Additional caller-supplied redactors, applied after the built-in rules.
    pub custom_redactors: Vec<Box<dyn Redactor>>,
}
104
105impl std::fmt::Debug for RedactionPolicy {
106    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
107        f.debug_struct("RedactionPolicy")
108            .field("denylist_keys", &self.denylist_keys)
109            .field("redact_bearer_tokens", &self.redact_bearer_tokens)
110            .field("custom_redactors", &self.custom_redactors.len())
111            .finish()
112    }
113}
114
/// Filters events before they are written to one registered sink.
///
/// A filter is sink-local: rejecting an event skips only that sink, not the
/// whole fan-out.
pub trait LogFilter: Send + Sync {
    /// Returns whether the sink should receive the event.
    fn accepts(&self, event: &LogEvent) -> bool;
}
120
/// One concrete event sink used by the logger runtime.
pub trait LogSink: Send + Sync {
    /// Writes one event to the sink.
    fn write(&self, event: &LogEvent) -> Result<(), LogSinkError>;

    /// Flushes any buffered sink state.
    ///
    /// The default implementation is a no-op that always succeeds, for sinks
    /// that write through on every event.
    fn flush(&self) -> Result<(), LogSinkError> {
        Ok(())
    }

    /// Returns the current sink health snapshot.
    fn health(&self) -> SinkHealth;
}
134
/// Construction-time sink registration pairing one sink with an optional filter.
#[derive(Clone)]
#[expect(
    missing_debug_implementations,
    reason = "registration stores trait-object sinks and filters, so derived Debug would not provide a meaningful stable contract"
)]
pub struct SinkRegistration {
    /// Concrete sink implementation receiving fan-out writes.
    pub(crate) sink: Arc<dyn LogSink>,
    /// Optional sink-local filter; `None` means the sink sees every event.
    pub(crate) filter: Option<Arc<dyn LogFilter>>,
}
147
148impl SinkRegistration {
149    /// Wraps a sink for logger registration.
150    pub fn new(sink: Arc<dyn LogSink>) -> Self {
151        Self { sink, filter: None }
152    }
153
154    /// Adds a sink-local filter to the registration.
155    pub fn with_filter(mut self, filter: Arc<dyn LogFilter>) -> Self {
156        self.filter = Some(filter);
157        self
158    }
159}
160
#[cfg(feature = "fault-injection")]
/// Public controller for forcing retained sink health into validation states.
///
/// This type is intended for live validation and test harness use only. It
/// wraps one existing retained sink and forces that sink to report degraded or
/// unavailable health through the ordinary `LoggingHealthReport` path without
/// sabotaging the filesystem or reaching into crate-private internals.
#[derive(Clone, Default)]
#[expect(
    missing_debug_implementations,
    reason = "fault injector state is a small validation-only mutex wrapper without a useful stable Debug contract"
)]
pub struct RetainedSinkFaultInjector {
    // Shared with every wrapped sink; `None` means no fault is forced.
    forced_state: Arc<Mutex<Option<SinkHealthState>>>,
}
176
#[cfg(feature = "fault-injection")]
impl RetainedSinkFaultInjector {
    /// Creates a new injector with no forced sink fault.
    pub fn new() -> Self {
        Self::default()
    }

    /// Wraps one retained sink so its health can be forced during validation.
    ///
    /// `Arc<dyn LogSink>` is intentionally preserved in this public signature
    /// because `SinkRegistration::new()` takes `Arc<dyn LogSink>` and this
    /// helper exists solely to compose with that registration surface.
    pub fn wrap(&self, sink: Arc<dyn LogSink>) -> Arc<dyn LogSink> {
        let forced_state = Arc::clone(&self.forced_state);
        Arc::new(FaultInjectingSink {
            inner: sink,
            forced_state,
        })
    }

    /// Forces the wrapped retained sink into the degraded-dropping state.
    pub fn force_degraded(&self) {
        self.store(Some(SinkHealthState::DegradedDropping));
    }

    /// Forces the wrapped retained sink into the unavailable state.
    pub fn force_unavailable(&self) {
        self.store(Some(SinkHealthState::Unavailable));
    }

    /// Clears any forced sink fault and returns the wrapped sink to normal
    /// health reporting.
    ///
    /// # Panics
    ///
    /// Panics if the retained-sink fault-state mutex has been poisoned.
    pub fn clear(&self) {
        self.store(None);
    }

    /// Replaces the shared forced-state value as one unit under the mutex.
    fn store(&self, state: Option<SinkHealthState>) {
        let mut guard = self
            .forced_state
            .lock()
            .expect("retained sink fault state poisoned");
        *guard = state;
    }
}
226
#[cfg(feature = "fault-injection")]
// Internal decorator that makes the wrapped sink report a forced health state
// (and fail writes/flushes) whenever the shared injector state is set.
struct FaultInjectingSink {
    // The real sink that handles writes when no fault is forced.
    inner: Arc<dyn LogSink>,
    // Shared with the owning RetainedSinkFaultInjector.
    forced_state: Arc<Mutex<Option<SinkHealthState>>>,
}
232
#[cfg(feature = "fault-injection")]
impl FaultInjectingSink {
    /// Returns the currently forced health state, if any fault is active.
    fn current_state(&self) -> Option<SinkHealthState> {
        let guard = self
            .forced_state
            .lock()
            .expect("retained sink fault state poisoned");
        *guard
    }
}
242
/// Public configuration for the lightweight logging runtime.
#[derive(Debug)]
pub struct LoggerConfig {
    /// Stable service name attached to emitted records.
    pub service_name: ServiceName,
    /// Root directory that owns the service log tree.
    pub log_root: PathBuf,
    /// Minimum severity level emitted by the logger.
    pub level: LevelFilter,
    /// Reserved for future async/backpressure implementation. Phase 1 execution is synchronous; this value is stored but not yet applied.
    pub queue_capacity: usize,
    /// Rotation settings for the built-in JSONL sink.
    pub rotation: RotationPolicy,
    /// Retention settings for rotated JSONL files.
    pub retention: RetentionPolicy,
    /// Redaction policy applied before sink fan-out.
    pub redaction: RedactionPolicy,
    /// Process identity policy for emitted records.
    pub process_identity: ProcessIdentityPolicy,
    /// Whether the built-in JSONL file sink is enabled.
    /// Disabling it also makes logger-owned query/follow unavailable.
    pub enable_file_sink: bool,
    /// Whether the built-in console sink is enabled.
    pub enable_console_sink: bool,
}
267
268impl LoggerConfig {
269    /// Builds the documented v1 defaults for a service-scoped logger
270    /// configuration.
271    ///
272    /// If [`constants::SC_LOG_ROOT_ENV_VAR`] is set, it is used only when
273    /// `log_root` is empty. A non-empty `log_root` parameter is treated as
274    /// explicit configuration and takes precedence over the environment helper
275    /// per LOG-009.
276    pub fn default_for(service_name: ServiceName, log_root: PathBuf) -> Self {
277        let resolved_log_root = if log_root.as_os_str().is_empty() {
278            std::env::var(constants::SC_LOG_ROOT_ENV_VAR)
279                .ok()
280                .map(PathBuf::from)
281                .filter(|path| !path.as_os_str().is_empty())
282                .unwrap_or(log_root)
283        } else {
284            log_root
285        };
286        Self {
287            service_name,
288            log_root: resolved_log_root,
289            level: LevelFilter::Info,
290            queue_capacity: constants::DEFAULT_LOG_QUEUE_CAPACITY,
291            rotation: RotationPolicy::default(),
292            retention: RetentionPolicy::default(),
293            redaction: RedactionPolicy {
294                redact_bearer_tokens: true,
295                ..RedactionPolicy::default()
296            },
297            process_identity: ProcessIdentityPolicy::Auto,
298            enable_file_sink: constants::DEFAULT_ENABLE_FILE_SINK,
299            enable_console_sink: constants::DEFAULT_ENABLE_CONSOLE_SINK,
300        }
301    }
302}
303
// Mutable runtime state shared by all Logger entry points: failure counters,
// the most recent sink error, and query/follow health tracking.
struct LoggerRuntime {
    // Count of events dropped due to sink write failures.
    dropped_events_total: AtomicU64,
    // Count of sink flush failures observed.
    flush_errors_total: AtomicU64,
    // MUTEX: sink failures update the shared last_error summary from multiple sinks and call paths;
    // Mutex is sufficient because reads are rare and the value is replaced atomically as one unit.
    last_error: Mutex<Option<DiagnosticSummary>>,
    // Shared with follow sessions so they report into the same health state.
    query_health: Arc<QueryHealthTracker>,
}
312
313impl LoggerRuntime {
314    fn new(query_available: bool) -> Self {
315        Self {
316            dropped_events_total: AtomicU64::new(0),
317            flush_errors_total: AtomicU64::new(0),
318            last_error: Mutex::new(None),
319            query_health: Arc::new(QueryHealthTracker::new(if query_available {
320                QueryHealthState::Healthy
321            } else {
322                QueryHealthState::Unavailable
323            })),
324        }
325    }
326}
327
/// Lightweight structured logging runtime with built-in query and follow support.
#[expect(
    missing_debug_implementations,
    reason = "logger owns runtime handles and trait-object sinks whose internal state is not a stable public debug contract"
)]
pub struct Logger {
    // Immutable configuration captured at construction.
    config: LoggerConfig,
    // Registered sinks (built-in and caller-supplied), fanned out on emit.
    sinks: Vec<SinkRegistration>,
    // Set once by shutdown(); shared with follow sessions so they stop too.
    shutdown: Arc<AtomicBool>,
    // Counters, last-error summary, and query/follow health.
    runtime: LoggerRuntime,
}
339
340impl Logger {
341    /// Starts a construction-time builder for sink registration.
342    pub fn builder(config: LoggerConfig) -> Result<LoggerBuilder, InitError> {
343        LoggerBuilder::new(config)
344    }
345
346    /// Creates a logger with the configured built-in sinks and runtime state.
347    pub fn new(config: LoggerConfig) -> Result<Self, InitError> {
348        Ok(LoggerBuilder::new(config)?.build())
349    }
350
351    /// Emits one structured log event through the configured sinks.
352    pub fn emit(&self, event: LogEvent) -> Result<(), EventError> {
353        if self.shutdown.load(Ordering::SeqCst) {
354            return Err(EventError(Box::new(ErrorContext::new(
355                error_codes::LOGGER_SHUTDOWN,
356                "logger is shut down",
357                Remediation::not_recoverable("create a new logger before emitting"),
358            ))));
359        }
360
361        validate_event(&event, &self.config.service_name)?;
362        let redacted = self.redact_event(event);
363
364        for registration in &self.sinks {
365            if registration
366                .filter
367                .as_ref()
368                .is_some_and(|filter| !filter.accepts(&redacted))
369            {
370                continue;
371            }
372
373            if let Err(err) = registration.sink.write(&redacted) {
374                self.record_sink_failure(&err);
375            }
376        }
377
378        Ok(())
379    }
380
381    /// Flushes all registered sinks.
382    ///
383    /// Sink flush failures are absorbed into logger health and counters so the
384    /// caller can continue shutdown or health inspection without a secondary
385    /// runtime failure.
386    ///
387    /// # Panics
388    ///
389    /// Panics if an internal sink-health mutex has been poisoned while one of
390    /// the built-in sink implementations is updating its flush state.
391    pub fn flush(&self) -> Result<(), FlushError> {
392        if self.shutdown.load(Ordering::SeqCst) {
393            return Ok(());
394        }
395
396        self.flush_registered_sinks();
397        Ok(())
398    }
399
400    /// Queries the current JSONL log set synchronously using the shared query contract.
401    ///
402    /// # Panics
403    ///
404    /// Panics if the internal query-health mutex has been poisoned while the
405    /// runtime records the result of this query.
406    pub fn query(&self, query: &LogQuery) -> Result<LogSnapshot, QueryError> {
407        let reader = self.query_reader()?;
408        let result = reader.query(query);
409        self.runtime.query_health.record_result(&result);
410        result
411    }
412
413    /// Starts a tail-style follow session from the current end of the visible log set.
414    ///
415    /// # Panics
416    ///
417    /// Panics if the internal query-health mutex has been poisoned while the
418    /// runtime records the result of this follow-start operation.
419    pub fn follow(&self, query: LogQuery) -> Result<LogFollowSession, QueryError> {
420        let active_log_path = self.ensure_query_available()?;
421        let result = LogFollowSession::with_health(
422            active_log_path,
423            query,
424            self.runtime.query_health.clone(),
425            Some(self.shutdown.clone()),
426        );
427        self.runtime.query_health.record_result(&result);
428        result
429    }
430
431    fn flush_registered_sinks(&self) {
432        for registration in &self.sinks {
433            if let Err(err) = registration.sink.flush() {
434                self.record_flush_failure(&err);
435            }
436        }
437    }
438
439    /// Shuts the logger down and makes logger-owned query/follow unavailable.
440    ///
441    /// # Panics
442    ///
443    /// Panics if an internal sink-health mutex or the internal query-health
444    /// mutex has been poisoned while shutdown is flushing sinks and marking
445    /// query/follow unavailable.
446    pub fn shutdown(&self) -> Result<(), ShutdownError> {
447        if self.shutdown.swap(true, Ordering::SeqCst) {
448            return Ok(());
449        }
450
451        self.flush_registered_sinks();
452        self.runtime.query_health.mark_unavailable(None);
453        Ok(())
454    }
455
456    /// Returns aggregate logging and query/follow health for the runtime.
457    ///
458    /// # Panics
459    ///
460    /// Panics if the internal last-error mutex has been poisoned.
461    pub fn health(&self) -> LoggingHealthReport {
462        let sink_statuses: Vec<SinkHealth> =
463            self.sinks.iter().map(|entry| entry.sink.health()).collect();
464        LoggingHealthReport {
465            state: aggregate_logging_health_state(&sink_statuses),
466            dropped_events_total: self.runtime.dropped_events_total.load(Ordering::SeqCst),
467            flush_errors_total: self.runtime.flush_errors_total.load(Ordering::SeqCst),
468            active_log_path: default_log_path(&self.config.log_root, &self.config.service_name),
469            sink_statuses,
470            query: Some(self.runtime.query_health.snapshot()),
471            last_error: self
472                .runtime
473                .last_error
474                .lock()
475                .expect("logger last_error poisoned")
476                .clone(),
477        }
478    }
479
480    fn redact_event(&self, mut event: LogEvent) -> LogEvent {
481        if self.config.redaction.redact_bearer_tokens
482            && let Some(message) = event.message.as_mut()
483        {
484            *message = redact_bearer_token_text(message);
485        }
486
487        for (key, value) in &mut event.fields {
488            if self
489                .config
490                .redaction
491                .denylist_keys
492                .iter()
493                .any(|deny| deny == key)
494            {
495                *value = Value::String(constants::REDACTED_VALUE.to_string());
496            }
497            if self.config.redaction.redact_bearer_tokens {
498                redact_string_value(value);
499            }
500            for redactor in &self.config.redaction.custom_redactors {
501                redactor.redact(key, value);
502            }
503        }
504
505        event
506    }
507
508    fn record_sink_failure(&self, error: &LogSinkError) {
509        self.runtime
510            .dropped_events_total
511            .fetch_add(1, Ordering::SeqCst);
512        *self
513            .runtime
514            .last_error
515            .lock()
516            .expect("logger last_error poisoned") =
517            Some(DiagnosticSummary::from(error.diagnostic()));
518    }
519
520    fn record_flush_failure(&self, error: &LogSinkError) {
521        self.runtime
522            .flush_errors_total
523            .fetch_add(1, Ordering::SeqCst);
524        *self
525            .runtime
526            .last_error
527            .lock()
528            .expect("logger last_error poisoned") =
529            Some(DiagnosticSummary::from(error.diagnostic()));
530    }
531
532    fn query_reader(&self) -> Result<JsonlLogReader, QueryError> {
533        self.ensure_query_available().map(JsonlLogReader::new)
534    }
535
536    fn ensure_query_available(&self) -> Result<PathBuf, QueryError> {
537        if self.shutdown.load(Ordering::SeqCst) {
538            let error = query::shutdown_error();
539            self.runtime.query_health.record_error(&error);
540            return Err(error);
541        }
542
543        if !self.config.enable_file_sink {
544            let error = query::unavailable_error(
545                "logger query/follow requires the built-in JSONL file sink to be enabled",
546            );
547            self.runtime.query_health.record_error(&error);
548            return Err(error);
549        }
550
551        Ok(default_log_path(
552            &self.config.log_root,
553            &self.config.service_name,
554        ))
555    }
556}
557
/// Built-in JSONL file sink with rotation and retention handling.
#[expect(
    missing_debug_implementations,
    reason = "file-sink internals include mutex-protected runtime state that is intentionally not exposed through a public Debug contract"
)]
pub struct JsonlFileSink {
    // Active JSONL file path; rotated siblings derive from this name.
    path: PathBuf,
    // Size/count limits applied by rotate_if_needed.
    rotation: RotationPolicy,
    // Age limit applied by prune_old_files.
    retention: RetentionPolicy,
    // MUTEX: sink health mutates as one small shared struct during writes and health reads;
    // Mutex keeps updates simple and coherent, while RwLock would not reduce contention materially.
    health: Mutex<SinkHealth>,
}
571
572impl JsonlFileSink {
573    /// Creates a JSONL file sink at the given active log path.
574    ///
575    /// # Panics
576    ///
577    /// Panics only if the workspace-owned `JSONL_FILE_SINK_NAME` constant ever
578    /// becomes invalid for `SinkName`, which would indicate a programming bug.
579    pub fn new(path: PathBuf, rotation: RotationPolicy, retention: RetentionPolicy) -> Self {
580        Self {
581            path,
582            rotation,
583            retention,
584            health: Mutex::new(SinkHealth {
585                name: SinkName::new(constants::JSONL_FILE_SINK_NAME)
586                    .expect("jsonl sink constant is valid"),
587                state: SinkHealthState::Healthy,
588                last_error: None,
589            }),
590        }
591    }
592
593    /// Returns the active JSONL file path for the sink.
594    pub fn path(&self) -> &Path {
595        &self.path
596    }
597
598    #[expect(
599        clippy::unnecessary_wraps,
600        reason = "the helper preserves a Result-shaped internal API so rotation checks can grow I/O failure propagation without reshaping caller control flow"
601    )]
602    fn rotate_if_needed(&self, incoming_len: u64) -> Result<(), LogSinkError> {
603        if let Ok(metadata) = fs::metadata(&self.path)
604            && metadata.len().saturating_add(incoming_len) > self.rotation.max_bytes
605        {
606            for idx in (1..self.rotation.max_files).rev() {
607                let src = self.rotated_path(idx);
608                let dest = self.rotated_path(idx + 1);
609                // Rotation is best-effort: if a file is absent or another rename wins,
610                // later writes still succeed and the next rotation pass will heal state.
611                let _ = rename_if_present(&src, &dest);
612            }
613            let rotated = self.rotated_path(1);
614            // The active file rename is also best-effort so logging stays fail-open
615            // even when the platform cannot complete the rotation move immediately.
616            let _ = rename_if_present(&self.path, &rotated);
617        }
618
619        self.prune_old_files();
620        Ok(())
621    }
622
623    fn rotated_path(&self, index: u32) -> PathBuf {
624        rotated_log_path(&self.path, index)
625    }
626
627    fn prune_old_files(&self) {
628        let Some(parent) = self.path.parent() else {
629            return;
630        };
631
632        let Ok(entries) = fs::read_dir(parent) else {
633            return;
634        };
635        let retention_cutoff = SystemTime::now()
636            - Duration::from_secs(u64::from(self.retention.max_age_days) * constants::SECS_PER_DAY);
637
638        for entry in entries.flatten() {
639            let path = entry.path();
640            let Some(file_name) = path.file_name().and_then(|value| value.to_str()) else {
641                continue;
642            };
643
644            let active_name = self
645                .path
646                .file_name()
647                .and_then(|value| value.to_str())
648                .unwrap_or_default();
649
650            if !file_name.starts_with(active_name) || file_name == active_name {
651                continue;
652            }
653
654            if let Ok(metadata) = entry.metadata()
655                && let Ok(modified) = metadata.modified()
656                && modified < retention_cutoff
657            {
658                // Retention cleanup is best-effort: failure to delete an old file should not
659                // block new writes or turn routine pruning into a runtime error.
660                let _ = fs::remove_file(path);
661            }
662        }
663    }
664
665    fn mark_failure<E>(&self, error: E) -> LogSinkError
666    where
667        E: std::error::Error + Send + Sync + 'static,
668    {
669        let message = error.to_string();
670        let diagnostic = diagnostic_for_sink_failure(message.clone());
671        let mut health = self.health.lock().expect("file sink health poisoned");
672        health.state = SinkHealthState::DegradedDropping;
673        health.last_error = Some(DiagnosticSummary::from(&diagnostic));
674        LogSinkError(Box::new(
675            ErrorContext::new(
676                error_codes::LOGGER_SINK_WRITE_FAILED,
677                "jsonl file sink write failed",
678                Remediation::not_recoverable(
679                    "file sink write failure handling is owned by the logger runtime",
680                ),
681            )
682            .cause(message)
683            .source(Box::new(error)),
684        ))
685    }
686}
687
impl LogSink for JsonlFileSink {
    /// Serializes the event to one JSONL line and appends it to the active file.
    ///
    /// The parent directory is created on demand, and rotation runs before the
    /// append so the active file is rotated ahead of an oversized write.
    fn write(&self, event: &LogEvent) -> Result<(), LogSinkError> {
        if let Some(parent) = self.path.parent() {
            fs::create_dir_all(parent).map_err(|err| self.mark_failure(err))?;
        }

        let mut line = serde_json::to_vec(event).map_err(|err| self.mark_failure(err))?;
        line.push(b'\n');
        self.rotate_if_needed(line.len() as u64)?;

        let mut file = OpenOptions::new()
            .create(true)
            .append(true)
            .open(&self.path)
            .map_err(|err| self.mark_failure(err))?;
        file.write_all(&line)
            .and_then(|()| file.flush())
            .map_err(|err| self.mark_failure(err))?;

        // A successful write restores Healthy; last_error is not cleared here,
        // so the most recent failure stays visible in health snapshots.
        // NOTE(review): confirm retaining stale last_error after recovery is intended.
        let mut health = self.health.lock().expect("file sink health poisoned");
        health.state = SinkHealthState::Healthy;
        Ok(())
    }

    /// Returns a clone of the current sink health snapshot.
    fn health(&self) -> SinkHealth {
        self.health
            .lock()
            .expect("file sink health poisoned")
            .clone()
    }
}
719
// Internal abstraction over the console stream so ConsoleSink can target
// stdout or stderr (and tests can inject a writer via from_writer).
trait ConsoleWriter: Send + Sync {
    // Writes one already-formatted line, including the trailing newline.
    fn write_line(&self, line: &str) -> std::io::Result<()>;
}
723
724struct StdoutConsoleWriter;
725
726impl ConsoleWriter for StdoutConsoleWriter {
727    fn write_line(&self, line: &str) -> std::io::Result<()> {
728        let mut stdout = std::io::stdout().lock();
729        stdout.write_all(line.as_bytes())?;
730        stdout.write_all(b"\n")?;
731        stdout.flush()
732    }
733}
734
735struct StderrConsoleWriter;
736
737impl ConsoleWriter for StderrConsoleWriter {
738    fn write_line(&self, line: &str) -> std::io::Result<()> {
739        let mut stderr = std::io::stderr().lock();
740        stderr.write_all(line.as_bytes())?;
741        stderr.write_all(b"\n")?;
742        stderr.flush()
743    }
744}
745
/// Built-in sink that renders log events to a configured output stream
/// (stdout or stderr).
#[expect(
    missing_debug_implementations,
    reason = "console sink owns a trait-object writer and sink health state, so a derived Debug impl would not be stable or useful"
)]
pub struct ConsoleSink {
    // Destination stream abstraction (stdout, stderr, or injected for tests).
    writer: Box<dyn ConsoleWriter>,
    // MUTEX: console sink health mutates as one shared status struct on write failures and reads;
    // Mutex is simpler than RwLock because writes dominate and the payload is tiny.
    health: Mutex<SinkHealth>,
}
758
759impl ConsoleSink {
760    /// Creates a console sink backed by stdout.
761    pub fn stdout() -> Self {
762        Self::from_writer(Box::new(StdoutConsoleWriter))
763    }
764
765    /// Creates a console sink backed by stderr.
766    pub fn stderr() -> Self {
767        Self::from_writer(Box::new(StderrConsoleWriter))
768    }
769
770    pub(crate) fn from_writer(writer: Box<dyn ConsoleWriter>) -> Self {
771        Self {
772            writer,
773            health: Mutex::new(SinkHealth {
774                name: SinkName::new(constants::CONSOLE_SINK_NAME)
775                    .expect("console sink constant is valid"),
776                state: SinkHealthState::Healthy,
777                last_error: None,
778            }),
779        }
780    }
781
782    fn format_line(event: &LogEvent) -> String {
783        let level = match event.level {
784            Level::Trace => "TRACE",
785            Level::Debug => "DEBUG",
786            Level::Info => "INFO",
787            Level::Warn => "WARN",
788            Level::Error => "ERROR",
789        };
790        let message = event.message.as_deref().unwrap_or("");
791        format!(
792            "{} {} {} {} {}",
793            event.timestamp,
794            level,
795            event.target.as_str(),
796            event.action.as_str(),
797            message
798        )
799    }
800
801    fn mark_failure<E>(&self, error: E) -> LogSinkError
802    where
803        E: std::error::Error + Send + Sync + 'static,
804    {
805        let message = error.to_string();
806        let diagnostic = diagnostic_for_sink_failure(message.clone());
807        let mut health = self.health.lock().expect("console sink health poisoned");
808        health.state = SinkHealthState::DegradedDropping;
809        health.last_error = Some(DiagnosticSummary::from(&diagnostic));
810        LogSinkError(Box::new(
811            ErrorContext::new(
812                error_codes::LOGGER_SINK_WRITE_FAILED,
813                "console sink write failed",
814                Remediation::not_recoverable(
815                    "console sink write failure handling is owned by the logger runtime",
816                ),
817            )
818            .cause(message)
819            .source(Box::new(error)),
820        ))
821    }
822}
823
824impl LogSink for ConsoleSink {
825    fn write(&self, event: &LogEvent) -> Result<(), LogSinkError> {
826        let line = Self::format_line(event);
827        self.writer
828            .write_line(&line)
829            .map_err(|err| self.mark_failure(err))?;
830        let mut health = self.health.lock().expect("console sink health poisoned");
831        health.state = SinkHealthState::Healthy;
832        Ok(())
833    }
834
835    fn health(&self) -> SinkHealth {
836        self.health
837            .lock()
838            .expect("console sink health poisoned")
839            .clone()
840    }
841}
842
#[cfg(feature = "fault-injection")]
impl LogSink for FaultInjectingSink {
    /// Fails with the forced fault when one is active, otherwise delegates.
    fn write(&self, event: &LogEvent) -> Result<(), LogSinkError> {
        match self.current_state() {
            Some(state) => Err(LogSinkError(Box::new(fault_injection_error_context(state)))),
            None => self.inner.write(event),
        }
    }

    /// Fails with the forced fault when one is active, otherwise delegates.
    fn flush(&self) -> Result<(), LogSinkError> {
        match self.current_state() {
            Some(state) => Err(LogSinkError(Box::new(fault_injection_error_context(state)))),
            None => self.inner.flush(),
        }
    }

    /// Reports the inner sink's health, overridden by any forced fault state.
    fn health(&self) -> SinkHealth {
        let mut health = self.inner.health();
        if let Some(state) = self.current_state() {
            let context = fault_injection_error_context(state);
            health.state = state;
            health.last_error = Some(DiagnosticSummary::from(context.diagnostic()));
        }
        health
    }
}
869
/// Private sealing module: external crates cannot implement the emitter
/// trait below because they cannot name this `Sealed` supertrait.
mod sealed_emitters {
    pub trait Sealed {}
}
873
/// Crate-internal abstraction over "something that can emit a `LogEvent`",
/// sealed so only types in this crate may implement it.
#[expect(
    dead_code,
    reason = "crate-local emitter trait is intentionally available for logging-only injection"
)]
pub(crate) trait LogEmitter: sealed_emitters::Sealed + Send + Sync {
    /// Emits a single log event through the implementing logger.
    fn emit_log(&self, event: LogEvent) -> Result<(), EventError>;
}
881
impl sealed_emitters::Sealed for Logger {}

/// `Logger` is the canonical emitter: `emit_log` simply forwards to `emit`.
impl LogEmitter for Logger {
    fn emit_log(&self, event: LogEvent) -> Result<(), EventError> {
        self.emit(event)
    }
}
889
890pub(crate) fn diagnostic_for_sink_failure(message: impl Into<String>) -> Diagnostic {
891    Diagnostic {
892        timestamp: Timestamp::now_utc(),
893        code: error_codes::LOGGER_SINK_WRITE_FAILED,
894        message: message.into(),
895        cause: None,
896        remediation: Remediation::not_recoverable(
897            "sink failure handling is owned by the logger runtime",
898        ),
899        docs: None,
900        details: serde_json::Map::new(),
901    }
902}
903
904fn validate_event(event: &LogEvent, expected_service: &ServiceName) -> Result<(), EventError> {
905    if event.version.as_str() != sc_observability_types::constants::OBSERVATION_ENVELOPE_VERSION {
906        return Err(EventError(Box::new(ErrorContext::new(
907            error_codes::LOGGER_INVALID_EVENT,
908            "log event version is invalid",
909            Remediation::recoverable(
910                "emit an observation v1 log event",
911                ["recreate the event with the current contract"],
912            ),
913        ))));
914    }
915
916    if &event.service != expected_service {
917        return Err(EventError(Box::new(ErrorContext::new(
918            error_codes::LOGGER_INVALID_EVENT,
919            "log event service does not match logger service",
920            Remediation::recoverable(
921                "emit the event with the logger service name",
922                ["rebuild the event before emitting"],
923            ),
924        ))));
925    }
926
927    Ok(())
928}
929
930pub(crate) fn default_log_file_name(service_name: &ServiceName) -> String {
931    format!(
932        "{}{}",
933        service_name.as_str(),
934        constants::DEFAULT_LOG_FILE_SUFFIX
935    )
936}
937
938pub(crate) fn default_log_path(log_root: &Path, service_name: &ServiceName) -> PathBuf {
939    log_root
940        .join(constants::DEFAULT_LOG_DIR_NAME)
941        .join(default_log_file_name(service_name))
942}
943
/// Derives the path for rotation slot `index` by appending `.<index>` to the
/// active log's file name, in the same directory.
///
/// Falls back to the current directory when the active path has no parent,
/// and to `active.log.jsonl` when the file name is missing or not UTF-8.
pub(crate) fn rotated_log_path(active_path: &Path, index: u32) -> PathBuf {
    let directory = match active_path.parent() {
        Some(parent) => parent,
        None => Path::new("."),
    };
    let base_name = match active_path.file_name().and_then(|value| value.to_str()) {
        Some(name) => name,
        None => "active.log.jsonl",
    };
    directory.join(format!("{base_name}.{index}"))
}
952
953fn aggregate_logging_health_state(sink_statuses: &[SinkHealth]) -> LoggingHealthState {
954    if sink_statuses
955        .iter()
956        .any(|sink| sink.state == SinkHealthState::Unavailable)
957    {
958        LoggingHealthState::Unavailable
959    } else if sink_statuses
960        .iter()
961        .any(|sink| sink.state != SinkHealthState::Healthy)
962    {
963        LoggingHealthState::DegradedDropping
964    } else {
965        LoggingHealthState::Healthy
966    }
967}
968
/// Builds the error context returned when the retained sink fault injector
/// is forcing a sink state; the forced state is also attached as a detail.
#[cfg(feature = "fault-injection")]
fn fault_injection_error_context(state: SinkHealthState) -> ErrorContext {
    let forced_state = match state {
        SinkHealthState::Healthy => "healthy",
        SinkHealthState::DegradedDropping => "degraded_dropping",
        SinkHealthState::Unavailable => "unavailable",
    };
    let context = ErrorContext::new(
        error_codes::LOGGER_SINK_FAULT_INJECTED,
        format!("retained sink fault injection forced {forced_state} state"),
        Remediation::recoverable(
            "clear the retained sink fault injector before resuming normal validation traffic",
            ["clear the injector state"],
        ),
    );
    context.detail("forced_state", Value::String(forced_state.to_string()))
}
986
/// Renames `src` to `dest`, treating a missing `src` as success.
///
/// A `NotFound` source is tolerated so gaps in the rotation chain do not
/// abort the shuffle; every other I/O error is propagated to the caller.
fn rename_if_present(src: &Path, dest: &Path) -> std::io::Result<()> {
    if let Err(err) = fs::rename(src, dest) {
        if err.kind() != std::io::ErrorKind::NotFound {
            return Err(err);
        }
    }
    Ok(())
}
994
995fn redact_string_value(value: &mut Value) {
996    if let Value::String(text) = value {
997        *text = redact_bearer_token_text(text);
998    }
999}
1000
1001fn redact_bearer_token_text(input: &str) -> String {
1002    const PREFIX: &str = "Bearer ";
1003    let mut result = String::with_capacity(input.len());
1004    let mut remaining = input;
1005
1006    while let Some(index) = remaining.find(PREFIX) {
1007        result.push_str(&remaining[..index + PREFIX.len()]);
1008        let token_start = index + PREFIX.len();
1009        let token_end = remaining[token_start..]
1010            .find(char::is_whitespace)
1011            .map_or(remaining.len(), |value| token_start + value);
1012        result.push_str(constants::REDACTED_VALUE);
1013        remaining = &remaining[token_end..];
1014    }
1015
1016    result.push_str(remaining);
1017    result
1018}
1019
1020#[cfg(test)]
1021mod tests {
1022    use super::*;
1023    use sc_observability_types::{
1024        ActionName, Diagnostic, ErrorCode, Level, LogEvent, LogOrder, LogQuery, LogSnapshot,
1025        ProcessIdentity, QueryError, QueryHealthState, TargetCategory, Timestamp,
1026    };
1027    use serde_json::{Map, json};
1028    use std::sync::Arc;
1029    use temp_env::{with_var, with_var_unset};
1030
    /// Test console writer that appends each written line to a shared,
    /// lock-protected buffer so assertions can inspect console-sink output.
    struct SharedBuffer {
        // Shared with the test body; every `write_line` pushes one entry.
        lines: Arc<Mutex<Vec<String>>>,
    }
1034
1035    impl ConsoleWriter for SharedBuffer {
1036        fn write_line(&self, line: &str) -> std::io::Result<()> {
1037            self.lines
1038                .lock()
1039                .expect("buffer poisoned")
1040                .push(line.to_string());
1041            Ok(())
1042        }
1043    }
1044
    /// Custom redactor used to verify that user-supplied redactors run in
    /// addition to the built-in denylist and bearer-token redaction.
    struct PrefixRedactor;
1046
1047    impl Redactor for PrefixRedactor {
1048        fn redact(&self, key: &str, value: &mut Value) {
1049            if key == "secret" {
1050                *value = Value::String("custom-redacted".to_string());
1051            }
1052        }
1053    }
1054
    /// Sink whose writes always fail, for exercising fail-open accounting.
    struct FailSink;
1056
    impl LogSink for FailSink {
        // Every write fails with the sink-write error code so tests can
        // observe fail-open behavior in logger health.
        fn write(&self, _event: &LogEvent) -> Result<(), LogSinkError> {
            Err(LogSinkError(Box::new(ErrorContext::new(
                error_codes::LOGGER_SINK_WRITE_FAILED,
                "fail sink write failed",
                Remediation::not_recoverable("test sink intentionally fails"),
            ))))
        }

        // Reports a permanently degraded state to match the failing writes.
        fn health(&self) -> SinkHealth {
            SinkHealth {
                name: sink_name("fail"),
                state: SinkHealthState::DegradedDropping,
                last_error: None,
            }
        }
    }
1074
    /// Sink that accepts all writes and counts `flush` invocations so
    /// shutdown/flush propagation can be asserted.
    #[derive(Default)]
    struct RecordingFlushSink {
        // Incremented once per `flush` call.
        flush_calls: AtomicU64,
    }
1079
    impl LogSink for RecordingFlushSink {
        // Writes are accepted and discarded.
        fn write(&self, _event: &LogEvent) -> Result<(), LogSinkError> {
            Ok(())
        }

        // Counts the call, then succeeds.
        fn flush(&self) -> Result<(), LogSinkError> {
            self.flush_calls.fetch_add(1, Ordering::SeqCst);
            Ok(())
        }

        // Always reports healthy; this sink never fails.
        fn health(&self) -> SinkHealth {
            SinkHealth {
                name: sink_name("recording-flush"),
                state: SinkHealthState::Healthy,
                last_error: None,
            }
        }
    }
1098
    /// Canonical service name used by every test logger in this module.
    fn service_name() -> ServiceName {
        ServiceName::new("sc-observability").expect("valid service name")
    }
1102
    /// Schema version matching the current observation envelope contract.
    fn schema_version() -> sc_observability_types::SchemaVersion {
        sc_observability_types::SchemaVersion::new(
            sc_observability_types::constants::OBSERVATION_ENVELOPE_VERSION,
        )
        .expect("valid schema version")
    }
1109
    /// Builds a validated outcome label from a literal.
    fn outcome_label(value: &str) -> sc_observability_types::OutcomeLabel {
        sc_observability_types::OutcomeLabel::new(value).expect("valid outcome label")
    }
1113
    /// Builds a validated correlation id from a literal.
    fn correlation_id(value: &str) -> sc_observability_types::CorrelationId {
        sc_observability_types::CorrelationId::new(value).expect("valid correlation id")
    }
1117
    /// Builds a validated sink name from a literal.
    fn sink_name(value: &str) -> SinkName {
        SinkName::new(value).expect("valid sink name")
    }
1121
1122    fn temp_path(name: &str) -> PathBuf {
1123        let path = std::env::temp_dir().join(format!(
1124            "sc-observability-{name}-{}-{}",
1125            std::process::id(),
1126            SystemTime::now()
1127                .duration_since(SystemTime::UNIX_EPOCH)
1128                .expect("system time before unix epoch")
1129                .as_nanos()
1130        ));
1131        let _ = fs::remove_dir_all(&path);
1132        path
1133    }
1134
    /// Captures the Unix file identity that the query module uses to detect
    /// log rotation for the file at `path`.
    #[cfg(unix)]
    fn unix_file_identity(path: &Path) -> crate::query::FileIdentity {
        crate::query::file_identity_for_path(path)
    }
1139
    /// Deletes the active log and installs a brand-new file whose Unix file
    /// identity differs from the original, so rotation detection in follow
    /// sessions can be exercised deterministically.
    ///
    /// Filesystems may eagerly reuse inodes; the retry loop keeps creating
    /// candidate files until one lands on a distinct identity.
    #[cfg(unix)]
    fn recreate_with_distinct_unix_identity(active_path: &Path) {
        let previous_identity = unix_file_identity(active_path);
        fs::remove_file(active_path).expect("remove active log");

        let active_name = active_path
            .file_name()
            .and_then(|value| value.to_str())
            .expect("active log file name");

        // Bounded retry: 256 attempts before declaring the filesystem
        // uncooperative and failing the test loudly.
        for attempt in 0..256 {
            let replacement =
                active_path.with_file_name(format!("{active_name}.replacement-{attempt}"));
            fs::File::create(&replacement).expect("create replacement file");
            if unix_file_identity(&replacement) != previous_identity {
                fs::rename(&replacement, active_path).expect("install replacement active log");
                return;
            }
            fs::remove_file(&replacement).expect("remove reused replacement inode");
        }

        panic!("failed to create replacement active log with distinct Unix identity");
    }
1163
    /// Non-Unix stand-in for the identity-swapping helper: simply removes and
    /// recreates the active log file.
    #[cfg(not(unix))]
    fn recreate_with_distinct_unix_identity(active_path: &Path) {
        // Non-Unix follow tests only verify that truncate/recreate remains
        // callable. Identity-distinctness is intentionally not asserted here,
        // and both cfg variants must stay behaviorally aligned when this helper
        // changes.
        fs::remove_file(active_path).expect("remove active log");
        fs::File::create(active_path).expect("recreate active log");
    }
1173
    /// Runs `f` with the log-root environment variable either set to `value`
    /// or explicitly unset, restoring the prior state afterwards.
    fn with_sc_log_root<T>(value: Option<&Path>, f: impl FnOnce() -> T) -> T {
        match value {
            Some(path) => with_var(constants::SC_LOG_ROOT_ENV_VAR, Some(path), f),
            None => with_var_unset(constants::SC_LOG_ROOT_ENV_VAR, f),
        }
    }
1180
    /// Builds a canonical valid event whose message and fields contain
    /// bearer tokens and a `secret` field, so redaction can be asserted
    /// end to end alongside ordinary envelope fields.
    fn log_event(service_name: ServiceName) -> LogEvent {
        LogEvent {
            version: schema_version(),
            timestamp: Timestamp::UNIX_EPOCH,
            level: Level::Info,
            service: service_name,
            target: TargetCategory::new("logger.core").expect("valid target"),
            action: ActionName::new("emit").expect("valid action"),
            // Bearer token in the message exercises message redaction.
            message: Some("Authorization: Bearer abc123".to_string()),
            identity: ProcessIdentity::default(),
            trace: None,
            request_id: None,
            correlation_id: None,
            outcome: Some(outcome_label("ok")),
            diagnostic: Some(Diagnostic {
                timestamp: Timestamp::UNIX_EPOCH,
                code: ErrorCode::new_static("SC_TEST"),
                message: "diagnostic".to_string(),
                cause: None,
                remediation: Remediation::recoverable("retry", ["inspect logs"]),
                docs: None,
                details: Map::default(),
            }),
            state_transition: None,
            // `token` exercises bearer redaction in fields; `secret`
            // exercises custom redactors.
            fields: serde_json::Map::from_iter([
                ("token".to_string(), json!("Bearer secret")),
                ("secret".to_string(), json!("raw")),
            ]),
        }
    }
1211
1212    fn log_event_with_request(
1213        service_name: ServiceName,
1214        request_id: &str,
1215        message_padding: usize,
1216    ) -> LogEvent {
1217        let mut event = log_event(service_name);
1218        event.message = Some(format!("{request_id} {}", "x".repeat(message_padding)));
1219        event.request_id = Some(correlation_id(request_id));
1220        event
1221            .fields
1222            .insert("sequence".to_string(), json!(request_id.to_string()));
1223        event
1224    }
1225
    /// Unbounded query with the given ordering and every other filter left
    /// at its default.
    fn query_all(order: LogOrder) -> LogQuery {
        LogQuery {
            order,
            ..LogQuery::default()
        }
    }
1232
1233    fn request_ids(snapshot: &LogSnapshot) -> Vec<String> {
1234        snapshot
1235            .events
1236            .iter()
1237            .map(|event| event.request_id.clone().expect("request_id").to_string())
1238            .collect()
1239    }
1240
1241    fn drain_follow_until_request_id(
1242        follow: &mut LogFollowSession,
1243        expected_request_id: &str,
1244    ) -> Vec<String> {
1245        let mut drained = Vec::new();
1246        for _ in 0..10 {
1247            let snapshot = follow.poll().expect("follow poll");
1248            drained.extend(request_ids(&snapshot));
1249            if drained
1250                .iter()
1251                .any(|request_id| request_id == expected_request_id)
1252            {
1253                return drained;
1254            }
1255        }
1256
1257        panic!("follow session never yielded {expected_request_id}");
1258    }
1259
    /// `default_for` must match the documented defaults for level, queue
    /// capacity, rotation, retention, sink enablement, and path layout.
    #[test]
    fn logger_config_default_for_sets_documented_defaults() {
        let root = temp_path("defaults");
        let config = LoggerConfig::default_for(service_name(), root.clone());
        assert_eq!(config.level, LevelFilter::Info);
        assert_eq!(config.queue_capacity, constants::DEFAULT_LOG_QUEUE_CAPACITY);
        assert_eq!(
            config.rotation.max_bytes,
            constants::DEFAULT_ROTATION_MAX_BYTES
        );
        assert_eq!(
            config.rotation.max_files,
            constants::DEFAULT_ROTATION_MAX_FILES
        );
        assert_eq!(
            config.retention.max_age_days,
            constants::DEFAULT_RETENTION_MAX_AGE_DAYS
        );
        // File sink is on by default; console sink is opt-in.
        assert!(config.enable_file_sink);
        assert!(!config.enable_console_sink);
        assert_eq!(
            default_log_path(&root, &config.service_name),
            root.join(constants::DEFAULT_LOG_DIR_NAME)
                .join(default_log_file_name(&config.service_name))
        );
    }
1286
    /// An empty explicit `log_root` falls back to the log-root env var.
    #[test]
    fn logger_config_default_for_uses_sc_log_root_when_log_root_is_empty() {
        let env_root = temp_path("env-root");

        with_sc_log_root(Some(&env_root), || {
            let config = LoggerConfig::default_for(service_name(), PathBuf::new());
            assert_eq!(config.log_root, env_root);
        });
    }
1296
    /// A non-empty explicit `log_root` wins over the log-root env var.
    #[test]
    fn logger_config_default_for_prefers_explicit_log_root_over_env() {
        let env_root = temp_path("env-root-override");
        let explicit_root = temp_path("explicit-root");

        with_sc_log_root(Some(&env_root), || {
            let config = LoggerConfig::default_for(service_name(), explicit_root.clone());
            assert_eq!(config.log_root, explicit_root);
        });
    }
1307
    /// The `Debug` rendering summarizes the redaction policy (including a
    /// custom-redactor count) rather than dumping opaque trait objects.
    #[test]
    fn logger_config_debug_renders_redaction_summary() {
        let root = temp_path("debug");
        let config = LoggerConfig::default_for(service_name(), root);

        let rendered = format!("{config:?}");

        assert!(rendered.contains("LoggerConfig"));
        assert!(rendered.contains("RedactionPolicy"));
        assert!(rendered.contains("custom_redactors: 0"));
    }
1319
    /// A default-config logger writes JSONL to the service-scoped default
    /// path and applies built-in bearer-token redaction.
    #[test]
    fn file_only_logging_writes_jsonl_to_default_path() {
        let root = temp_path("file-only");
        let config = LoggerConfig::default_for(service_name(), root.clone());
        let logger = Logger::new(config).expect("logger");
        logger.emit(log_event(service_name())).expect("emit");

        let path = default_log_path(&root, &service_name());
        let contents = fs::read_to_string(&path).expect("read log file");
        assert!(contents.contains("\"level\":\"Info\""));
        assert!(contents.contains("[REDACTED]"));
    }
1332
    /// A single emit fans out to both the default file sink and a manually
    /// registered console sink.
    #[test]
    fn file_and_console_fan_out_both_receive_event() {
        let root = temp_path("fanout");
        let mut config = LoggerConfig::default_for(service_name(), root.clone());
        // Disable the built-in console sink; the test registers its own
        // buffer-backed console sink below so output can be inspected.
        config.enable_console_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");

        let lines = Arc::new(Mutex::new(Vec::<String>::new()));
        builder.register_sink(SinkRegistration::new(Arc::new(ConsoleSink::from_writer(
            Box::new(SharedBuffer {
                lines: lines.clone(),
            }),
        ))));
        let logger = builder.build();

        logger.emit(log_event(service_name())).expect("emit");

        let path = default_log_path(&root, &service_name());
        assert!(path.exists());
        let lines = lines.lock().expect("lines poisoned");
        assert_eq!(lines.len(), 1);
        assert!(lines[0].contains("logger.core"));
    }
1356
    /// Redaction (denylist, custom redactors, bearer tokens) is applied once
    /// before fan-out, so both file and console sinks see redacted output.
    #[test]
    fn redaction_runs_before_sink_fan_out() {
        let root = temp_path("redaction");
        let mut config = LoggerConfig::default_for(service_name(), root.clone());
        config.enable_console_sink = false;
        config.redaction.denylist_keys.push("token".to_string());
        config
            .redaction
            .custom_redactors
            .push(Box::new(PrefixRedactor));
        let mut builder = Logger::builder(config).expect("logger builder");

        let lines = Arc::new(Mutex::new(Vec::<String>::new()));
        builder.register_sink(SinkRegistration::new(Arc::new(ConsoleSink::from_writer(
            Box::new(SharedBuffer {
                lines: lines.clone(),
            }),
        ))));
        let logger = builder.build();

        logger.emit(log_event(service_name())).expect("emit");

        let file_path = default_log_path(&root, &service_name());
        let file_contents = fs::read_to_string(file_path).expect("read file");
        let console_line = lines.lock().expect("lines poisoned")[0].clone();
        assert!(file_contents.contains("[REDACTED]"));
        assert!(file_contents.contains("custom-redacted"));
        assert!(console_line.contains("[REDACTED]"));
    }
1386
    /// An event with a non-current envelope version is rejected at emit.
    #[test]
    fn invalid_event_returns_event_error() {
        let root = temp_path("invalid");
        let config = LoggerConfig::default_for(service_name(), root);
        let logger = Logger::new(config).expect("logger");
        let mut event = log_event(service_name());
        event.version = sc_observability_types::SchemaVersion::new("v0").expect("valid version");
        assert!(logger.emit(event).is_err());
    }
1396
    /// A failing sink must not fail the emit call; the drop is counted and
    /// surfaced through logging health instead.
    #[test]
    fn sink_failures_are_fail_open_and_counted_in_health() {
        let root = temp_path("fail-open");
        let mut config = LoggerConfig::default_for(service_name(), root);
        config.enable_file_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");
        builder.register_sink(SinkRegistration::new(Arc::new(FailSink)));
        let logger = builder.build();

        logger
            .emit(log_event(service_name()))
            .expect("emit still succeeds");

        let health = logger.health();
        assert_eq!(health.state, LoggingHealthState::DegradedDropping);
        assert_eq!(health.dropped_events_total, 1);
        assert!(health.last_error.is_some());
    }
1415
    /// A failing flush must not fail the flush call; the failure is counted
    /// separately from dropped events and surfaced through health.
    #[test]
    fn flush_failures_are_fail_open_and_counted_in_health() {
        // Local sink whose writes succeed but whose flush always fails.
        struct FlushFailSink;

        impl LogSink for FlushFailSink {
            fn write(&self, _event: &LogEvent) -> Result<(), LogSinkError> {
                Ok(())
            }

            fn flush(&self) -> Result<(), LogSinkError> {
                Err(LogSinkError(Box::new(ErrorContext::new(
                    error_codes::LOGGER_FLUSH_FAILED,
                    "flush failed",
                    Remediation::not_recoverable("test sink intentionally fails flush"),
                ))))
            }

            fn health(&self) -> SinkHealth {
                SinkHealth {
                    name: sink_name("flush-fail"),
                    state: SinkHealthState::DegradedDropping,
                    last_error: None,
                }
            }
        }

        let root = temp_path("flush-fail");
        let mut config = LoggerConfig::default_for(service_name(), root);
        config.enable_file_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");
        builder.register_sink(SinkRegistration::new(Arc::new(FlushFailSink)));
        let logger = builder.build();

        logger.flush().expect("flush remains fail-open");

        let health = logger.health();
        // Flush failures are tracked in their own counter, not as drops.
        assert_eq!(health.dropped_events_total, 0);
        assert_eq!(health.flush_errors_total, 1);
        assert!(health.last_error.is_some());
    }
1456
    /// The default log path layout is `<root>/logs/<service>.log.jsonl`.
    #[test]
    fn default_log_path_uses_service_scoped_layout() {
        let service = ServiceName::new("custom-service").expect("valid service");
        let log_root = PathBuf::from("observability-root");

        let path = default_log_path(&log_root, &service);

        assert_eq!(
            path,
            PathBuf::from("observability-root/logs/custom-service.log.jsonl")
        );
    }
1469
    /// Rotated files keep the active file name as a prefix with a numeric
    /// `.N` suffix, in the same directory.
    #[test]
    fn rotated_log_paths_keep_the_active_filename_prefix() {
        let sink = JsonlFileSink::new(
            PathBuf::from("observability-root/logs/custom-service.log.jsonl"),
            RotationPolicy::default(),
            RetentionPolicy::default(),
        );

        assert_eq!(
            sink.rotated_path(1),
            PathBuf::from("observability-root/logs/custom-service.log.jsonl.1")
        );
        assert_eq!(
            sink.rotated_path(2),
            PathBuf::from("observability-root/logs/custom-service.log.jsonl.2")
        );
    }
1487
    /// The stderr-backed console sink constructor produces a sink that can
    /// write and reports healthy afterwards.
    #[test]
    fn console_sink_stderr_constructor_is_operational() {
        let sink = ConsoleSink::stderr();

        sink.write(&log_event(service_name()))
            .expect("stderr write");

        assert_eq!(sink.health().state, SinkHealthState::Healthy);
    }
1497
    /// Forcing a degraded fault on a wrapped sink degrades aggregate logging
    /// health while emits remain fail-open (and are counted as drops).
    #[cfg(feature = "fault-injection")]
    #[test]
    fn retained_sink_fault_injector_forces_degraded_logging_health() {
        let root = temp_path("fault-degraded");
        let mut config = LoggerConfig::default_for(service_name(), root);
        config.enable_file_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");
        let injector = RetainedSinkFaultInjector::new();
        builder.register_sink(SinkRegistration::new(
            injector.wrap(Arc::new(RecordingFlushSink::default())),
        ));
        let logger = builder.build();

        injector.force_degraded();
        logger
            .emit(log_event(service_name()))
            .expect("emit remains fail-open");

        let health = logger.health();
        assert_eq!(health.state, LoggingHealthState::DegradedDropping);
        assert_eq!(
            health.sink_statuses[0].state,
            SinkHealthState::DegradedDropping
        );
        assert_eq!(health.dropped_events_total, 1);
        assert!(health.last_error.is_some());
    }
1525
    /// Forcing an unavailable fault makes aggregate logging health
    /// `Unavailable`, while emits still remain fail-open.
    #[cfg(feature = "fault-injection")]
    #[test]
    fn retained_sink_fault_injector_forces_unavailable_logging_health() {
        let root = temp_path("fault-unavailable");
        let mut config = LoggerConfig::default_for(service_name(), root);
        config.enable_file_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");
        let injector = RetainedSinkFaultInjector::new();
        builder.register_sink(SinkRegistration::new(
            injector.wrap(Arc::new(RecordingFlushSink::default())),
        ));
        let logger = builder.build();

        injector.force_unavailable();
        logger
            .emit(log_event(service_name()))
            .expect("emit remains fail-open");

        let health = logger.health();
        assert_eq!(health.state, LoggingHealthState::Unavailable);
        assert_eq!(health.sink_statuses[0].state, SinkHealthState::Unavailable);
        assert_eq!(health.dropped_events_total, 1);
        assert!(health.last_error.is_some());
    }
1550
    /// A per-sink filter that rejects an event prevents delivery to that
    /// sink entirely.
    #[test]
    fn sink_filter_blocks_event_delivery() {
        // Filter that rejects every event.
        struct DenyAll;

        impl LogFilter for DenyAll {
            fn accepts(&self, _event: &LogEvent) -> bool {
                false
            }
        }

        let root = temp_path("filter");
        let mut config = LoggerConfig::default_for(service_name(), root);
        config.enable_file_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");

        let lines = Arc::new(Mutex::new(Vec::<String>::new()));
        builder.register_sink(
            SinkRegistration::new(Arc::new(ConsoleSink::from_writer(Box::new(SharedBuffer {
                lines: lines.clone(),
            }))))
            .with_filter(Arc::new(DenyAll)),
        );
        let logger = builder.build();

        logger.emit(log_event(service_name())).expect("emit");

        assert!(lines.lock().expect("lines poisoned").is_empty());
    }
1579
    /// After shutdown, emits fail but flush and repeated shutdown stay
    /// idempotent successes.
    #[test]
    fn shutdown_blocks_future_emits() {
        let root = temp_path("shutdown");
        let config = LoggerConfig::default_for(service_name(), root);
        let logger = Logger::new(config).expect("logger");
        logger.shutdown().expect("shutdown");
        assert!(logger.emit(log_event(service_name())).is_err());
        assert!(logger.flush().is_ok());
        assert!(logger.shutdown().is_ok());
    }
1590
    /// Shutdown flushes each registered sink exactly once.
    #[test]
    fn shutdown_flushes_registered_sinks_before_marking_shutdown() {
        let root = temp_path("shutdown-flush");
        let mut config = LoggerConfig::default_for(service_name(), root);
        config.enable_file_sink = false;
        let mut builder = Logger::builder(config).expect("logger builder");
        let sink = Arc::new(RecordingFlushSink::default());
        builder.register_sink(SinkRegistration::new(sink.clone()));
        let logger = builder.build();

        logger.shutdown().expect("shutdown");

        assert_eq!(sink.flush_calls.load(Ordering::SeqCst), 1);
    }
1605
    /// Queries span the active file plus rotated files, honoring ordering,
    /// limits, and the truncation flag.
    #[test]
    fn historical_query_reads_active_and_rotated_files() {
        let root = temp_path("query-rotated");
        let mut config = LoggerConfig::default_for(service_name(), root.clone());
        // Tiny rotation threshold so three padded events force a rotation.
        config.rotation.max_bytes = 350;
        config.rotation.max_files = 4;
        let logger = Logger::new(config).expect("logger");

        for request_id in ["req-1", "req-2", "req-3"] {
            logger
                .emit(log_event_with_request(service_name(), request_id, 240))
                .expect("emit");
        }

        let active_path = default_log_path(&root, &service_name());
        let resolved_paths = crate::query::query_active_and_rotated_paths(&active_path, 4);
        assert!(
            resolved_paths
                .iter()
                .any(|path| path.ends_with("sc-observability.log.jsonl.1"))
        );
        assert!(
            resolved_paths
                .iter()
                .any(|path| path.ends_with("sc-observability.log.jsonl"))
        );

        let asc = logger
            .query(&query_all(LogOrder::OldestFirst))
            .expect("asc query");
        assert_eq!(request_ids(&asc), ["req-1", "req-2", "req-3"]);

        let desc = logger
            .query(&LogQuery {
                order: LogOrder::NewestFirst,
                limit: Some(2),
                ..LogQuery::default()
            })
            .expect("desc query");
        assert_eq!(request_ids(&desc), ["req-3", "req-2"]);
        assert!(desc.truncated);
    }
1648
    /// Event order is preserved in both directions even when events are
    /// spread across several rotated files.
    #[test]
    fn historical_query_preserves_order_across_multiple_rotated_files() {
        let root = temp_path("query-multi-rotation-order");
        let mut config = LoggerConfig::default_for(service_name(), root.clone());
        // Tiny rotation threshold so five padded events span several files.
        config.rotation.max_bytes = 350;
        config.rotation.max_files = 6;
        let logger = Logger::new(config).expect("logger");

        for request_id in ["req-1", "req-2", "req-3", "req-4", "req-5"] {
            logger
                .emit(log_event_with_request(service_name(), request_id, 220))
                .expect("emit");
        }

        let oldest_first = logger
            .query(&query_all(LogOrder::OldestFirst))
            .expect("oldest-first query");
        assert_eq!(
            request_ids(&oldest_first),
            ["req-1", "req-2", "req-3", "req-4", "req-5"]
        );

        let newest_first = logger
            .query(&LogQuery {
                order: LogOrder::NewestFirst,
                ..LogQuery::default()
            })
            .expect("newest-first query");
        assert_eq!(
            request_ids(&newest_first),
            ["req-5", "req-4", "req-3", "req-2", "req-1"]
        );
    }
1682
1683    #[test]
1684    fn logger_and_jsonl_reader_query_have_parity() {
1685        let root = temp_path("query-parity");
1686        let mut config = LoggerConfig::default_for(service_name(), root.clone());
1687        config.rotation.max_bytes = 350;
1688        config.rotation.max_files = 4;
1689        let logger = Logger::new(config).expect("logger");
1690
1691        for request_id in ["req-a", "req-b", "req-c"] {
1692            logger
1693                .emit(log_event_with_request(service_name(), request_id, 220))
1694                .expect("emit");
1695        }
1696
1697        let query = LogQuery {
1698            order: LogOrder::NewestFirst,
1699            limit: Some(2),
1700            ..LogQuery::default()
1701        };
1702        let logger_snapshot = logger.query(&query).expect("logger query");
1703        let reader = JsonlLogReader::new(default_log_path(&root, &service_name()));
1704        let reader_snapshot = reader.query(&query).expect("reader query");
1705
1706        assert_eq!(reader_snapshot, logger_snapshot);
1707    }
1708
1709    #[test]
1710    fn follow_starts_at_tail_and_survives_multiple_rotations() {
1711        let root = temp_path("follow-rotation");
1712        let mut config = LoggerConfig::default_for(service_name(), root);
1713        config.rotation.max_bytes = 350;
1714        config.rotation.max_files = 6;
1715        let logger = Logger::new(config).expect("logger");
1716
1717        logger
1718            .emit(log_event_with_request(service_name(), "backlog", 220))
1719            .expect("emit backlog");
1720
1721        let mut follow = logger
1722            .follow(query_all(LogOrder::OldestFirst))
1723            .expect("follow");
1724        assert!(follow.poll().expect("initial poll").events.is_empty());
1725
1726        for request_id in ["fresh-1", "fresh-2", "fresh-3"] {
1727            logger
1728                .emit(log_event_with_request(service_name(), request_id, 220))
1729                .expect("emit fresh");
1730        }
1731
1732        let snapshot = follow.poll().expect("follow poll");
1733        assert_eq!(request_ids(&snapshot), ["fresh-1", "fresh-2", "fresh-3"]);
1734        assert_eq!(follow.health().state, QueryHealthState::Healthy);
1735    }
1736
1737    #[test]
1738    fn logger_and_jsonl_reader_follow_have_parity() {
1739        let root = temp_path("follow-parity");
1740        let mut config = LoggerConfig::default_for(service_name(), root.clone());
1741        config.rotation.max_bytes = 350;
1742        config.rotation.max_files = 6;
1743        let logger = Logger::new(config).expect("logger");
1744
1745        logger
1746            .emit(log_event_with_request(service_name(), "backlog", 220))
1747            .expect("emit backlog");
1748
1749        let query = query_all(LogOrder::OldestFirst);
1750        let mut logger_follow = logger.follow(query.clone()).expect("logger follow");
1751        let reader = JsonlLogReader::new(default_log_path(&root, &service_name()));
1752        let mut reader_follow = reader.follow(query).expect("reader follow");
1753
1754        for request_id in ["reader-1", "reader-2"] {
1755            logger
1756                .emit(log_event_with_request(service_name(), request_id, 220))
1757                .expect("emit fresh");
1758        }
1759
1760        assert_eq!(
1761            logger_follow.poll().expect("logger follow poll"),
1762            reader_follow.poll().expect("reader follow poll")
1763        );
1764    }
1765
1766    #[test]
1767    fn query_health_tracks_decode_and_shutdown_failures() {
1768        use std::io::Write as _;
1769
1770        let root = temp_path("query-health");
1771        let config = LoggerConfig::default_for(service_name(), root.clone());
1772        let logger = Logger::new(config).expect("logger");
1773
1774        logger
1775            .emit(log_event_with_request(service_name(), "healthy", 20))
1776            .expect("emit");
1777
1778        let active_path = default_log_path(&root, &service_name());
1779        let mut file = OpenOptions::new()
1780            .append(true)
1781            .open(&active_path)
1782            .expect("open active log");
1783        writeln!(file, "{{not-json").expect("append malformed json");
1784
1785        let decode_error = logger
1786            .query(&query_all(LogOrder::OldestFirst))
1787            .expect_err("decode error");
1788        assert!(matches!(decode_error, QueryError::Decode(_)));
1789        let degraded_health = logger.health().query.expect("query health");
1790        assert_eq!(degraded_health.state, QueryHealthState::Degraded);
1791        assert!(degraded_health.last_error.is_some());
1792
1793        logger.shutdown().expect("shutdown");
1794        let shutdown_error = logger
1795            .query(&query_all(LogOrder::OldestFirst))
1796            .expect_err("shutdown error");
1797        assert!(matches!(shutdown_error, QueryError::Shutdown));
1798        assert_eq!(
1799            logger.health().query.expect("query health").state,
1800            QueryHealthState::Unavailable
1801        );
1802        assert!(matches!(
1803            logger.follow(query_all(LogOrder::OldestFirst)),
1804            Err(QueryError::Shutdown)
1805        ));
1806    }
1807
1808    #[test]
1809    fn logger_query_returns_shutdown_variant_after_shutdown() {
1810        let root = temp_path("query-shutdown-variant");
1811        let config = LoggerConfig::default_for(service_name(), root);
1812        let logger = Logger::new(config).expect("logger");
1813
1814        logger.shutdown().expect("shutdown");
1815
1816        let error = logger
1817            .query(&query_all(LogOrder::OldestFirst))
1818            .expect_err("shutdown error");
1819        assert!(matches!(error, QueryError::Shutdown));
1820    }
1821
1822    #[test]
1823    fn logger_follow_session_becomes_unavailable_after_shutdown() {
1824        let root = temp_path("follow-shutdown");
1825        let config = LoggerConfig::default_for(service_name(), root);
1826        let logger = Logger::new(config).expect("logger");
1827
1828        let mut follow = logger
1829            .follow(query_all(LogOrder::OldestFirst))
1830            .expect("follow");
1831        assert!(follow.poll().expect("initial poll").events.is_empty());
1832
1833        logger.shutdown().expect("shutdown");
1834
1835        assert!(matches!(follow.poll(), Err(QueryError::Shutdown)));
1836        assert_eq!(follow.health().state, QueryHealthState::Unavailable);
1837    }
1838
1839    #[test]
1840    fn logger_query_and_follow_reject_invalid_queries() {
1841        let root = temp_path("invalid-query");
1842        let config = LoggerConfig::default_for(service_name(), root);
1843        let logger = Logger::new(config).expect("logger");
1844
1845        let invalid_limit = LogQuery {
1846            limit: Some(0),
1847            ..LogQuery::default()
1848        };
1849        let invalid_range = LogQuery {
1850            since: Some(Timestamp::now_utc()),
1851            until: Some(Timestamp::UNIX_EPOCH),
1852            ..LogQuery::default()
1853        };
1854
1855        assert!(matches!(
1856            logger.query(&invalid_limit),
1857            Err(QueryError::InvalidQuery(_))
1858        ));
1859        assert!(matches!(
1860            logger.follow(invalid_limit),
1861            Err(QueryError::InvalidQuery(_))
1862        ));
1863        assert!(matches!(
1864            logger.query(&invalid_range),
1865            Err(QueryError::InvalidQuery(_))
1866        ));
1867        assert!(matches!(
1868            logger.follow(invalid_range),
1869            Err(QueryError::InvalidQuery(_))
1870        ));
1871    }
1872
1873    #[test]
1874    fn query_and_follow_are_unavailable_without_file_sink() {
1875        let root = temp_path("query-unavailable");
1876        let mut config = LoggerConfig::default_for(service_name(), root);
1877        config.enable_file_sink = false;
1878        let logger = Logger::new(config).expect("logger");
1879
1880        assert!(matches!(
1881            logger.query(&query_all(LogOrder::OldestFirst)),
1882            Err(QueryError::Unavailable(_))
1883        ));
1884        assert!(matches!(
1885            logger.follow(query_all(LogOrder::OldestFirst)),
1886            Err(QueryError::Unavailable(_))
1887        ));
1888        assert_eq!(
1889            logger.health().query.expect("query health").state,
1890            QueryHealthState::Unavailable
1891        );
1892    }
1893
    // On Windows, the non-Unix file-identity approach (len + modified_nanos) cannot
    // distinguish appends from file recreation because every write changes `len`.
    // Follow truncation/recreation semantics are tested on Unix and macOS.
    #[cfg_attr(windows, ignore)]
    #[test]
    fn follow_recovers_after_active_file_truncate_and_recreate() {
        let root = temp_path("follow-truncate-recreate");
        let config = LoggerConfig::default_for(service_name(), root.clone());
        let logger = Logger::new(config).expect("logger");

        // Seed one event before opening the follow session so the session
        // demonstrably starts at the tail (the initial poll is empty).
        logger
            .emit(log_event_with_request(service_name(), "backlog", 20))
            .expect("emit backlog");

        let mut follow = logger
            .follow(query_all(LogOrder::OldestFirst))
            .expect("follow");
        assert!(follow.poll().expect("initial poll").events.is_empty());

        // Sanity check: normal appends flow through the session before we
        // start disturbing the underlying file.
        logger
            .emit(log_event_with_request(
                service_name(),
                "before-truncate",
                20,
            ))
            .expect("emit before truncate");
        assert_eq!(
            request_ids(&follow.poll().expect("poll before truncate")),
            ["before-truncate"]
        );

        // Phase 1: truncate the active file out from under the session, then
        // emit again and confirm the session still reaches the new record.
        let active_path = default_log_path(&root, &service_name());
        fs::File::create(&active_path).expect("truncate active log");

        logger
            .emit(log_event_with_request(service_name(), "after-truncate", 20))
            .expect("emit after truncate");
        // Windows can replay previously read records once a truncate resets the file position,
        // while Unix platforms often yield only the new post-truncate record.
        let after_truncate = drain_follow_until_request_id(&mut follow, "after-truncate");
        assert!(
            after_truncate == vec!["after-truncate"]
                || after_truncate == vec!["backlog", "before-truncate", "after-truncate"]
        );
        // Truncation is recoverable but must be reported as degraded health
        // with an error summary that names the truncation.
        let truncate_health = follow.health();
        assert_eq!(truncate_health.state, QueryHealthState::Degraded);
        assert!(
            truncate_health
                .last_error
                .expect("truncate health summary")
                .message
                .contains("truncation")
        );

        // Phase 2: replace the file so its Unix file identity changes (see
        // helper `recreate_with_distinct_unix_identity`), then confirm the
        // session resyncs onto the new file and flags the identity change.
        recreate_with_distinct_unix_identity(&active_path);
        logger
            .emit(log_event_with_request(service_name(), "after-recreate", 20))
            .expect("emit after recreate");
        let after_recreate = drain_follow_until_request_id(&mut follow, "after-recreate");
        assert_eq!(after_recreate, vec!["after-recreate"]);
        let recreate_health = follow.health();
        assert_eq!(recreate_health.state, QueryHealthState::Degraded);
        assert!(
            recreate_health
                .last_error
                .expect("recreate health summary")
                .message
                .contains("identity changed")
        );
    }
1964}