1#![expect(
7 clippy::missing_errors_doc,
8 reason = "public routing-facade error behavior is documented centrally in workspace docs, and repeating boilerplate on every wrapper method adds low signal"
9)]
10#![expect(
11 clippy::must_use_candidate,
12 reason = "builder and accessor methods intentionally avoid pervasive must_use boilerplate across the facade"
13)]
14#![expect(
15 clippy::return_self_not_must_use,
16 reason = "builder-style chaining is explicit from the signatures and intentionally lightweight"
17)]
18#![expect(
19 clippy::struct_field_names,
20 reason = "the health-provider field uses the full domain term for clarity across builder/runtime structs"
21)]
22#![expect(
23 clippy::needless_pass_by_value,
24 reason = "Observation is the producer-facing owned emission contract, so emit intentionally takes ownership"
25)]
26
27pub mod constants;
28pub mod error_codes;
29
30use std::any::{Any, TypeId};
31use std::path::PathBuf;
32use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
33use std::sync::{Arc, Mutex};
34
35use sc_observability::{LogError, Logger, LoggerConfig, RetainedLogPolicy, Running, Stopped};
36use sc_observability_types::{
37 DiagnosticInfo, DiagnosticSummary, EnvPrefix, ErrorContext, FlushError, InitError,
38 ObservabilityHealthProvider, Observable, Observation, ProjectionRegistration, Remediation,
39 ServiceName, ShutdownError, SubscriberError, SubscriberRegistration, TelemetryHealthState,
40 ToolName,
41};
42#[doc(inline)]
43pub use sc_observability_types::{
44 ObservabilityHealthReport, ObservationError, ObservationHealthState,
45};
46
/// Configuration consumed when constructing an [`Observability`] runtime.
///
/// [`ObservabilityConfig::default_for`] fills in every field from a tool name and a log
/// root; the public fields can be overridden afterwards.
#[derive(Debug, Clone)]
pub struct ObservabilityConfig {
    /// Tool name; the default env prefix and the logger service name are derived from it.
    pub tool_name: ToolName,
    /// Log root handed to the logger configuration.
    pub log_root: PathBuf,
    /// Environment-variable prefix; by default the ASCII-upper-cased tool name with `-`
    /// and `.` replaced by `_`.
    pub env_prefix: EnvPrefix,
    /// Queue capacity copied into the logger configuration.
    pub queue_capacity: usize,
    /// Retained-log policy copied into the logger configuration.
    pub retained_log_policy: RetainedLogPolicy,
}
66
67impl ObservabilityConfig {
68 pub fn default_for(tool_name: ToolName, log_root: PathBuf) -> Result<Self, InitError> {
86 let env_prefix = EnvPrefix::new(
87 tool_name
88 .as_str()
89 .replace(['-', '.'], "_")
90 .to_ascii_uppercase(),
91 )
92 .map_err(|err| {
93 InitError(Box::new(
94 ErrorContext::new(
95 error_codes::OBSERVABILITY_INIT_FAILED,
96 "failed to derive env prefix",
97 Remediation::not_recoverable("use an explicit valid env prefix"),
98 )
99 .cause(err.to_string())
100 .source(Box::new(err)),
101 ))
102 })?;
103 Ok(Self {
104 tool_name,
105 log_root,
106 env_prefix,
107 queue_capacity: constants::DEFAULT_OBSERVATION_QUEUE_CAPACITY,
108 retained_log_policy: RetainedLogPolicy::default(),
109 })
110 }
111
112 pub fn service_name(&self) -> Result<ServiceName, InitError> {
114 ServiceName::new(self.tool_name.as_str()).map_err(|err| {
115 InitError(Box::new(
116 ErrorContext::new(
117 error_codes::OBSERVABILITY_INIT_FAILED,
118 "failed to derive service name",
119 Remediation::not_recoverable("use a valid tool name"),
120 )
121 .cause(err.to_string())
122 .source(Box::new(err)),
123 ))
124 })
125 }
126
127 fn logger_config(&self) -> Result<LoggerConfig, InitError> {
128 let mut config = LoggerConfig::default_for(self.service_name()?, self.log_root.clone());
129 config.queue_capacity = self.queue_capacity;
130 config.retained_log_policy = self.retained_log_policy;
131 Ok(config)
132 }
133}
134
/// Builder that collects routes and an optional health provider before producing an
/// [`Observability`] runtime.
#[expect(
    missing_debug_implementations,
    reason = "the builder stores type-erased routing closures and health providers whose internals are not part of the public debug contract"
)]
pub struct ObservabilityBuilder {
    /// Configuration used to build the logger.
    config: ObservabilityConfig,
    /// Type-erased subscriber routes, in registration order.
    subscribers: Vec<ErasedSubscriberRegistration>,
    /// Type-erased projection routes, in registration order.
    projections: Vec<ErasedProjectionRegistration>,
    /// Optional provider queried for telemetry health when a health report is built.
    observability_health_provider: Option<Arc<dyn ObservabilityHealthProvider>>,
}
146
/// Routing runtime: owns the logger, the type-erased subscriber and projection routes,
/// an optional telemetry health provider, and the counters surfaced by `health`.
#[expect(
    missing_debug_implementations,
    reason = "the runtime owns atomic state, mutexes, and type-erased routes that do not have a useful stable Debug representation"
)]
pub struct Observability {
    /// Logger in its running or stopped state; `shutdown` replaces running with stopped.
    logger: Mutex<Option<LoggerHandle>>,
    /// Set by `shutdown`; once set, `emit` rejects observations.
    shutdown: AtomicBool,
    /// Subscriber routes, consulted in registration order.
    subscriber_registrations: Vec<ErasedSubscriberRegistration>,
    /// Projection routes, consulted in registration order.
    projection_registrations: Vec<ErasedProjectionRegistration>,
    /// Optional provider queried for telemetry health in `health`.
    observability_health_provider: Option<Arc<dyn ObservabilityHealthProvider>>,
    /// Failure/drop counters and the most recent error summary.
    runtime: RuntimeState,
}
160
/// Mutable counters and last-error slot updated by `emit` and read by `health`.
#[derive(Default)]
struct RuntimeState {
    /// Observations for which no subscriber or projection route matched.
    dropped_observations_total: AtomicU64,
    /// Subscriber dispatches that returned an error.
    subscriber_failures_total: AtomicU64,
    /// Failures reported by projection dispatches.
    projection_failures_total: AtomicU64,
    /// Most recently recorded diagnostic summary, if any.
    last_error: Mutex<Option<DiagnosticSummary>>,
}
171
/// Subscriber route with its payload type erased.
struct ErasedSubscriberRegistration {
    /// `TypeId` of the payload type `T` the route was registered for.
    type_id: TypeId,
    /// Closure that downcasts the observation and delivers it to the subscriber.
    dispatch: Arc<SubscriberDispatchFn>,
}

/// Projection route with its payload type erased.
struct ErasedProjectionRegistration {
    /// `TypeId` of the payload type `T` the route was registered for.
    type_id: TypeId,
    /// Closure that downcasts the observation and runs the registered projectors.
    dispatch: Arc<ProjectionDispatchFn>,
}
181
/// Logger in one of its two typestates, so the runtime can keep it after shutdown.
enum LoggerHandle {
    /// Logger that still accepts events.
    Running(Logger<Running>),
    /// Logger after `shutdown`; kept so health can still be queried.
    Stopped(Logger<Stopped>),
}
186
/// Type-erased subscriber dispatch: receives the observation as `&dyn Any` and reports
/// whether it was delivered or skipped, or the subscriber's error.
type SubscriberDispatchFn =
    dyn Fn(&dyn Any) -> Result<DispatchMatch, SubscriberError> + Send + Sync + 'static;
/// Type-erased projection dispatch: receives the observation as `&dyn Any` plus the
/// running logger and returns the aggregated projection outcome.
type ProjectionDispatchFn =
    dyn Fn(&dyn Any, &Logger<Running>) -> ProjectionDispatchResult + Send + Sync + 'static;
191
/// Outcome of a successful subscriber dispatch.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum DispatchMatch {
    /// The route's filter rejected the observation; the subscriber was not called.
    Skipped,
    /// The subscriber observed the observation without error.
    Delivered,
}
197
/// Aggregated outcome of running one projection route against an observation.
#[derive(Debug, Default, Clone, PartialEq)]
struct ProjectionDispatchResult {
    /// True when at least one projector on the route projected successfully.
    matched: bool,
    /// Number of failures recorded while running the route.
    failure_count: u64,
    /// Summary of the most recent failure on the route, if any.
    last_error: Option<DiagnosticSummary>,
}
204
205fn log_error_summary(error: &LogError) -> DiagnosticSummary {
206 match error {
207 LogError::InvalidEvent(error) => DiagnosticSummary::from(error.diagnostic()),
208 LogError::WriterDegraded(error) | LogError::ShutdownTimedOut(error) => {
209 DiagnosticSummary::from(error.diagnostic())
210 }
211 }
212}
213
214impl Observability {
215 pub fn new(config: ObservabilityConfig) -> Result<Self, InitError> {
217 Self::builder(config).build()
218 }
219
220 pub fn builder(config: ObservabilityConfig) -> ObservabilityBuilder {
238 ObservabilityBuilder {
239 config,
240 subscribers: Vec::new(),
241 projections: Vec::new(),
242 observability_health_provider: None,
243 }
244 }
245
246 pub fn emit<T>(&self, observation: Observation<T>) -> Result<(), ObservationError>
253 where
254 T: Observable,
255 {
256 if self.shutdown.load(Ordering::SeqCst) {
257 return Err(ObservationError::Shutdown);
258 }
259
260 let observation_any = &observation as &dyn Any;
261 let type_id = TypeId::of::<T>();
262 let mut matched = false;
263
264 for registration in self
265 .subscriber_registrations
266 .iter()
267 .filter(|entry| entry.type_id == type_id)
268 {
269 match (registration.dispatch)(observation_any) {
270 Ok(DispatchMatch::Delivered) => matched = true,
271 Ok(DispatchMatch::Skipped) => {}
272 Err(err) => {
273 self.runtime
274 .subscriber_failures_total
275 .fetch_add(1, Ordering::SeqCst);
276 self.record_last_error(DiagnosticSummary::from(err.diagnostic()));
277 }
278 }
279 }
280
281 for registration in self
282 .projection_registrations
283 .iter()
284 .filter(|entry| entry.type_id == type_id)
285 {
286 let logger = self.logger.lock().expect("observability logger poisoned");
287 let LoggerHandle::Running(logger) = logger
288 .as_ref()
289 .expect("observability logger should exist while runtime is alive")
290 else {
291 return Err(ObservationError::Shutdown);
292 };
293 let result = (registration.dispatch)(observation_any, logger);
294 matched |= result.matched;
295 if result.failure_count > 0 {
296 self.runtime
297 .projection_failures_total
298 .fetch_add(result.failure_count, Ordering::SeqCst);
299 if let Some(summary) = result.last_error {
300 self.record_last_error(summary);
301 }
302 }
303 }
304
305 if !matched {
306 self.runtime
307 .dropped_observations_total
308 .fetch_add(1, Ordering::SeqCst);
309 let context = ErrorContext::new(
312 error_codes::OBSERVATION_ROUTING_FAILURE,
313 "no eligible subscriber or projector path matched the observation",
314 Remediation::recoverable(
315 "register at least one matching subscriber or projector",
316 ["ensure filters allow the emitted observation type"],
317 ),
318 );
319 self.record_last_error(DiagnosticSummary::from(context.diagnostic()));
320 return Err(ObservationError::RoutingFailure(Box::new(context)));
321 }
322
323 Ok(())
324 }
325
326 pub fn flush(&self) -> Result<(), FlushError> {
333 let logger = self.logger.lock().expect("observability logger poisoned");
334 match logger
335 .as_ref()
336 .expect("observability logger should exist while runtime is alive")
337 {
338 LoggerHandle::Running(logger) => logger.flush(),
339 LoggerHandle::Stopped(_) => Ok(()),
340 }
341 }
342
343 pub fn shutdown(&self) -> Result<(), ShutdownError> {
350 if self.shutdown.swap(true, Ordering::SeqCst) {
351 return Ok(());
352 }
353 let mut logger = self.logger.lock().expect("observability logger poisoned");
354 let handle = logger
355 .take()
356 .expect("observability logger should exist while runtime is alive");
357 *logger = Some(match handle {
358 LoggerHandle::Running(logger) => LoggerHandle::Stopped(logger.shutdown()),
359 LoggerHandle::Stopped(logger) => LoggerHandle::Stopped(logger),
360 });
361 Ok(())
362 }
363
364 pub fn health(&self) -> ObservabilityHealthReport {
370 let logging = {
371 let logger = self.logger.lock().expect("observability logger poisoned");
372 match logger
373 .as_ref()
374 .expect("observability logger should exist while runtime is alive")
375 {
376 LoggerHandle::Running(logger) => logger.health(),
377 LoggerHandle::Stopped(logger) => logger.health(),
378 }
379 };
380 let telemetry = self
381 .observability_health_provider
382 .as_ref()
383 .map(sc_observability_types::ObservabilityHealthProvider::telemetry_health);
384 let subscriber_failures = self
385 .runtime
386 .subscriber_failures_total
387 .load(Ordering::SeqCst);
388 let projection_failures = self
389 .runtime
390 .projection_failures_total
391 .load(Ordering::SeqCst);
392 let dropped = self
393 .runtime
394 .dropped_observations_total
395 .load(Ordering::SeqCst);
396
397 let state = if self.shutdown.load(Ordering::SeqCst) {
398 ObservationHealthState::Unavailable
399 } else if dropped > 0
400 || subscriber_failures > 0
401 || projection_failures > 0
402 || logging.state != sc_observability_types::LoggingHealthState::Healthy
403 || telemetry.as_ref().is_some_and(|health| {
404 matches!(
405 health.state,
406 TelemetryHealthState::Degraded | TelemetryHealthState::Unavailable
407 )
408 })
409 {
410 ObservationHealthState::Degraded
411 } else {
412 ObservationHealthState::Healthy
413 };
414
415 ObservabilityHealthReport {
416 state,
417 dropped_observations_total: dropped,
418 subscriber_failures_total: subscriber_failures,
419 projection_failures_total: projection_failures,
420 logging: Some(logging),
421 telemetry,
422 last_error: self
423 .runtime
424 .last_error
425 .lock()
426 .expect("observability last_error poisoned")
427 .clone(),
428 }
429 }
430
431 fn record_last_error(&self, summary: DiagnosticSummary) {
432 *self
433 .runtime
434 .last_error
435 .lock()
436 .expect("observability last_error poisoned") = Some(summary);
437 }
438}
439
440impl ObservabilityBuilder {
441 #[expect(
444 clippy::implied_bounds_in_impls,
445 reason = "the public API intentionally spells out Send + Sync per QA-BP-IMC-007"
446 )]
447 pub fn with_observability_health_provider(
448 mut self,
449 provider: impl ObservabilityHealthProvider + Send + Sync + 'static,
450 ) -> Self {
451 self.observability_health_provider = Some(Arc::new(provider));
452 self
453 }
454
455 pub fn register_subscriber<T>(mut self, registration: SubscriberRegistration<T>) -> Self
462 where
463 T: Observable,
464 {
465 let (subscriber, filter) = registration.into_parts();
466 self.subscribers.push(ErasedSubscriberRegistration {
467 type_id: TypeId::of::<T>(),
468 dispatch: Arc::new(move |observation_any| {
469 let observation = observation_any
470 .downcast_ref::<Observation<T>>()
471 .expect("type-erased routing matched wrong observation type");
472
473 if filter
474 .as_ref()
475 .is_some_and(|filter| !filter.accepts(observation))
476 {
477 return Ok(DispatchMatch::Skipped);
478 }
479
480 subscriber.observe(observation)?;
481 Ok(DispatchMatch::Delivered)
482 }),
483 });
484 self
485 }
486
487 pub fn register_projection<T>(mut self, registration: ProjectionRegistration<T>) -> Self
494 where
495 T: Observable,
496 {
497 let (log_projector, span_projector, metric_projector, filter) = registration.into_parts();
498
499 self.projections.push(ErasedProjectionRegistration {
500 type_id: TypeId::of::<T>(),
501 dispatch: Arc::new(move |observation_any, logger| {
502 let observation = observation_any
503 .downcast_ref::<Observation<T>>()
504 .expect("type-erased routing matched wrong observation type");
505
506 if filter
507 .as_ref()
508 .is_some_and(|filter| !filter.accepts(observation))
509 {
510 return ProjectionDispatchResult::default();
511 }
512
513 let mut result = ProjectionDispatchResult::default();
514 let mut record_failure = |summary: DiagnosticSummary| {
515 result.failure_count += 1;
516 result.last_error = Some(summary);
517 };
518
519 if let Some(projector) = &log_projector {
520 match projector.project_logs(observation) {
521 Ok(events) => {
522 result.matched = true;
523 for event in events {
524 if let Err(err) = logger.log(event) {
525 record_failure(log_error_summary(&err));
526 }
527 }
528 if let Err(err) = logger.flush() {
529 record_failure(DiagnosticSummary::from(err.diagnostic()));
530 }
531 }
532 Err(err) => record_failure(DiagnosticSummary::from(err.diagnostic())),
533 }
534 }
535
536 if let Some(projector) = &span_projector {
537 match projector.project_spans(observation) {
538 Ok(_) => result.matched = true,
539 Err(err) => record_failure(DiagnosticSummary::from(err.diagnostic())),
540 }
541 }
542
543 if let Some(projector) = &metric_projector {
544 match projector.project_metrics(observation) {
545 Ok(_) => result.matched = true,
546 Err(err) => record_failure(DiagnosticSummary::from(err.diagnostic())),
547 }
548 }
549
550 result
551 }),
552 });
553 self
554 }
555
556 pub fn build(self) -> Result<Observability, InitError> {
558 if self.subscribers.is_empty() && self.projections.is_empty() {
559 return Err(InitError(Box::new(ErrorContext::new(
560 error_codes::OBSERVABILITY_INIT_FAILED,
561 "at least one subscriber or projector route must be registered",
562 Remediation::recoverable(
563 "register a subscriber or projector before building observability",
564 ["add at least one route for the observation types you emit"],
565 ),
566 ))));
567 }
568 let logger = Logger::new(self.config.logger_config()?)?;
569 Ok(Observability {
570 logger: Mutex::new(Some(LoggerHandle::Running(logger))),
571 shutdown: AtomicBool::new(false),
572 subscriber_registrations: self.subscribers,
573 projection_registrations: self.projections,
574 observability_health_provider: self.observability_health_provider,
575 runtime: RuntimeState::default(),
576 })
577 }
578}
579
/// Private module holding the marker supertrait of `ObservationEmitter`.
mod sealed_emitters {
    /// Marker trait required by `ObservationEmitter`; declared inside a private module.
    pub trait Sealed {}
}
583
/// Crate-local abstraction over emitting observations of payload type `T`.
///
/// Requires the `sealed_emitters::Sealed` marker plus `Send + Sync`.
#[expect(
    dead_code,
    reason = "crate-local observation emitter trait is intentionally retained for injection"
)]
pub(crate) trait ObservationEmitter<T>: sealed_emitters::Sealed + Send + Sync
where
    T: Observable,
{
    /// Emits one observation, taking ownership of it.
    fn emit(&self, observation: Observation<T>) -> Result<(), ObservationError>;
}
597
// Marker impl so `Observability` satisfies the `Sealed` supertrait of `ObservationEmitter`.
impl sealed_emitters::Sealed for Observability {}
599
600impl<T> ObservationEmitter<T> for Observability
601where
602 T: Observable,
603{
604 fn emit(&self, observation: Observation<T>) -> Result<(), ObservationError> {
605 Observability::emit(self, observation)
606 }
607}
608
609#[cfg(test)]
610mod tests {
611 use super::*;
612 use sc_observability::{
613 LogFilter, LogSink, LoggerConfig, SinkHealth, SinkHealthState, SinkRegistration,
614 };
615 use sc_observability_types::{
616 ActionName, Diagnostic, ErrorCode, Level, LogEvent, LogSinkError, MetricKind, MetricName,
617 MetricRecord, MetricUnit, ObservationFilter, ObservationSubscriber, ProcessIdentity,
618 ProjectionError, SpanId, SpanProjector, SpanRecord, SpanSignal, SpanStarted,
619 SubscriberError, TargetCategory, TelemetryHealthReport, TelemetryHealthState, Timestamp,
620 TraceContext, TraceId,
621 };
622 use serde_json::Map;
623
    /// Observation payload used by the routing tests.
    #[derive(Debug, Clone)]
    struct AgentEvent {
        /// Label forwarded as the log message by `RecordingLogProjector`.
        kind: &'static str,
        /// Flag consulted by `AllowFlagFilter`.
        allow: bool,
    }
629
630 struct RecordingSubscriber {
631 id: &'static str,
632 calls: Arc<Mutex<Vec<&'static str>>>,
633 }
634
635 impl ObservationSubscriber<AgentEvent> for RecordingSubscriber {
636 fn observe(&self, _observation: &Observation<AgentEvent>) -> Result<(), SubscriberError> {
637 self.calls.lock().expect("calls poisoned").push(self.id);
638 Ok(())
639 }
640 }
641
    /// Filter that accepts an observation only when its payload's `allow` flag is set.
    struct AllowFlagFilter;

    impl ObservationFilter<AgentEvent> for AllowFlagFilter {
        fn accepts(&self, observation: &Observation<AgentEvent>) -> bool {
            observation.payload.allow
        }
    }
649
650 struct FailingSubscriber;
651
652 impl ObservationSubscriber<AgentEvent> for FailingSubscriber {
653 fn observe(&self, _observation: &Observation<AgentEvent>) -> Result<(), SubscriberError> {
654 Err(SubscriberError(Box::new(ErrorContext::new(
655 error_codes::OBSERVATION_ROUTING_FAILURE,
656 "subscriber failed",
657 Remediation::not_recoverable("test subscriber intentionally fails"),
658 ))))
659 }
660 }
661
662 struct RecordingLogProjector {
663 calls: Arc<Mutex<Vec<&'static str>>>,
664 id: &'static str,
665 }
666
667 impl sc_observability_types::LogProjector<AgentEvent> for RecordingLogProjector {
668 fn project_logs(
669 &self,
670 observation: &Observation<AgentEvent>,
671 ) -> Result<Vec<LogEvent>, ProjectionError> {
672 self.calls.lock().expect("calls poisoned").push(self.id);
673 Ok(vec![log_event(
674 observation.service.clone(),
675 observation.payload.kind,
676 )])
677 }
678 }
679
680 struct RecordingSpanProjector {
681 count: Arc<AtomicU64>,
682 }
683
684 impl SpanProjector<AgentEvent> for RecordingSpanProjector {
685 fn project_spans(
686 &self,
687 observation: &Observation<AgentEvent>,
688 ) -> Result<Vec<SpanSignal>, ProjectionError> {
689 self.count.fetch_add(1, Ordering::SeqCst);
690 Ok(vec![SpanSignal::Started(SpanRecord::<SpanStarted>::new(
691 Timestamp::UNIX_EPOCH,
692 observation.service.clone(),
693 ActionName::new("span.started").expect("valid action"),
694 trace_context(),
695 Map::default(),
696 ))])
697 }
698 }
699
700 struct RecordingMetricProjector {
701 count: Arc<AtomicU64>,
702 }
703
704 impl sc_observability_types::MetricProjector<AgentEvent> for RecordingMetricProjector {
705 fn project_metrics(
706 &self,
707 observation: &Observation<AgentEvent>,
708 ) -> Result<Vec<MetricRecord>, ProjectionError> {
709 self.count.fetch_add(1, Ordering::SeqCst);
710 Ok(vec![MetricRecord {
711 timestamp: Timestamp::UNIX_EPOCH,
712 service: observation.service.clone(),
713 name: MetricName::new("obs.events_total").expect("valid metric"),
714 kind: MetricKind::Counter,
715 value: 1.0,
716 unit: Some(MetricUnit::new("1").expect("valid metric unit")),
717 attributes: Map::default(),
718 }])
719 }
720 }
721
722 struct FailingProjector;
723
724 impl sc_observability_types::LogProjector<AgentEvent> for FailingProjector {
725 fn project_logs(
726 &self,
727 _observation: &Observation<AgentEvent>,
728 ) -> Result<Vec<LogEvent>, ProjectionError> {
729 Err(ProjectionError(Box::new(ErrorContext::new(
730 error_codes::OBSERVATION_ROUTING_FAILURE,
731 "projector failed",
732 Remediation::not_recoverable("test projector intentionally fails"),
733 ))))
734 }
735 }
736
    /// Health provider that always reports the configured telemetry state.
    struct FakeTelemetryProvider {
        /// State returned in every telemetry health report.
        state: TelemetryHealthState,
    }

    // The provider trait is sealed; the impl supplies the token via `workspace_token()`.
    impl sc_observability_types::telemetry_health_provider_sealed::Sealed for FakeTelemetryProvider {
        fn token(&self) -> sc_observability_types::telemetry_health_provider_sealed::Token {
            sc_observability_types::telemetry_health_provider_sealed::workspace_token()
        }
    }

    impl ObservabilityHealthProvider for FakeTelemetryProvider {
        // Reports the configured state with zeroed counters and no exporter details.
        fn telemetry_health(&self) -> TelemetryHealthReport {
            TelemetryHealthReport {
                state: self.state,
                dropped_exports_total: 0,
                malformed_spans_total: 0,
                exporter_statuses: Vec::new(),
                last_error: None,
            }
        }
    }
758
759 fn tool_name() -> ToolName {
760 ToolName::new("obs-app").expect("valid tool name")
761 }
762
763 fn temp_path(name: &str) -> PathBuf {
764 std::env::temp_dir().join(format!(
765 "sc-observe-{name}-{}-{}",
766 std::process::id(),
767 std::time::SystemTime::now()
768 .duration_since(std::time::SystemTime::UNIX_EPOCH)
769 .expect("system time before unix epoch")
770 .as_nanos()
771 ))
772 }
773
774 fn trace_context() -> TraceContext {
775 TraceContext {
776 trace_id: TraceId::new("0123456789abcdef0123456789abcdef").expect("valid trace id"),
777 span_id: SpanId::new("0123456789abcdef").expect("valid span id"),
778 parent_span_id: None,
779 }
780 }
781
782 fn schema_version() -> sc_observability_types::SchemaVersion {
783 sc_observability_types::SchemaVersion::new(
784 sc_observability_types::constants::OBSERVATION_ENVELOPE_VERSION,
785 )
786 .expect("valid schema version")
787 }
788
789 fn outcome_label(value: &str) -> sc_observability_types::OutcomeLabel {
790 sc_observability_types::OutcomeLabel::new(value).expect("valid outcome label")
791 }
792
793 fn sink_name(value: &str) -> sc_observability_types::SinkName {
794 sc_observability_types::SinkName::new(value).expect("valid sink name")
795 }
796
797 fn observation(allow: bool) -> Observation<AgentEvent> {
798 let mut observation = Observation::new(
799 ServiceName::new("obs-app").expect("valid service"),
800 AgentEvent {
801 kind: "received",
802 allow,
803 },
804 );
805 observation.identity = ProcessIdentity::default();
806 observation
807 }
808
    /// Builds a fully populated info-level log event for `service` carrying `message`.
    ///
    /// Timestamps are pinned to the Unix epoch and the trace context is the fixed test
    /// context, so every call with the same arguments yields the same field values.
    fn log_event(service: ServiceName, message: &str) -> LogEvent {
        LogEvent {
            version: schema_version(),
            timestamp: Timestamp::UNIX_EPOCH,
            level: Level::Info,
            service,
            target: TargetCategory::new("observe.routing").expect("valid target"),
            action: ActionName::new("observation.received").expect("valid action"),
            message: Some(message.to_string()),
            identity: ProcessIdentity::default(),
            trace: Some(trace_context()),
            request_id: None,
            correlation_id: None,
            outcome: Some(outcome_label("ok")),
            // Attach a diagnostic so the event exercises the optional diagnostic field.
            diagnostic: Some(Diagnostic {
                timestamp: Timestamp::UNIX_EPOCH,
                code: ErrorCode::new_static("SC_TEST"),
                message: "projected".to_string(),
                cause: None,
                remediation: Remediation::recoverable("retry", ["inspect log output"]),
                docs: None,
                details: Map::default(),
            }),
            state_transition: None,
            fields: Map::default(),
        }
    }
836
837 #[test]
838 fn registration_order_routing_is_deterministic() {
839 let calls = Arc::new(Mutex::new(Vec::new()));
840 let root = temp_path("order");
841 let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
842 let runtime = Observability::builder(config)
843 .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
844 id: "first",
845 calls: calls.clone(),
846 })))
847 .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
848 id: "second",
849 calls: calls.clone(),
850 })))
851 .build()
852 .expect("runtime");
853
854 runtime.emit(observation(true)).expect("emit");
855
856 assert_eq!(
857 *calls.lock().expect("calls poisoned"),
858 vec!["first", "second"]
859 );
860 }
861
862 #[test]
863 fn filter_acceptance_and_rejection_are_respected() {
864 let calls = Arc::new(Mutex::new(Vec::new()));
865 let root = temp_path("filter");
866 let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
867 let runtime = Observability::builder(config)
868 .register_subscriber(
869 SubscriberRegistration::new(Arc::new(RecordingSubscriber {
870 id: "allowed",
871 calls: calls.clone(),
872 }))
873 .with_filter(Arc::new(AllowFlagFilter)),
874 )
875 .build()
876 .expect("runtime");
877
878 assert!(runtime.emit(observation(false)).is_err());
879 runtime.emit(observation(true)).expect("emit");
880
881 assert_eq!(*calls.lock().expect("calls poisoned"), vec!["allowed"]);
882 }
883
884 #[test]
885 fn subscriber_failures_are_isolated() {
886 let calls = Arc::new(Mutex::new(Vec::new()));
887 let root = temp_path("subscriber-failure");
888 let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
889 let runtime = Observability::builder(config)
890 .register_subscriber(SubscriberRegistration::new(Arc::new(FailingSubscriber)))
891 .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
892 id: "still-runs",
893 calls: calls.clone(),
894 })))
895 .build()
896 .expect("runtime");
897
898 runtime.emit(observation(true)).expect("emit");
899
900 let health = runtime.health();
901 assert_eq!(health.subscriber_failures_total, 1);
902 assert_eq!(*calls.lock().expect("calls poisoned"), vec!["still-runs"]);
903 assert_eq!(health.state, ObservationHealthState::Degraded);
904 }
905
906 #[test]
907 fn projector_failures_are_isolated() {
908 let log_calls = Arc::new(Mutex::new(Vec::new()));
909 let span_count = Arc::new(AtomicU64::new(0));
910 let metric_count = Arc::new(AtomicU64::new(0));
911 let root = temp_path("projector-failure");
912 let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
913 let runtime = Observability::builder(config)
914 .register_projection(
915 ProjectionRegistration::new()
916 .with_log_projector(Arc::new(FailingProjector))
917 .with_span_projector(Arc::new(RecordingSpanProjector {
918 count: span_count.clone(),
919 }))
920 .with_metric_projector(Arc::new(RecordingMetricProjector {
921 count: metric_count.clone(),
922 })),
923 )
924 .register_projection(ProjectionRegistration::new().with_log_projector(Arc::new(
925 RecordingLogProjector {
926 calls: log_calls.clone(),
927 id: "log",
928 },
929 )))
930 .build()
931 .expect("runtime");
932
933 runtime.emit(observation(true)).expect("emit");
934
935 let health = runtime.health();
936 assert_eq!(health.projection_failures_total, 1);
937 assert_eq!(span_count.load(Ordering::SeqCst), 1);
938 assert_eq!(metric_count.load(Ordering::SeqCst), 1);
939 assert_eq!(*log_calls.lock().expect("calls poisoned"), vec!["log"]);
940 }
941
942 #[test]
943 fn routing_failure_occurs_when_no_eligible_path_remains() {
944 let root = temp_path("routing-failure");
945 let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
946 let runtime = Observability::builder(config)
947 .register_subscriber(
948 SubscriberRegistration::new(Arc::new(RecordingSubscriber {
949 id: "filtered",
950 calls: Arc::new(Mutex::new(Vec::new())),
951 }))
952 .with_filter(Arc::new(AllowFlagFilter)),
953 )
954 .build()
955 .expect("runtime");
956
957 let result = runtime.emit(observation(false));
958
959 assert!(matches!(result, Err(ObservationError::RoutingFailure(_))));
960 assert_eq!(runtime.health().dropped_observations_total, 1);
961 }
962
963 #[test]
964 fn routing_failure_occurs_when_all_projectors_fail() {
965 let root = temp_path("projector-routing-failure");
966 let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
967 let runtime = Observability::builder(config)
968 .register_projection(
969 ProjectionRegistration::new().with_log_projector(Arc::new(FailingProjector)),
970 )
971 .build()
972 .expect("runtime");
973
974 let result = runtime.emit(observation(true));
975
976 assert!(matches!(result, Err(ObservationError::RoutingFailure(_))));
977 let health = runtime.health();
978 assert_eq!(health.dropped_observations_total, 1);
979 assert_eq!(health.projection_failures_total, 1);
980 }
981
982 #[test]
983 fn post_shutdown_emission_returns_shutdown_error() {
984 let root = temp_path("shutdown");
985 let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
986 let runtime = Observability::builder(config)
987 .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
988 id: "shutdown",
989 calls: Arc::new(Mutex::new(Vec::new())),
990 })))
991 .build()
992 .expect("runtime");
993
994 runtime.shutdown().expect("shutdown");
995
996 assert!(matches!(
997 runtime.emit(observation(true)),
998 Err(ObservationError::Shutdown)
999 ));
1000 }
1001
1002 #[test]
1003 fn top_level_health_aggregates_logging_and_routing_state() {
1004 let root = temp_path("health");
1005 let config = ObservabilityConfig::default_for(tool_name(), root.clone()).expect("config");
1006 let runtime = Observability::builder(config)
1007 .register_projection(
1008 ProjectionRegistration::new().with_log_projector(Arc::new(FailingProjector)),
1009 )
1010 .build()
1011 .expect("runtime");
1012
1013 let _ = runtime.emit(observation(true));
1014 let health = runtime.health();
1015
1016 assert_eq!(health.state, ObservationHealthState::Degraded);
1017 assert_eq!(health.projection_failures_total, 1);
1018 assert!(health.logging.is_some());
1019 assert!(health.last_error.is_some());
1020 assert!(health.telemetry.is_none());
1021 }
1022
1023 #[test]
1024 fn top_level_health_exposes_attached_telemetry_provider() {
1025 let root = temp_path("telemetry-health");
1026 let config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
1027 let runtime = Observability::builder(config)
1028 .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
1029 id: "telemetry-health",
1030 calls: Arc::new(Mutex::new(Vec::new())),
1031 })))
1032 .with_observability_health_provider(Arc::new(FakeTelemetryProvider {
1033 state: TelemetryHealthState::Degraded,
1034 }))
1035 .build()
1036 .expect("runtime");
1037
1038 let health = runtime.health();
1039
1040 assert_eq!(health.state, ObservationHealthState::Degraded);
1041 assert_eq!(
1042 health.telemetry.expect("telemetry health").state,
1043 TelemetryHealthState::Degraded
1044 );
1045 }
1046
1047 #[test]
1048 fn queue_capacity_override_propagates_to_logger_config() {
1049 let root = temp_path("queue-capacity");
1050 let mut config = ObservabilityConfig::default_for(tool_name(), root).expect("config");
1051 config.queue_capacity = 2048;
1052
1053 let logger_config = config.logger_config().expect("logger config");
1054
1055 assert_eq!(logger_config.queue_capacity, 2048);
1056 }
1057
1058 #[test]
1059 fn flush_forwards_logger_flush_behavior_directly() {
1060 struct PassthroughFilter;
1061
1062 impl LogFilter for PassthroughFilter {
1063 fn accepts(&self, _event: &LogEvent) -> bool {
1064 true
1065 }
1066 }
1067
1068 struct FlushFailSink;
1069
1070 impl LogSink for FlushFailSink {
1071 fn write(&self, _event: &LogEvent) -> Result<(), LogSinkError> {
1072 Ok(())
1073 }
1074
1075 fn flush(&self) -> Result<(), LogSinkError> {
1076 Err(LogSinkError(Box::new(ErrorContext::new(
1077 sc_observability::error_codes::LOGGER_FLUSH_FAILED,
1078 "flush failed",
1079 Remediation::not_recoverable("test sink intentionally fails flush"),
1080 ))))
1081 }
1082
1083 fn health(&self) -> SinkHealth {
1084 SinkHealth {
1085 name: sink_name("flush-fail"),
1086 state: SinkHealthState::DegradedDropping,
1087 last_error: None,
1088 }
1089 }
1090 }
1091
1092 let ok_root = temp_path("flush-ok");
1093 let ok_config =
1094 ObservabilityConfig::default_for(tool_name(), ok_root.clone()).expect("config");
1095 let ok_runtime = Observability::builder(ok_config)
1096 .register_subscriber(SubscriberRegistration::new(Arc::new(RecordingSubscriber {
1097 id: "flush-ok",
1098 calls: Arc::new(Mutex::new(Vec::new())),
1099 })))
1100 .build()
1101 .expect("runtime");
1102 assert!(ok_runtime.flush().is_ok());
1103
1104 let fail_root = temp_path("flush-fail");
1105 let mut logger_config =
1106 LoggerConfig::default_for(ServiceName::new("obs-app").expect("service"), fail_root);
1107 logger_config.enable_file_sink = false;
1108 logger_config.enable_console_sink = false;
1109 let mut builder = sc_observability::Logger::builder(logger_config).expect("logger builder");
1110 builder.register_sink(
1111 SinkRegistration::new(Arc::new(FlushFailSink)).with_filter(Arc::new(PassthroughFilter)),
1112 );
1113 let logger = builder.build();
1114
1115 let runtime = Observability {
1116 logger: Mutex::new(Some(LoggerHandle::Running(logger))),
1117 shutdown: AtomicBool::new(false),
1118 subscriber_registrations: Vec::new(),
1119 projection_registrations: Vec::new(),
1120 observability_health_provider: None,
1121 runtime: RuntimeState::default(),
1122 };
1123
1124 assert!(runtime.flush().is_err());
1125 let logging = runtime.health().logging.expect("logging health");
1126 assert_eq!(logging.flush_errors_total, 1);
1127 assert!(logging.last_error.is_some());
1128 }
1129}