Skip to main content

sc_observe/
lib.rs

1//! Typed observation routing layered on top of `sc-observability`.
2//!
3//! This crate owns construction-time subscriber/projector registration,
4//! per-type routing, and top-level observability health aggregation while
5//! remaining independent of OTLP transport details.
6#![expect(
7    clippy::missing_errors_doc,
8    reason = "public routing-facade error behavior is documented centrally in workspace docs, and repeating boilerplate on every wrapper method adds low signal"
9)]
10#![expect(
11    clippy::must_use_candidate,
12    reason = "builder and accessor methods intentionally avoid pervasive must_use boilerplate across the facade"
13)]
14#![expect(
15    clippy::return_self_not_must_use,
16    reason = "builder-style chaining is explicit from the signatures and intentionally lightweight"
17)]
18#![expect(
19    clippy::struct_field_names,
20    reason = "the health-provider field uses the full domain term for clarity across builder/runtime structs"
21)]
22#![expect(
23    clippy::needless_pass_by_value,
24    reason = "Observation is the producer-facing owned emission contract, so emit intentionally takes ownership"
25)]
26
27pub mod constants;
28pub mod error_codes;
29
30use std::any::{Any, TypeId};
31use std::path::PathBuf;
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33use std::sync::{Arc, Mutex};
34
35use sc_observability::{LogError, Logger, LoggerConfig, RetainedLogPolicy, Running, Stopped};
36use sc_observability_types::{
37    DiagnosticInfo, DiagnosticSummary, EnvPrefix, ErrorContext, FlushError, InitError,
38    ObservabilityHealthProvider, Observable, Observation, ProjectionRegistration, Remediation,
39    ServiceName, ShutdownError, SubscriberError, SubscriberRegistration, TelemetryHealthState,
40    ToolName,
41};
42#[doc(inline)]
43pub use sc_observability_types::{
44    ObservabilityHealthReport, ObservationError, ObservationHealthState,
45};
46
47/// Top-level configuration for the observation routing runtime.
48///
49/// Routing owns tool identity, log-root selection, env-prefix derivation, and
50/// queue capacity. Logging-specific level, retention, and redaction behavior
51/// stay owned by `LoggerConfig` in `sc-observability` and are intentionally not
52/// overridable at the `ObservabilityConfig` layer.
53#[derive(Debug, Clone)]
54pub struct ObservabilityConfig {
55    /// Stable tool name used to derive service and log layout defaults.
56    pub tool_name: ToolName,
57    /// Root directory that owns the routing runtime log tree.
58    pub log_root: PathBuf,
59    /// Environment-variable prefix used by the owning application.
60    pub env_prefix: EnvPrefix,
61    /// Reserved for future async/backpressure implementation. Phase 1 execution is synchronous; this value is stored but not yet applied.
62    pub queue_capacity: usize,
63    /// Retained-log policy forwarded to the built-in logging layer.
64    pub retained_log_policy: RetainedLogPolicy,
65}
66
67impl ObservabilityConfig {
68    /// Builds the documented v1 defaults from a tool name and log root.
69    ///
70    /// # Examples
71    ///
72    /// ```
73    /// use std::path::PathBuf;
74    /// use sc_observability_types::ToolName;
75    /// use sc_observe::ObservabilityConfig;
76    ///
77    /// let config = ObservabilityConfig::default_for(
78    ///     ToolName::new("demo-tool").expect("valid tool"),
79    ///     PathBuf::from("logs"),
80    /// )
81    /// .expect("valid config");
82    ///
83    /// assert_eq!(config.tool_name.as_str(), "demo-tool");
84    /// ```
85    pub fn default_for(tool_name: ToolName, log_root: PathBuf) -> Result<Self, InitError> {
86        let env_prefix = EnvPrefix::new(
87            tool_name
88                .as_str()
89                .replace(['-', '.'], "_")
90                .to_ascii_uppercase(),
91        )
92        .map_err(|err| {
93            InitError(Box::new(
94                ErrorContext::new(
95                    error_codes::OBSERVABILITY_INIT_FAILED,
96                    "failed to derive env prefix",
97                    Remediation::not_recoverable("use an explicit valid env prefix"),
98                )
99                .cause(err.to_string())
100                .source(Box::new(err)),
101            ))
102        })?;
103        Ok(Self {
104            tool_name,
105            log_root,
106            env_prefix,
107            queue_capacity: constants::DEFAULT_OBSERVATION_QUEUE_CAPACITY,
108            retained_log_policy: RetainedLogPolicy::default(),
109        })
110    }
111
112    /// Derives the logging/telemetry service name from the configured tool.
113    pub fn service_name(&self) -> Result<ServiceName, InitError> {
114        ServiceName::new(self.tool_name.as_str()).map_err(|err| {
115            InitError(Box::new(
116                ErrorContext::new(
117                    error_codes::OBSERVABILITY_INIT_FAILED,
118                    "failed to derive service name",
119                    Remediation::not_recoverable("use a valid tool name"),
120                )
121                .cause(err.to_string())
122                .source(Box::new(err)),
123            ))
124        })
125    }
126
127    fn logger_config(&self) -> Result<LoggerConfig, InitError> {
128        let mut config = LoggerConfig::default_for(self.service_name()?, self.log_root.clone());
129        config.queue_capacity = self.queue_capacity;
130        config.retained_log_policy = self.retained_log_policy;
131        Ok(config)
132    }
133}
134
/// Builder for construction-time subscriber and projector registration.
///
/// Created by `Observability::builder`. Routes are appended in registration
/// order and moved into the runtime unchanged by `build`.
#[expect(
    missing_debug_implementations,
    reason = "the builder stores type-erased routing closures and health providers whose internals are not part of the public debug contract"
)]
pub struct ObservabilityBuilder {
    /// Configuration consumed by `build` to construct the logger.
    config: ObservabilityConfig,
    /// Type-erased subscriber routes, in registration order.
    subscribers: Vec<ErasedSubscriberRegistration>,
    /// Type-erased projection routes, in registration order.
    projections: Vec<ErasedProjectionRegistration>,
    /// Optional telemetry health source surfaced through `Observability::health`.
    observability_health_provider: Option<Arc<dyn ObservabilityHealthProvider>>,
}
146
/// Producer-facing routing runtime for typed observations.
///
/// Routes are fixed at construction time. `emit` fans each observation out to
/// the subscriber and projection routes registered for its payload type.
#[expect(
    missing_debug_implementations,
    reason = "the runtime owns atomic state, mutexes, and type-erased routes that do not have a useful stable Debug representation"
)]
pub struct Observability {
    /// Built-in logger slot. It holds `Running` until `shutdown` swaps it to
    /// `Stopped`; it is `None` only transiently inside `shutdown` while the
    /// handle is moved out.
    logger: Mutex<Option<LoggerHandle>>,
    /// Set once by `shutdown`; checked by `emit` and `health`.
    shutdown: AtomicBool,
    /// Subscriber routes in registration order.
    subscriber_registrations: Vec<ErasedSubscriberRegistration>,
    /// Projection routes in registration order.
    projection_registrations: Vec<ErasedProjectionRegistration>,
    /// Optional telemetry health source folded into `health`.
    observability_health_provider: Option<Arc<dyn ObservabilityHealthProvider>>,
    /// Failure counters and last-error summary reported by `health`.
    runtime: RuntimeState,
}
160
/// Counters and last-error state aggregated into `ObservabilityHealthReport`.
#[derive(Default)]
struct RuntimeState {
    /// Observations for which no subscriber delivered and no projector matched.
    dropped_observations_total: AtomicU64,
    /// Subscriber invocations that returned an error.
    subscriber_failures_total: AtomicU64,
    /// Projection failures: projector errors plus log write and flush errors.
    projection_failures_total: AtomicU64,
    // MUTEX: routing failures update the shared last_error summary from multiple subscriber and
    // projector call paths; Mutex keeps the optional summary coherent as one unit, and RwLock
    // adds no value because writes dominate error reporting.
    last_error: Mutex<Option<DiagnosticSummary>>,
}
171
/// Subscriber route with its payload type erased, so routes for different
/// observation types can share one `Vec`.
struct ErasedSubscriberRegistration {
    /// `TypeId` of the observation payload type this route was registered for.
    type_id: TypeId,
    /// Downcasts to `Observation<T>`, applies the optional filter, and calls
    /// the subscriber.
    dispatch: Arc<SubscriberDispatchFn>,
}
176
/// Projection route with its payload type erased, so routes for different
/// observation types can share one `Vec`.
struct ErasedProjectionRegistration {
    /// `TypeId` of the observation payload type this route was registered for.
    type_id: TypeId,
    /// Downcasts to `Observation<T>`, applies the optional filter, and runs the
    /// configured log/span/metric projectors.
    dispatch: Arc<ProjectionDispatchFn>,
}
181
/// Stores the typestate logger before and after shutdown in a single slot.
enum LoggerHandle {
    /// Logger accepting events; projection dispatch requires this state.
    Running(Logger<Running>),
    /// Logger after `shutdown`. In this state `flush` is a no-op and
    /// projection dispatch reports `ObservationError::Shutdown`.
    Stopped(Logger<Stopped>),
}
186
/// Type-erased subscriber entry point. It receives the `&Observation<T>` as
/// `&dyn Any` and reports whether the route delivered or was filtered out.
type SubscriberDispatchFn =
    dyn Fn(&dyn Any) -> Result<DispatchMatch, SubscriberError> + Send + Sync + 'static;
/// Type-erased projection entry point. It receives the `&Observation<T>` as
/// `&dyn Any` plus the running logger that projected log events are written to.
type ProjectionDispatchFn =
    dyn Fn(&dyn Any, &Logger<Running>) -> ProjectionDispatchResult + Send + Sync + 'static;
191
/// Result of offering one observation to one subscriber route.
///
/// `Delivered` means the route's filter (if any) accepted the observation and
/// the subscriber returned `Ok`. `Skipped` means the filter rejected it and the
/// subscriber was never called.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DispatchMatch {
    Delivered,
    Skipped,
}
197
/// Aggregate outcome of running one projection route for one observation.
#[derive(Debug, Default, Clone, PartialEq)]
struct ProjectionDispatchResult {
    /// True when at least one configured projector returned `Ok`.
    matched: bool,
    /// Number of projector, log-write, and flush failures encountered.
    failure_count: u64,
    /// Summary of the most recent failure, if any.
    last_error: Option<DiagnosticSummary>,
}
204
205fn log_error_summary(error: &LogError) -> DiagnosticSummary {
206    match error {
207        LogError::InvalidEvent(error) => DiagnosticSummary::from(error.diagnostic()),
208        LogError::WriterDegraded(error) | LogError::ShutdownTimedOut(error) => {
209            DiagnosticSummary::from(error.diagnostic())
210        }
211    }
212}
213
214impl Observability {
215    /// Builds a runtime using the documented default logger integration.
216    pub fn new(config: ObservabilityConfig) -> Result<Self, InitError> {
217        Self::builder(config).build()
218    }
219
220    /// Starts a construction-time builder for subscribers and projections.
221    ///
222    /// # Examples
223    ///
224    /// ```
225    /// use std::path::PathBuf;
226    /// use sc_observability_types::ToolName;
227    /// use sc_observe::{Observability, ObservabilityConfig};
228    ///
229    /// let config = ObservabilityConfig::default_for(
230    ///     ToolName::new("demo-tool").expect("valid tool"),
231    ///     PathBuf::from("logs"),
232    /// )
233    /// .expect("valid config");
234    ///
235    /// let _builder = Observability::builder(config);
236    /// ```
237    pub fn builder(config: ObservabilityConfig) -> ObservabilityBuilder {
238        ObservabilityBuilder {
239            config,
240            subscribers: Vec::new(),
241            projections: Vec::new(),
242            observability_health_provider: None,
243        }
244    }
245
246    /// Routes one typed observation through the registered subscribers and projections.
247    ///
248    /// # Panics
249    ///
250    /// Panics if the internal last-error mutex has been poisoned while the
251    /// runtime records a routing, subscriber, or projection failure summary.
252    pub fn emit<T>(&self, observation: Observation<T>) -> Result<(), ObservationError>
253    where
254        T: Observable,
255    {
256        if self.shutdown.load(Ordering::SeqCst) {
257            return Err(ObservationError::Shutdown);
258        }
259
260        let observation_any = &observation as &dyn Any;
261        let type_id = TypeId::of::<T>();
262        let mut matched = false;
263
264        for registration in self
265            .subscriber_registrations
266            .iter()
267            .filter(|entry| entry.type_id == type_id)
268        {
269            match (registration.dispatch)(observation_any) {
270                Ok(DispatchMatch::Delivered) => matched = true,
271                Ok(DispatchMatch::Skipped) => {}
272                Err(err) => {
273                    self.runtime
274                        .subscriber_failures_total
275                        .fetch_add(1, Ordering::SeqCst);
276                    self.record_last_error(DiagnosticSummary::from(err.diagnostic()));
277                }
278            }
279        }
280
281        for registration in self
282            .projection_registrations
283            .iter()
284            .filter(|entry| entry.type_id == type_id)
285        {
286            let logger = self.logger.lock().expect("observability logger poisoned");
287            let LoggerHandle::Running(logger) = logger
288                .as_ref()
289                .expect("observability logger should exist while runtime is alive")
290            else {
291                return Err(ObservationError::Shutdown);
292            };
293            let result = (registration.dispatch)(observation_any, logger);
294            matched |= result.matched;
295            if result.failure_count > 0 {
296                self.runtime
297                    .projection_failures_total
298                    .fetch_add(result.failure_count, Ordering::SeqCst);
299                if let Some(summary) = result.last_error {
300                    self.record_last_error(summary);
301                }
302            }
303        }
304
305        if !matched {
306            self.runtime
307                .dropped_observations_total
308                .fetch_add(1, Ordering::SeqCst);
309            // Failing subscribers do not count as active paths; RoutingFailure
310            // is correct per OBS-009/OBS-010.
311            let context = ErrorContext::new(
312                error_codes::OBSERVATION_ROUTING_FAILURE,
313                "no eligible subscriber or projector path matched the observation",
314                Remediation::recoverable(
315                    "register at least one matching subscriber or projector",
316                    ["ensure filters allow the emitted observation type"],
317                ),
318            );
319            self.record_last_error(DiagnosticSummary::from(context.diagnostic()));
320            return Err(ObservationError::RoutingFailure(Box::new(context)));
321        }
322
323        Ok(())
324    }
325
326    /// Flushes the attached logger. Routing itself does not keep an async queue in v1.
327    ///
328    /// # Panics
329    ///
330    /// Panics if the attached logger encounters a poisoned internal mutex while
331    /// flushing its registered sinks.
332    pub fn flush(&self) -> Result<(), FlushError> {
333        let logger = self.logger.lock().expect("observability logger poisoned");
334        match logger
335            .as_ref()
336            .expect("observability logger should exist while runtime is alive")
337        {
338            LoggerHandle::Running(logger) => logger.flush(),
339            LoggerHandle::Stopped(_) => Ok(()),
340        }
341    }
342
343    /// Shuts down the routing runtime. Repeated calls are idempotent.
344    ///
345    /// # Panics
346    ///
347    /// Panics if the attached logger encounters a poisoned internal mutex while
348    /// flushing sinks or updating query/follow health during shutdown.
349    pub fn shutdown(&self) -> Result<(), ShutdownError> {
350        if self.shutdown.swap(true, Ordering::SeqCst) {
351            return Ok(());
352        }
353        let mut logger = self.logger.lock().expect("observability logger poisoned");
354        let handle = logger
355            .take()
356            .expect("observability logger should exist while runtime is alive");
357        *logger = Some(match handle {
358            LoggerHandle::Running(logger) => LoggerHandle::Stopped(logger.shutdown()),
359            LoggerHandle::Stopped(logger) => LoggerHandle::Stopped(logger),
360        });
361        Ok(())
362    }
363
364    /// Returns the aggregate runtime health view.
365    ///
366    /// # Panics
367    ///
368    /// Panics if the internal last-error mutex has been poisoned.
369    pub fn health(&self) -> ObservabilityHealthReport {
370        let logging = {
371            let logger = self.logger.lock().expect("observability logger poisoned");
372            match logger
373                .as_ref()
374                .expect("observability logger should exist while runtime is alive")
375            {
376                LoggerHandle::Running(logger) => logger.health(),
377                LoggerHandle::Stopped(logger) => logger.health(),
378            }
379        };
380        let telemetry = self
381            .observability_health_provider
382            .as_ref()
383            .map(sc_observability_types::ObservabilityHealthProvider::telemetry_health);
384        let subscriber_failures = self
385            .runtime
386            .subscriber_failures_total
387            .load(Ordering::SeqCst);
388        let projection_failures = self
389            .runtime
390            .projection_failures_total
391            .load(Ordering::SeqCst);
392        let dropped = self
393            .runtime
394            .dropped_observations_total
395            .load(Ordering::SeqCst);
396
397        let state = if self.shutdown.load(Ordering::SeqCst) {
398            ObservationHealthState::Unavailable
399        } else if dropped > 0
400            || subscriber_failures > 0
401            || projection_failures > 0
402            || logging.state != sc_observability_types::LoggingHealthState::Healthy
403            || telemetry.as_ref().is_some_and(|health| {
404                matches!(
405                    health.state,
406                    TelemetryHealthState::Degraded | TelemetryHealthState::Unavailable
407                )
408            })
409        {
410            ObservationHealthState::Degraded
411        } else {
412            ObservationHealthState::Healthy
413        };
414
415        ObservabilityHealthReport {
416            state,
417            dropped_observations_total: dropped,
418            subscriber_failures_total: subscriber_failures,
419            projection_failures_total: projection_failures,
420            logging: Some(logging),
421            telemetry,
422            last_error: self
423                .runtime
424                .last_error
425                .lock()
426                .expect("observability last_error poisoned")
427                .clone(),
428        }
429    }
430
431    fn record_last_error(&self, summary: DiagnosticSummary) {
432        *self
433            .runtime
434            .last_error
435            .lock()
436            .expect("observability last_error poisoned") = Some(summary);
437    }
438}
439
440impl ObservabilityBuilder {
441    /// Attaches a generic telemetry health provider without introducing an
442    /// OTLP crate dependency.
443    #[expect(
444        clippy::implied_bounds_in_impls,
445        reason = "the public API intentionally spells out Send + Sync per QA-BP-IMC-007"
446    )]
447    pub fn with_observability_health_provider(
448        mut self,
449        provider: impl ObservabilityHealthProvider + Send + Sync + 'static,
450    ) -> Self {
451        self.observability_health_provider = Some(Arc::new(provider));
452        self
453    }
454
455    /// Registers one typed observation subscriber at construction time.
456    ///
457    /// # Panics
458    ///
459    /// Panics if internal type-erased routing calls this registration with the
460    /// wrong observation payload type.
461    pub fn register_subscriber<T>(mut self, registration: SubscriberRegistration<T>) -> Self
462    where
463        T: Observable,
464    {
465        let (subscriber, filter) = registration.into_parts();
466        self.subscribers.push(ErasedSubscriberRegistration {
467            type_id: TypeId::of::<T>(),
468            dispatch: Arc::new(move |observation_any| {
469                let observation = observation_any
470                    .downcast_ref::<Observation<T>>()
471                    .expect("type-erased routing matched wrong observation type");
472
473                if filter
474                    .as_ref()
475                    .is_some_and(|filter| !filter.accepts(observation))
476                {
477                    return Ok(DispatchMatch::Skipped);
478                }
479
480                subscriber.observe(observation)?;
481                Ok(DispatchMatch::Delivered)
482            }),
483        });
484        self
485    }
486
487    /// Registers one typed observation projection set at construction time.
488    ///
489    /// # Panics
490    ///
491    /// Panics if internal type-erased routing calls this registration with the
492    /// wrong observation payload type.
493    pub fn register_projection<T>(mut self, registration: ProjectionRegistration<T>) -> Self
494    where
495        T: Observable,
496    {
497        let (log_projector, span_projector, metric_projector, filter) = registration.into_parts();
498
499        self.projections.push(ErasedProjectionRegistration {
500            type_id: TypeId::of::<T>(),
501            dispatch: Arc::new(move |observation_any, logger| {
502                let observation = observation_any
503                    .downcast_ref::<Observation<T>>()
504                    .expect("type-erased routing matched wrong observation type");
505
506                if filter
507                    .as_ref()
508                    .is_some_and(|filter| !filter.accepts(observation))
509                {
510                    return ProjectionDispatchResult::default();
511                }
512
513                let mut result = ProjectionDispatchResult::default();
514                let mut record_failure = |summary: DiagnosticSummary| {
515                    result.failure_count += 1;
516                    result.last_error = Some(summary);
517                };
518
519                if let Some(projector) = &log_projector {
520                    match projector.project_logs(observation) {
521                        Ok(events) => {
522                            result.matched = true;
523                            for event in events {
524                                if let Err(err) = logger.log(event) {
525                                    record_failure(log_error_summary(&err));
526                                }
527                            }
528                            if let Err(err) = logger.flush() {
529                                record_failure(DiagnosticSummary::from(err.diagnostic()));
530                            }
531                        }
532                        Err(err) => record_failure(DiagnosticSummary::from(err.diagnostic())),
533                    }
534                }
535
536                if let Some(projector) = &span_projector {
537                    match projector.project_spans(observation) {
538                        Ok(_) => result.matched = true,
539                        Err(err) => record_failure(DiagnosticSummary::from(err.diagnostic())),
540                    }
541                }
542
543                if let Some(projector) = &metric_projector {
544                    match projector.project_metrics(observation) {
545                        Ok(_) => result.matched = true,
546                        Err(err) => record_failure(DiagnosticSummary::from(err.diagnostic())),
547                    }
548                }
549
550                result
551            }),
552        });
553        self
554    }
555
556    /// Finalizes registration and constructs the routing runtime.
557    pub fn build(self) -> Result<Observability, InitError> {
558        if self.subscribers.is_empty() && self.projections.is_empty() {
559            return Err(InitError(Box::new(ErrorContext::new(
560                error_codes::OBSERVABILITY_INIT_FAILED,
561                "at least one subscriber or projector route must be registered",
562                Remediation::recoverable(
563                    "register a subscriber or projector before building observability",
564                    ["add at least one route for the observation types you emit"],
565                ),
566            ))));
567        }
568        let logger = Logger::new(self.config.logger_config()?)?;
569        Ok(Observability {
570            logger: Mutex::new(Some(LoggerHandle::Running(logger))),
571            shutdown: AtomicBool::new(false),
572            subscriber_registrations: self.subscribers,
573            projection_registrations: self.projections,
574            observability_health_provider: self.observability_health_provider,
575            runtime: RuntimeState::default(),
576        })
577    }
578}
579
/// Sealing module for `ObservationEmitter`. `Sealed` lives in a private module,
/// so the supertrait cannot be named outside this crate.
mod sealed_emitters {
    /// Marker supertrait required by `ObservationEmitter`.
    pub trait Sealed {}
}
583
/// `ObservationEmitter<T>` is intentionally per-type -- callers hold one handle
/// per observation type. A single type-erased emitter for heterogeneous events
/// is not supported by design.
///
/// The trait is crate-local (`pub(crate)`) and sealed through
/// `sealed_emitters::Sealed`, so it serves as an internal injection seam rather
/// than a public extension point.
#[expect(
    dead_code,
    reason = "crate-local observation emitter trait is intentionally retained for injection"
)]
pub(crate) trait ObservationEmitter<T>: sealed_emitters::Sealed + Send + Sync
where
    T: Observable,
{
    /// Routes one observation of type `T`. For the `Observability`
    /// implementation this is the same as the inherent `Observability::emit`.
    fn emit(&self, observation: Observation<T>) -> Result<(), ObservationError>;
}
597
// Opts the routing runtime into the sealed `ObservationEmitter` contract.
impl sealed_emitters::Sealed for Observability {}
599
// Delegates to the inherent `Observability::emit`. The fully qualified path is
// deliberate: it makes clear that the call targets the inherent method rather
// than recursing into this trait method.
impl<T> ObservationEmitter<T> for Observability
where
    T: Observable,
{
    fn emit(&self, observation: Observation<T>) -> Result<(), ObservationError> {
        Observability::emit(self, observation)
    }
}
608
609#[cfg(test)]
610mod tests {
611    use super::*;
612    use sc_observability::{
613        LogFilter, LogSink, LoggerConfig, SinkHealth, SinkHealthState, SinkRegistration,
614    };
615    use sc_observability_types::{
616        ActionName, Diagnostic, ErrorCode, Level, LogEvent, LogSinkError, MetricKind, MetricName,
617        MetricRecord, MetricUnit, ObservationFilter, ObservationSubscriber, ProcessIdentity,
618        ProjectionError, SpanId, SpanProjector, SpanRecord, SpanSignal, SpanStarted,
619        SubscriberError, TargetCategory, TelemetryHealthReport, TelemetryHealthState, Timestamp,
620        TraceContext, TraceId,
621    };
622    use serde_json::Map;
623
624    #[derive(Debug, Clone)]
625    struct AgentEvent {
626        kind: &'static str,
627        allow: bool,
628    }
629
630    struct RecordingSubscriber {
631        id: &'static str,
632        calls: Arc<Mutex<Vec<&'static str>>>,
633    }
634
635    impl ObservationSubscriber<AgentEvent> for RecordingSubscriber {
636        fn observe(&self, _observation: &Observation<AgentEvent>) -> Result<(), SubscriberError> {
637            self.calls.lock().expect("calls poisoned").push(self.id);
638            Ok(())
639        }
640    }
641
642    struct AllowFlagFilter;
643
644    impl ObservationFilter<AgentEvent> for AllowFlagFilter {
645        fn accepts(&self, observation: &Observation<AgentEvent>) -> bool {
646            observation.payload.allow
647        }
648    }
649
650    struct FailingSubscriber;
651
652    impl ObservationSubscriber<AgentEvent> for FailingSubscriber {
653        fn observe(&self, _observation: &Observation<AgentEvent>) -> Result<(), SubscriberError> {
654            Err(SubscriberError(Box::new(ErrorContext::new(
655                error_codes::OBSERVATION_ROUTING_FAILURE,
656                "subscriber failed",
657                Remediation::not_recoverable("test subscriber intentionally fails"),
658            ))))
659        }
660    }
661
662    struct RecordingLogProjector {
663        calls: Arc<Mutex<Vec<&'static str>>>,
664        id: &'static str,
665    }
666
667    impl sc_observability_types::LogProjector<AgentEvent> for RecordingLogProjector {
668        fn project_logs(
669            &self,
670            observation: &Observation<AgentEvent>,
671        ) -> Result<Vec<LogEvent>, ProjectionError> {
672            self.calls.lock().expect("calls poisoned").push(self.id);
673            Ok(vec![log_event(
674                observation.service.clone(),
675                observation.payload.kind,
676            )])
677        }
678    }
679
680    struct RecordingSpanProjector {
681        count: Arc<AtomicU64>,
682    }
683
684    impl SpanProjector<AgentEvent> for RecordingSpanProjector {
685        fn project_spans(
686            &self,
687            observation: &Observation<AgentEvent>,
688        ) -> Result<Vec<SpanSignal>, ProjectionError> {
689            self.count.fetch_add(1, Ordering::SeqCst);
690            Ok(vec![SpanSignal::Started(SpanRecord::<SpanStarted>::new(
691                Timestamp::UNIX_EPOCH,
692                observation.service.clone(),
693                ActionName::new("span.started").expect("valid action"),
694                trace_context(),
695                Map::default(),
696            ))])
697        }
698    }
699
700    struct RecordingMetricProjector {
701        count: Arc<AtomicU64>,
702    }
703
704    impl sc_observability_types::MetricProjector<AgentEvent> for RecordingMetricProjector {
705        fn project_metrics(
706            &self,
707            observation: &Observation<AgentEvent>,
708        ) -> Result<Vec<MetricRecord>, ProjectionError> {
709            self.count.fetch_add(1, Ordering::SeqCst);
710            Ok(vec![MetricRecord {
711                timestamp: Timestamp::UNIX_EPOCH,
712                service: observation.service.clone(),
713                name: MetricName::new("obs.events_total").expect("valid metric"),
714                kind: MetricKind::Counter,
715                value: 1.0,
716                unit: Some(MetricUnit::new("1").expect("valid metric unit")),
717                attributes: Map::default(),
718            }])
719        }
720    }
721
722    struct FailingProjector;
723
724    impl sc_observability_types::LogProjector<AgentEvent> for FailingProjector {
725        fn project_logs(
726            &self,
727            _observation: &Observation<AgentEvent>,
728        ) -> Result<Vec<LogEvent>, ProjectionError> {
729            Err(ProjectionError(Box::new(ErrorContext::new(
730                error_codes::OBSERVATION_ROUTING_FAILURE,
731                "projector failed",
732                Remediation::not_recoverable("test projector intentionally fails"),
733            ))))
734        }
735    }
736
737    struct FakeTelemetryProvider {
738        state: TelemetryHealthState,
739    }
740
741    impl sc_observability_types::telemetry_health_provider_sealed::Sealed for FakeTelemetryProvider {
742        fn token(&self) -> sc_observability_types::telemetry_health_provider_sealed::Token {
743            sc_observability_types::telemetry_health_provider_sealed::workspace_token()
744        }
745    }
746
747    impl ObservabilityHealthProvider for FakeTelemetryProvider {
748        fn telemetry_health(&self) -> TelemetryHealthReport {
749            TelemetryHealthReport {
750                state: self.state,
751                dropped_exports_total: 0,
752                malformed_spans_total: 0,
753                exporter_statuses: Vec::new(),
754                last_error: None,
755            }
756        }
757    }
758
759    fn tool_name() -> ToolName {
760        ToolName::new("obs-app").expect("valid tool name")
761    }
762
763    fn temp_path(name: &str) -> PathBuf {
764        std::env::temp_dir().join(format!(
765            "sc-observe-{name}-{}-{}",
766            std::process::id(),
767            std::time::SystemTime::now()
768                .duration_since(std::time::SystemTime::UNIX_EPOCH)
769                .expect("system time before unix epoch")
770                .as_nanos()
771        ))
772    }
773
774    fn trace_context() -> TraceContext {
775        TraceContext {
776            trace_id: TraceId::new("0123456789abcdef0123456789abcdef").expect("valid trace id"),
777            span_id: SpanId::new("0123456789abcdef").expect("valid span id"),
778            parent_span_id: None,
779        }
780    }
781
782    fn schema_version() -> sc_observability_types::SchemaVersion {
783        sc_observability_types::SchemaVersion::new(
784            sc_observability_types::constants::OBSERVATION_ENVELOPE_VERSION,
785        )
786        .expect("valid schema version")
787    }
788
789    fn outcome_label(value: &str) -> sc_observability_types::OutcomeLabel {
790        sc_observability_types::OutcomeLabel::new(value).expect("valid outcome label")
791    }
792
793    fn sink_name(value: &str) -> sc_observability_types::SinkName {
794        sc_observability_types::SinkName::new(value).expect("valid sink name")
795    }
796
797    fn observation(allow: bool) -> Observation<AgentEvent> {
798        let mut observation = Observation::new(
799            ServiceName::new("obs-app").expect("valid service"),
800            AgentEvent {
801                kind: "received",
802                allow,
803            },
804        );
805        observation.identity = ProcessIdentity::default();
806        observation
807    }
808
809    fn log_event(service: ServiceName, message: &str) -> LogEvent {
810        LogEvent {
811            version: schema_version(),
812            timestamp: Timestamp::UNIX_EPOCH,
813            level: Level::Info,
814            service,
815            target: TargetCategory::new("observe.routing").expect("valid target"),
816            action: ActionName::new("observation.received").expect("valid action"),
817            message: Some(message.to_string()),
818            identity: ProcessIdentity::default(),
819            trace: Some(trace_context()),
820            request_id: None,
821            correlation_id: None,
822            outcome: Some(outcome_label("ok")),
823            diagnostic: Some(Diagnostic {
824                timestamp: Timestamp::UNIX_EPOCH,
825                code: ErrorCode::new_static("SC_TEST"),
826                message: "projected".to_string(),
827                cause: None,
828                remediation: Remediation::recoverable("retry", ["inspect log output"]),
829                docs: None,
830                details: Map::default(),
831            }),
832            state_transition: None,
833            fields: Map::default(),
834        }
835    }
836
837    #[test]
838    fn registration_order_routing_is_deterministic() {
839        let calls = Arc::new(Mutex::new(Vec::new()));
840        let root = temp_path("order");
841        let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
842        let runtime = Observability::builder(config)
843            .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
844                id: "first",
845                calls: calls.clone(),
846            })))
847            .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
848                id: "second",
849                calls: calls.clone(),
850            })))
851            .build()
852            .expect("runtime");
853
854        runtime.emit(observation(true)).expect("emit");
855
856        assert_eq!(
857            *calls.lock().expect("calls poisoned"),
858            vec!["first", "second"]
859        );
860    }
861
862    #[test]
863    fn filter_acceptance_and_rejection_are_respected() {
864        let calls = Arc::new(Mutex::new(Vec::new()));
865        let root = temp_path("filter");
866        let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
867        let runtime = Observability::builder(config)
868            .register_subscriber(
869                SubscriberRegistration::new(Arc::new(RecordingSubscriber {
870                    id: "allowed",
871                    calls: calls.clone(),
872                }))
873                .with_filter(Arc::new(AllowFlagFilter)),
874            )
875            .build()
876            .expect("runtime");
877
878        assert!(runtime.emit(observation(false)).is_err());
879        runtime.emit(observation(true)).expect("emit");
880
881        assert_eq!(*calls.lock().expect("calls poisoned"), vec!["allowed"]);
882    }
883
884    #[test]
885    fn subscriber_failures_are_isolated() {
886        let calls = Arc::new(Mutex::new(Vec::new()));
887        let root = temp_path("subscriber-failure");
888        let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
889        let runtime = Observability::builder(config)
890            .register_subscriber(SubscriberRegistration::new(Arc::new(FailingSubscriber)))
891            .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
892                id: "still-runs",
893                calls: calls.clone(),
894            })))
895            .build()
896            .expect("runtime");
897
898        runtime.emit(observation(true)).expect("emit");
899
900        let health = runtime.health();
901        assert_eq!(health.subscriber_failures_total, 1);
902        assert_eq!(*calls.lock().expect("calls poisoned"), vec!["still-runs"]);
903        assert_eq!(health.state, ObservationHealthState::Degraded);
904    }
905
906    #[test]
907    fn projector_failures_are_isolated() {
908        let log_calls = Arc::new(Mutex::new(Vec::new()));
909        let span_count = Arc::new(AtomicU64::new(0));
910        let metric_count = Arc::new(AtomicU64::new(0));
911        let root = temp_path("projector-failure");
912        let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
913        let runtime = Observability::builder(config)
914            .register_projection(
915                ProjectionRegistration::new()
916                    .with_log_projector(Arc::new(FailingProjector))
917                    .with_span_projector(Arc::new(RecordingSpanProjector {
918                        count: span_count.clone(),
919                    }))
920                    .with_metric_projector(Arc::new(RecordingMetricProjector {
921                        count: metric_count.clone(),
922                    })),
923            )
924            .register_projection(ProjectionRegistration::new().with_log_projector(Arc::new(
925                RecordingLogProjector {
926                    calls: log_calls.clone(),
927                    id: "log",
928                },
929            )))
930            .build()
931            .expect("runtime");
932
933        runtime.emit(observation(true)).expect("emit");
934
935        let health = runtime.health();
936        assert_eq!(health.projection_failures_total, 1);
937        assert_eq!(span_count.load(Ordering::SeqCst), 1);
938        assert_eq!(metric_count.load(Ordering::SeqCst), 1);
939        assert_eq!(*log_calls.lock().expect("calls poisoned"), vec!["log"]);
940    }
941
942    #[test]
943    fn routing_failure_occurs_when_no_eligible_path_remains() {
944        let root = temp_path("routing-failure");
945        let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
946        let runtime = Observability::builder(config)
947            .register_subscriber(
948                SubscriberRegistration::new(Arc::new(RecordingSubscriber {
949                    id: "filtered",
950                    calls: Arc::new(Mutex::new(Vec::new())),
951                }))
952                .with_filter(Arc::new(AllowFlagFilter)),
953            )
954            .build()
955            .expect("runtime");
956
957        let result = runtime.emit(observation(false));
958
959        assert!(matches!(result, Err(ObservationError::RoutingFailure(_))));
960        assert_eq!(runtime.health().dropped_observations_total, 1);
961    }
962
963    #[test]
964    fn routing_failure_occurs_when_all_projectors_fail() {
965        let root = temp_path("projector-routing-failure");
966        let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
967        let runtime = Observability::builder(config)
968            .register_projection(
969                ProjectionRegistration::new().with_log_projector(Arc::new(FailingProjector)),
970            )
971            .build()
972            .expect("runtime");
973
974        let result = runtime.emit(observation(true));
975
976        assert!(matches!(result, Err(ObservationError::RoutingFailure(_))));
977        let health = runtime.health();
978        assert_eq!(health.dropped_observations_total, 1);
979        assert_eq!(health.projection_failures_total, 1);
980    }
981
982    #[test]
983    fn post_shutdown_emission_returns_shutdown_error() {
984        let root = temp_path("shutdown");
985        let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
986        let runtime = Observability::builder(config)
987            .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
988                id: "shutdown",
989                calls: Arc::new(Mutex::new(Vec::new())),
990            })))
991            .build()
992            .expect("runtime");
993
994        runtime.shutdown().expect("shutdown");
995
996        assert!(matches!(
997            runtime.emit(observation(true)),
998            Err(ObservationError::Shutdown)
999        ));
1000    }
1001
1002    #[test]
1003    fn top_level_health_aggregates_logging_and_routing_state() {
1004        let root = temp_path("health");
1005        let config = ObservabilityConfig::default_for(tool_name(), root.clone()).expect("config");
1006        let runtime = Observability::builder(config)
1007            .register_projection(
1008                ProjectionRegistration::new().with_log_projector(Arc::new(FailingProjector)),
1009            )
1010            .build()
1011            .expect("runtime");
1012
1013        let _ = runtime.emit(observation(true));
1014        let health = runtime.health();
1015
1016        assert_eq!(health.state, ObservationHealthState::Degraded);
1017        assert_eq!(health.projection_failures_total, 1);
1018        assert!(health.logging.is_some());
1019        assert!(health.last_error.is_some());
1020        assert!(health.telemetry.is_none());
1021    }
1022
1023    #[test]
1024    fn top_level_health_exposes_attached_telemetry_provider() {
1025        let root = temp_path("telemetry-health");
1026        let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
1027        let runtime = Observability::builder(config)
1028            .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
1029                id: "telemetry-health",
1030                calls: Arc::new(Mutex::new(Vec::new())),
1031            })))
1032            .with_observability_health_provider(Arc::new(FakeTelemetryProvider {
1033                state: TelemetryHealthState::Degraded,
1034            }))
1035            .build()
1036            .expect("runtime");
1037
1038        let health = runtime.health();
1039
1040        assert_eq!(health.state, ObservationHealthState::Degraded);
1041        assert_eq!(
1042            health.telemetry.expect("telemetry health").state,
1043            TelemetryHealthState::Degraded
1044        );
1045    }
1046
1047    #[test]
1048    fn queue_capacity_override_propagates_to_logger_config() {
1049        let root = temp_path("queue-capacity");
1050        let mut config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
1051        config.queue_capacity = 2048;
1052
1053        let logger_config = config.logger_config().expect("logger config");
1054
1055        assert_eq!(logger_config.queue_capacity, 2048);
1056    }
1057
1058    #[test]
1059    fn flush_forwards_logger_flush_behavior_directly() {
1060        struct PassthroughFilter;
1061
1062        impl LogFilter for PassthroughFilter {
1063            fn accepts(&self, _event: &LogEvent) -> bool {
1064                true
1065            }
1066        }
1067
1068        struct FlushFailSink;
1069
1070        impl LogSink for FlushFailSink {
1071            fn write(&self, _event: &LogEvent) -> Result<(), LogSinkError> {
1072                Ok(())
1073            }
1074
1075            fn flush(&self) -> Result<(), LogSinkError> {
1076                Err(LogSinkError(Box::new(ErrorContext::new(
1077                    sc_observability::error_codes::LOGGER_FLUSH_FAILED,
1078                    "flush failed",
1079                    Remediation::not_recoverable("test sink intentionally fails flush"),
1080                ))))
1081            }
1082
1083            fn health(&self) -> SinkHealth {
1084                SinkHealth {
1085                    name: sink_name("flush-fail"),
1086                    state: SinkHealthState::DegradedDropping,
1087                    last_error: None,
1088                }
1089            }
1090        }
1091
1092        let ok_root = temp_path("flush-ok");
1093        let ok_config =
1094            ObservabilityConfig::default_for(tool_name(), ok_root.clone()).expect("config");
1095        let ok_runtime = Observability::builder(ok_config)
1096            .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
1097                id: "flush-ok",
1098                calls: Arc::new(Mutex::new(Vec::new())),
1099            })))
1100            .build()
1101            .expect("runtime");
1102        assert!(ok_runtime.flush().is_ok());
1103
1104        let fail_root = temp_path("flush-fail");
1105        let mut logger_config =
1106            LoggerConfig::default_for(ServiceName::new("obs-app").expect("service"), fail_root);
1107        logger_config.enable_file_sink = false;
1108        logger_config.enable_console_sink = false;
1109        let mut builder = sc_observability::Logger::builder(logger_config).expect("logger builder");
1110        builder.register_sink(
1111            SinkRegistration::new(Arc::new(FlushFailSink)).with_filter(Arc::new(PassthroughFilter)),
1112        );
1113        let logger = builder.build();
1114
1115        let runtime = Observability {
1116            logger: Mutex::new(Some(LoggerHandle::Running(logger))),
1117            shutdown: AtomicBool::new(false),
1118            subscriber_registrations: Vec::new(),
1119            projection_registrations: Vec::new(),
1120            observability_health_provider: None,
1121            runtime: RuntimeState::default(),
1122        };
1123
1124        assert!(runtime.flush().is_err());
1125        let logging = runtime.health().logging.expect("logging health");
1126        assert_eq!(logging.flush_errors_total, 1);
1127        assert!(logging.last_error.is_some());
1128    }
1129}