1use std::sync::{
6 Arc,
7 atomic::{AtomicU32, Ordering},
8};
9
10use arc_swap::ArcSwap;
11use bytes;
12use obs_proto::obs::v1::{ObsEnvelope, SamplingReason as PSamplingReason, Tier};
13use parking_lot::Mutex;
14
15use super::{
16 Observer,
17 workers::{TierWorker, WorkerCounters, note_channel_full, spawn_tier_worker},
18};
19use crate::{
20 audit_spool::SpoolWriter,
21 callsite::ObsCallsite,
22 config::{AuditFailureMode, EventsConfig},
23 filter::Filter,
24 registry::{ObsCallsiteRegistry, SchemaRegistry, ScrubbedEnvelope},
25 resource::ResourceAttrs,
26 sampling::{SamplingDecision, decide as sample_decide},
27 scope::{auto_fill_envelope, inbound_traceparent_sampled, push_tail_buffer},
28 sink::{NoopSink, Sink, SinkFut, StdoutSink},
29};
30
31#[derive(Default)]
33struct SinkRouter {
34 log: Option<Arc<dyn Sink>>,
35 metric: Option<Arc<dyn Sink>>,
36 trace: Option<Arc<dyn Sink>>,
37 audit: Option<Arc<dyn Sink>>,
38 fallback: Option<Arc<dyn Sink>>,
39}
40
41impl std::fmt::Debug for SinkRouter {
42 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
43 f.debug_struct("SinkRouter")
44 .field("log", &self.log.as_ref().map(|_| "..."))
45 .field("metric", &self.metric.as_ref().map(|_| "..."))
46 .field("trace", &self.trace.as_ref().map(|_| "..."))
47 .field("audit", &self.audit.as_ref().map(|_| "..."))
48 .field("fallback", &self.fallback.as_ref().map(|_| "..."))
49 .finish()
50 }
51}
52
53impl SinkRouter {
54 fn for_tier(&self, tier: Tier) -> Option<&Arc<dyn Sink>> {
55 let primary = match tier {
56 Tier::Log => self.log.as_ref(),
57 Tier::Metric => self.metric.as_ref(),
58 Tier::Trace => self.trace.as_ref(),
59 Tier::Audit => self.audit.as_ref(),
60 _ => None,
61 };
62 primary.or(self.fallback.as_ref())
63 }
64}
65
66#[derive(Debug, Default)]
68struct WorkerPool {
69 log: Option<TierWorker>,
70 metric: Option<TierWorker>,
71 trace: Option<TierWorker>,
72 audit: Option<TierWorker>,
73}
74
75pub struct StandardObserver {
78 router: SinkRouter,
79 workers: WorkerPool,
80 spool: Option<Arc<SpoolWriter>>,
81 registry: Arc<SchemaRegistry>,
82 callsites: Arc<ObsCallsiteRegistry>,
83 config: ArcSwap<EventsConfig>,
84 filter: ArcSwap<Filter>,
85 resource: ArcSwap<ResourceAttrs>,
88 counters: Arc<WorkerCounters>,
89 generation: AtomicU32,
90 service: String,
91 instance: String,
92 version: String,
93 sync_dispatch_lock: Mutex<()>,
96}
97
98impl std::fmt::Debug for StandardObserver {
99 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
100 f.debug_struct("StandardObserver")
101 .field("schemas", &self.registry.len())
102 .field("service", &self.service)
103 .field("instance", &self.instance)
104 .field("version", &self.version)
105 .field("generation", &self.generation.load(Ordering::Relaxed))
106 .finish_non_exhaustive()
107 }
108}
109
110impl StandardObserver {
111 #[must_use]
113 pub fn builder() -> StandardObserverBuilder {
114 StandardObserverBuilder::default()
115 }
116
117 pub fn dev() -> Result<Self, BuildError> {
124 Self::builder()
125 .service("dev", env!("CARGO_PKG_VERSION"))
126 .sink_fallback(Arc::new(StdoutSink::default()))
127 .build()
128 }
129
130 #[must_use]
132 pub fn registry(&self) -> Arc<SchemaRegistry> {
133 Arc::clone(&self.registry)
134 }
135
136 #[must_use]
140 pub fn callsites(&self) -> Arc<ObsCallsiteRegistry> {
141 Arc::clone(&self.callsites)
142 }
143
144 #[must_use]
146 pub fn config(&self) -> arc_swap::Guard<Arc<EventsConfig>> {
147 self.config.load()
148 }
149
150 pub fn set_resource_attrs(&self, attrs: ResourceAttrs) {
154 self.resource.store(Arc::new(attrs));
155 }
156
157 pub fn reload_config(&self, new_config: EventsConfig) -> Result<(), BuildError> {
165 if let Err(e) = new_config.validate() {
166 crate::self_events::emit_config_reload_failed(&format!("validate: {e}"));
167 return Err(BuildError::InvalidConfig(e));
168 }
169 if let Some(spec) = new_config.filter.as_deref() {
170 match Filter::parse(spec) {
171 Ok(parsed) => self.filter.store(Arc::new(parsed)),
172 Err(e) => {
173 crate::self_events::emit_config_reload_failed(&format!("filter: {e}"));
174 return Err(BuildError::InvalidConfig(
175 crate::config::ConfigError::invalid_range("filter", format!("{e}")),
176 ));
177 }
178 }
179 } else {
180 self.filter.store(Arc::new(Filter::new()));
181 }
182 let cfg_hash = config_hash(&new_config);
186 self.config.store(Arc::new(new_config));
187 self.generation.fetch_add(1, Ordering::Release);
188 crate::self_events::emit_config_reloaded(cfg_hash);
189 Ok(())
190 }
191
192 #[must_use]
194 pub fn filter(&self) -> Arc<Filter> {
195 self.filter.load_full()
196 }
197
198 #[must_use]
200 pub fn counters(&self) -> Arc<WorkerCounters> {
201 Arc::clone(&self.counters)
202 }
203
204 fn fill_identity(&self, env: &mut ObsEnvelope) {
205 if env.service.is_empty() {
206 env.service.clone_from(&self.service);
207 }
208 if env.instance.is_empty() {
209 env.instance.clone_from(&self.instance);
210 }
211 if env.version.is_empty() {
212 env.version.clone_from(&self.version);
213 }
214 }
215
216 fn dispatch_sync(&self, env: ObsEnvelope, tier: Tier) {
217 let _g = self.sync_dispatch_lock.lock();
222 let Some(sink) = self.router.for_tier(tier) else {
223 return;
224 };
225 let mut scratch = bytes::BytesMut::with_capacity(env.payload.len());
226 let scrubbed = match ScrubbedEnvelope::scrub(&env, &self.registry, &mut scratch) {
227 Ok(s) => s,
228 Err(_) => return,
229 };
230 sink.deliver(scrubbed);
231 }
232
233 fn dispatch_async(&self, env: ObsEnvelope, tier: Tier) {
234 let worker = match tier {
235 Tier::Log => self.workers.log.as_ref(),
236 Tier::Metric => self.workers.metric.as_ref(),
237 Tier::Trace => self.workers.trace.as_ref(),
238 Tier::Audit => self.workers.audit.as_ref(),
239 _ => None,
240 };
241 let Some(worker) = worker else {
242 self.dispatch_sync(env, tier);
244 return;
245 };
246 if tier == Tier::Audit {
247 self.dispatch_audit(worker, env);
248 } else {
249 match worker.try_send(env) {
250 Ok(()) => {}
251 Err(_dropped) => {
252 note_channel_full(&self.counters, tier);
253 }
254 }
255 }
256 }
257
258 fn dispatch_audit(&self, worker: &TierWorker, env: ObsEnvelope) {
259 let cfg = self.config.load();
260 let block_ms = u64::from(cfg.audit.block_ms_max);
261 let mut env_unsent = match worker.try_send(env) {
263 Ok(()) => return,
264 Err(env) => env,
265 };
266 let started = std::time::Instant::now();
274 let interval = std::time::Duration::from_millis(2);
275 while started.elapsed().as_millis() < u128::from(block_ms) {
276 match worker.try_send(env_unsent) {
277 Ok(()) => return,
278 Err(env) => env_unsent = env,
279 }
280 std::thread::sleep(interval);
281 }
282 if let Some(spool) = self.spool.as_ref() {
284 match spool.append(&env_unsent) {
285 Ok(()) => {
286 note_channel_full(&self.counters, Tier::Audit);
287 crate::self_events::emit_audit_spooled(env_unsent.full_name.as_str());
288 }
289 Err(e) => {
290 crate::self_events::emit_audit_spool_failed(&e.to_string());
291 self.handle_spool_failure();
292 }
293 }
294 } else {
295 crate::self_events::emit_audit_spool_failed("no spool configured");
296 self.handle_spool_failure();
297 }
298 }
299
300 fn handle_spool_failure(&self) {
301 #[allow(clippy::panic)]
307 {
308 let cfg = self.config.load();
309 match cfg.audit.on_failure {
310 AuditFailureMode::Panic => {
311 panic!("audit spool unwritable; compliance failure")
312 }
313 AuditFailureMode::Abort => std::process::abort(),
314 AuditFailureMode::WarnOnly => {
315 eprintln!("[obs] AUDIT spool unwritable; envelope dropped (warn_only)");
316 }
317 }
318 }
319 }
320
321 fn recover_audit_spool(&self) {
327 let cfg = self.config.load();
328 let dir = cfg.audit.spool_dir.clone();
329 if !dir.exists() {
330 return;
331 }
332 let mut total: u64 = 0;
333 let report = crate::audit_spool::recover(&dir, |env| {
334 total += 1;
335 if let Some(worker) = self.workers.audit.as_ref() {
337 let _ = worker.try_send(env);
338 } else {
339 self.dispatch_sync(env, Tier::Audit);
340 }
341 Ok(())
342 });
343 if total == 0 {
344 let _ = report;
345 return;
346 }
347 let mut env = ObsEnvelope {
348 full_name: "obs.runtime.v1.ObsAuditSpoolRecovered".to_string(),
349 tier: ::buffa::EnumValue::Known(obs_proto::obs::v1::Tier::TIER_LOG),
350 sev: ::buffa::EnumValue::Known(obs_proto::obs::v1::Severity::SEVERITY_INFO),
351 ..Default::default()
352 };
353 env.labels
354 .insert("record_count".to_string(), total.to_string());
355 self.fill_identity(&mut env);
358 self.dispatch_sync(env, Tier::Log);
359 }
360
361 fn run_emit_pipeline(&self, env: &mut ObsEnvelope, sev: obs_proto::obs::v1::Severity) -> bool {
366 auto_fill_envelope(env);
368 let cfg_pre = self.config.load();
373 let max_bytes = u64::from(cfg_pre.limits.max_payload_bytes);
374 let payload_size = env.payload.len() as u64;
375 if max_bytes > 0 && payload_size > max_bytes {
376 crate::self_events::emit_oversized_dropped(env.full_name.as_str(), payload_size);
377 return false;
378 }
379 let max_label_bytes = u64::from(cfg_pre.limits.max_label_value_bytes);
387 if max_label_bytes > 0 {
388 for (k, v) in &env.labels {
389 if v.len() as u64 > max_label_bytes {
390 crate::self_events::emit_oversized_label_dropped(
391 env.full_name.as_str(),
392 k,
393 v.len() as u64,
394 );
395 return false;
396 }
397 }
398 }
399 let filter = self.filter.load();
404 if !filter.event_allowed(env, sev) {
405 return false;
406 }
407 let bypass_sampler = matches!(
410 env.sampling_reason,
411 ::buffa::EnumValue::Known(
412 PSamplingReason::SAMPLING_REASON_FORENSIC
413 | PSamplingReason::SAMPLING_REASON_AUDIT
414 | PSamplingReason::SAMPLING_REASON_OVERRIDE,
415 )
416 );
417 if bypass_sampler {
418 return true;
419 }
420 let cfg = self.config.load();
421 let inbound = inbound_traceparent_sampled();
422 match sample_decide(&cfg.sampling, env.full_name.as_str(), sev, inbound) {
423 SamplingDecision::Drop => {
424 return false;
425 }
426 SamplingDecision::Keep => {}
427 SamplingDecision::ParentSet { sampled: true } => {
428 env.sampling_reason =
429 ::buffa::EnumValue::Known(PSamplingReason::SAMPLING_REASON_OVERRIDE);
430 }
431 SamplingDecision::ParentSet { sampled: false } => {
432 return false;
433 }
434 }
435 if matches!(
437 sev,
438 obs_proto::obs::v1::Severity::Trace | obs_proto::obs::v1::Severity::Debug
439 ) {
440 push_tail_buffer(env);
441 } else if sev >= obs_proto::obs::v1::Severity::Error {
442 crate::scope::mark_error_on_active_scopes();
443 }
444 true
445 }
446}
447
448impl Observer for StandardObserver {
449 fn emit_envelope(&self, mut env: ObsEnvelope) {
450 self.fill_identity(&mut env);
451 let sev = match env.sev {
452 ::buffa::EnumValue::Known(s) => s,
453 ::buffa::EnumValue::Unknown(_) => obs_proto::obs::v1::Severity::Unspecified,
454 };
455 if !self.run_emit_pipeline(&mut env, sev) {
456 return;
457 }
458 let tier = match env.tier {
459 ::buffa::EnumValue::Known(t) => t,
460 ::buffa::EnumValue::Unknown(_) => Tier::Unspecified,
461 };
462 if let Ok(_h) = tokio::runtime::Handle::try_current() {
463 self.dispatch_async(env, tier);
464 } else {
465 self.dispatch_sync(env, tier);
466 }
467 }
468
469 fn enabled(&self, callsite: &ObsCallsite) -> bool {
470 let filter = self.filter.load();
477 filter.callsite_interest(callsite) != crate::callsite::Interest::Never
478 }
479
480 fn generation(&self) -> u32 {
481 self.generation.load(Ordering::Acquire)
482 }
483
484 fn reload_filter(&self) {
485 self.generation.fetch_add(1, Ordering::Release);
486 }
487
488 fn flush(&self) -> SinkFut<'_> {
489 Box::pin(async move {
490 for w in [
491 self.workers.log.as_ref(),
492 self.workers.metric.as_ref(),
493 self.workers.trace.as_ref(),
494 self.workers.audit.as_ref(),
495 ]
496 .iter()
497 .flatten()
498 {
499 w.flush().await;
500 }
501 })
502 }
503
504 fn shutdown(&self) -> SinkFut<'_> {
505 Box::pin(async move {
506 for w in [
507 self.workers.log.as_ref(),
508 self.workers.metric.as_ref(),
509 self.workers.trace.as_ref(),
510 self.workers.audit.as_ref(),
511 ]
512 .iter()
513 .flatten()
514 {
515 w.shutdown().await;
516 }
517 if let Some(spool) = self.spool.as_ref() {
518 spool.close();
519 }
520 })
521 }
522
523 fn shutdown_blocking(&self, timeout: std::time::Duration) {
524 match tokio::runtime::Handle::try_current() {
534 Err(_) => {
535 if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
536 .enable_all()
537 .build()
538 {
539 let _ = rt.block_on(tokio::time::timeout(timeout, self.shutdown()));
540 }
541 }
542 Ok(handle) => {
543 if matches!(
544 handle.runtime_flavor(),
545 tokio::runtime::RuntimeFlavor::MultiThread
546 ) {
547 tokio::task::block_in_place(|| {
548 let _ = handle.block_on(tokio::time::timeout(timeout, self.shutdown()));
549 });
550 } else {
551 eprintln!(
556 "obs: shutdown_blocking called from a current-thread tokio runtime; use \
557 `Observer::shutdown().await` instead"
558 );
559 }
560 }
561 }
562 }
563
564 fn callsites(&self) -> Option<Arc<ObsCallsiteRegistry>> {
565 Some(Arc::clone(&self.callsites))
566 }
567
568 fn schema_registry(&self) -> Option<Arc<SchemaRegistry>> {
569 Some(Arc::clone(&self.registry))
570 }
571
572 fn resource_attrs(&self) -> Arc<ResourceAttrs> {
573 self.resource.load_full()
574 }
575}
576
577pub struct StandardObserverBuilder {
579 router: SinkRouter,
580 registry: Option<Arc<SchemaRegistry>>,
581 config: Option<EventsConfig>,
582 filter_spec: Option<String>,
583 service: Option<String>,
584 instance: Option<String>,
585 version: Option<String>,
586 spawn_workers: bool,
587}
588
589impl Default for StandardObserverBuilder {
590 fn default() -> Self {
591 Self {
592 router: SinkRouter::default(),
593 registry: None,
594 config: None,
595 filter_spec: None,
596 service: None,
597 instance: None,
598 version: None,
599 spawn_workers: true,
600 }
601 }
602}
603
604impl std::fmt::Debug for StandardObserverBuilder {
605 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
606 f.debug_struct("StandardObserverBuilder")
607 .field("service", &self.service)
608 .field("version", &self.version)
609 .field("spawn_workers", &self.spawn_workers)
610 .finish_non_exhaustive()
611 }
612}
613
614impl StandardObserverBuilder {
615 #[must_use]
617 pub fn service(mut self, name: impl Into<String>, version: impl Into<String>) -> Self {
618 self.service = Some(name.into());
619 self.version = Some(version.into());
620 self
621 }
622
623 #[must_use]
625 pub fn instance(mut self, instance: impl Into<String>) -> Self {
626 self.instance = Some(instance.into());
627 self
628 }
629
630 #[must_use]
633 pub fn sink_for(mut self, tier: Tier, sink: Arc<dyn Sink>) -> Self {
634 match tier {
635 Tier::Log => self.router.log = Some(sink),
636 Tier::Metric => self.router.metric = Some(sink),
637 Tier::Trace => self.router.trace = Some(sink),
638 Tier::Audit => self.router.audit = Some(sink),
639 _ => {}
640 }
641 self
642 }
643
644 #[must_use]
646 pub fn sink_fallback(mut self, sink: Arc<dyn Sink>) -> Self {
647 self.router.fallback = Some(sink);
648 self
649 }
650
651 #[must_use]
653 pub fn config(mut self, cfg: EventsConfig) -> Self {
654 self.config = Some(cfg);
655 self
656 }
657
658 #[must_use]
660 pub fn filter(mut self, spec: impl Into<String>) -> Self {
661 self.filter_spec = Some(spec.into());
662 self
663 }
664
665 #[must_use]
667 pub fn registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
668 self.registry = Some(registry);
669 self
670 }
671
672 #[must_use]
676 pub fn spawn_workers(mut self, yes: bool) -> Self {
677 self.spawn_workers = yes;
678 self
679 }
680
681 pub fn build(self) -> Result<StandardObserver, BuildError> {
688 let mut cfg = self.config.unwrap_or_default();
689 if !cfg.dev_mode
693 && let Ok(v) = std::env::var("OBS_DEV")
694 {
695 let on = matches!(
696 v.trim().to_ascii_lowercase().as_str(),
697 "1" | "true" | "yes" | "on"
698 );
699 cfg.dev_mode = on;
700 }
701 cfg.validate().map_err(BuildError::InvalidConfig)?;
702
703 let filter_spec = self
704 .filter_spec
705 .or_else(|| cfg.filter.clone())
706 .or_else(|| std::env::var("OBS_FILTER").ok());
707 let filter = match filter_spec.as_deref() {
708 Some(spec) => Filter::parse(spec).map_err(|e| {
709 BuildError::InvalidConfig(crate::config::ConfigError::invalid_range(
710 "filter",
711 format!("{e}"),
712 ))
713 })?,
714 None => Filter::new(),
715 };
716
717 let registry = self
718 .registry
719 .unwrap_or_else(|| Arc::new(SchemaRegistry::from_link_section()));
720
721 let service = self
723 .service
724 .or_else(|| std::env::var("OTEL_SERVICE_NAME").ok())
725 .unwrap_or_else(|| "obs".to_string());
726 let version = self
727 .version
728 .unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string());
729 let instance = self.instance.unwrap_or_default();
730
731 let counters = Arc::new(WorkerCounters::default());
732 let spool = if self.router.audit.is_some() {
733 Some(Arc::new(
734 SpoolWriter::open_with_fsync(
735 cfg.audit.spool_dir.clone(),
736 cfg.audit.spool_max_bytes,
737 cfg.audit.on_failure,
738 cfg.audit.fsync_mode,
739 )
740 .map_err(BuildError::SpoolOpen)?,
741 ))
742 } else {
743 None
744 };
745 let workers = if self.spawn_workers {
746 spawn_pool(&self.router, ®istry, &counters, &cfg.queues)
747 } else {
748 WorkerPool::default()
749 };
750
751 let resource = build_resource_from_env(&service, &version, &instance);
752 let observer = StandardObserver {
753 router: self.router,
754 workers,
755 spool,
756 registry,
757 callsites: Arc::new(ObsCallsiteRegistry::new()),
758 config: ArcSwap::from_pointee(cfg),
759 filter: ArcSwap::from_pointee(filter),
760 resource: ArcSwap::from_pointee(resource),
761 counters,
762 generation: AtomicU32::new(1),
763 service,
764 instance,
765 version,
766 sync_dispatch_lock: Mutex::new(()),
767 };
768 observer.recover_audit_spool();
774 let schema_count = observer.registry.len() as u64;
778 crate::self_events::emit_registry_initialized(schema_count, 0);
779 Ok(observer)
780 }
781}
782
783fn build_resource_from_env(service: &str, version: &str, instance: &str) -> ResourceAttrs {
787 let mut r = ResourceAttrs {
788 service_name: service.to_string(),
789 service_version: version.to_string(),
790 service_instance_id: instance.to_string(),
791 ..Default::default()
792 };
793 if let Ok(name) = std::env::var("OTEL_SERVICE_NAME")
794 && !name.is_empty()
795 {
796 r.service_name = name;
797 }
798 if let Ok(extras) = std::env::var("OTEL_RESOURCE_ATTRIBUTES") {
799 for pair in extras.split(',') {
800 let pair = pair.trim();
801 if pair.is_empty() {
802 continue;
803 }
804 if let Some((k, v)) = pair.split_once('=') {
805 let key = k.trim();
806 let val = v.trim().to_string();
807 match key {
808 "service.name" => r.service_name = val,
809 "service.version" => r.service_version = val,
810 "service.namespace" => r.service_namespace = val,
811 "service.instance.id" => r.service_instance_id = val,
812 "deployment.environment" => r.deployment_environment = val,
813 "host.name" => r.host_name = val,
814 "host.arch" => r.host_arch = val,
815 _ => {
816 r.extra.insert(key.to_string(), val);
817 }
818 }
819 }
820 }
821 }
822 if r.host_arch.is_empty() {
823 r.host_arch = match std::env::consts::ARCH {
824 "x86_64" => "amd64".to_string(),
825 "aarch64" => "arm64".to_string(),
826 other => other.to_string(),
827 };
828 }
829 if r.host_name.is_empty()
830 && let Ok(host) = std::env::var("HOSTNAME")
831 {
832 r.host_name = host;
833 }
834 r
835}
836
837fn config_hash(cfg: &EventsConfig) -> u64 {
838 let bytes = match serde_yaml::to_string(cfg) {
839 Ok(s) => s.into_bytes(),
840 Err(_) => return 0,
841 };
842 let h = blake3::hash(&bytes);
843 let arr: [u8; 8] = match <[u8; 8]>::try_from(&h.as_bytes()[..8]) {
844 Ok(a) => a,
845 Err(_) => return 0,
846 };
847 u64::from_le_bytes(arr)
848}
849
850fn spawn_pool(
851 router: &SinkRouter,
852 registry: &Arc<SchemaRegistry>,
853 counters: &Arc<WorkerCounters>,
854 queues: &crate::config::QueuesConfig,
855) -> WorkerPool {
856 let mut pool = WorkerPool::default();
857 if let Some(sink) = router.log.as_ref().or(router.fallback.as_ref()) {
858 pool.log = spawn_tier_worker(
859 Tier::Log,
860 queues,
861 Arc::clone(sink),
862 Arc::clone(registry),
863 Arc::clone(counters),
864 );
865 }
866 if let Some(sink) = router.metric.as_ref().or(router.fallback.as_ref()) {
867 pool.metric = spawn_tier_worker(
868 Tier::Metric,
869 queues,
870 Arc::clone(sink),
871 Arc::clone(registry),
872 Arc::clone(counters),
873 );
874 }
875 if let Some(sink) = router.trace.as_ref().or(router.fallback.as_ref()) {
876 pool.trace = spawn_tier_worker(
877 Tier::Trace,
878 queues,
879 Arc::clone(sink),
880 Arc::clone(registry),
881 Arc::clone(counters),
882 );
883 }
884 if let Some(sink) = router.audit.as_ref() {
885 pool.audit = spawn_tier_worker(
886 Tier::Audit,
887 queues,
888 Arc::clone(sink),
889 Arc::clone(registry),
890 Arc::clone(counters),
891 );
892 }
893 pool
894}
895
896#[derive(Debug, thiserror::Error)]
898#[non_exhaustive]
899pub enum BuildError {
900 #[error("invalid config: {0}")]
902 InvalidConfig(#[from] crate::config::ConfigError),
903 #[error("audit spool open failed: {0}")]
905 SpoolOpen(#[source] std::io::Error),
906}
907
908#[allow(dead_code)]
909fn _ensure_noop_compiles() {
910 let _: Arc<dyn Sink> = Arc::new(NoopSink);
911}
912
913#[cfg(test)]
914mod tests {
915 use super::*;
916 use crate::resource::ResourceAttrs;
917
918 #[test]
919 fn test_oversized_label_value_drops_envelope() {
920 use obs_proto::obs::v1::{
924 ObsEnvelope, SamplingReason as PSamplingReason, Severity as PSev, Tier as PTier,
925 };
926 let observer = StandardObserver::builder()
927 .service("test", "1.0.0")
928 .sink_fallback(Arc::new(NoopSink))
929 .spawn_workers(false)
930 .build()
931 .expect("build");
932 let mut env = ObsEnvelope {
934 full_name: "test.v1.ObsBig".to_string(),
935 tier: ::buffa::EnumValue::Known(PTier::TIER_LOG),
936 sev: ::buffa::EnumValue::Known(PSev::SEVERITY_INFO),
937 sampling_reason: ::buffa::EnumValue::Known(PSamplingReason::SAMPLING_REASON_HEAD_RATE),
938 ..Default::default()
939 };
940 env.labels.insert("ua".to_string(), "x".repeat(2048));
941 let kept = observer.run_emit_pipeline(&mut env, obs_proto::obs::v1::Severity::Info);
942 assert!(!kept, "envelope with oversize label value must be dropped");
943 }
944
945 #[test]
946 fn test_set_resource_attrs_is_visible_to_observer_callers() {
947 let observer = StandardObserver::builder()
950 .service("test", "1.0.0")
951 .sink_fallback(Arc::new(NoopSink))
952 .spawn_workers(false)
953 .build()
954 .expect("build");
955 let before = observer.resource_attrs();
956 assert_eq!(before.service_name, "test");
957 observer.set_resource_attrs(ResourceAttrs {
958 service_name: "rotated".to_string(),
959 deployment_environment: "prod".to_string(),
960 ..Default::default()
961 });
962 let after = observer.resource_attrs();
963 assert_eq!(after.service_name, "rotated");
964 assert_eq!(after.deployment_environment, "prod");
965 }
966}