1use std::sync::{
6 Arc,
7 atomic::{AtomicU32, Ordering},
8};
9
10use arc_swap::ArcSwap;
11use bytes;
12use obs_proto::obs::v1::{ObsEnvelope, SamplingReason as PSamplingReason};
13use obs_types::Tier;
14use parking_lot::Mutex;
15
16use super::{
17 Observer,
18 workers::{TierWorker, WorkerCounters, note_channel_full, spawn_tier_worker},
19};
20use crate::{
21 audit_spool::SpoolWriter,
22 callsite::ObsCallsite,
23 config::{AuditFailureMode, EventsConfig},
24 filter::Filter,
25 registry::{ObsCallsiteRegistry, SchemaRegistry, ScrubbedEnvelope},
26 resource::ResourceAttrs,
27 sampling::{SamplingDecision, decide as sample_decide},
28 scope::{auto_fill_envelope, inbound_traceparent_sampled, push_tail_buffer},
29 sink::{NoopSink, Sink, SinkFut, StdoutSink},
30};
31
32#[derive(Default)]
34struct SinkRouter {
35 log: Option<Arc<dyn Sink>>,
36 metric: Option<Arc<dyn Sink>>,
37 trace: Option<Arc<dyn Sink>>,
38 audit: Option<Arc<dyn Sink>>,
39 fallback: Option<Arc<dyn Sink>>,
40}
41
42impl std::fmt::Debug for SinkRouter {
43 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
44 f.debug_struct("SinkRouter")
45 .field("log", &self.log.as_ref().map(|_| "..."))
46 .field("metric", &self.metric.as_ref().map(|_| "..."))
47 .field("trace", &self.trace.as_ref().map(|_| "..."))
48 .field("audit", &self.audit.as_ref().map(|_| "..."))
49 .field("fallback", &self.fallback.as_ref().map(|_| "..."))
50 .finish()
51 }
52}
53
54impl SinkRouter {
55 fn for_tier(&self, tier: Tier) -> Option<&Arc<dyn Sink>> {
56 let primary = match tier {
57 Tier::Log => self.log.as_ref(),
58 Tier::Metric => self.metric.as_ref(),
59 Tier::Trace => self.trace.as_ref(),
60 Tier::Audit => self.audit.as_ref(),
61 _ => None,
62 };
63 primary.or(self.fallback.as_ref())
64 }
65}
66
67#[derive(Debug, Default)]
69struct WorkerPool {
70 log: Option<TierWorker>,
71 metric: Option<TierWorker>,
72 trace: Option<TierWorker>,
73 audit: Option<TierWorker>,
74}
75
76pub struct StandardObserver {
79 router: SinkRouter,
80 workers: WorkerPool,
81 spool: Option<Arc<SpoolWriter>>,
82 registry: Arc<SchemaRegistry>,
83 callsites: Arc<ObsCallsiteRegistry>,
84 config: ArcSwap<EventsConfig>,
85 filter: ArcSwap<Filter>,
86 resource: ArcSwap<ResourceAttrs>,
89 counters: Arc<WorkerCounters>,
90 generation: AtomicU32,
91 service: String,
92 instance: String,
93 version: String,
94 sync_dispatch_lock: Mutex<()>,
97}
98
99impl std::fmt::Debug for StandardObserver {
100 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
101 f.debug_struct("StandardObserver")
102 .field("schemas", &self.registry.len())
103 .field("service", &self.service)
104 .field("instance", &self.instance)
105 .field("version", &self.version)
106 .field("generation", &self.generation.load(Ordering::Relaxed))
107 .finish_non_exhaustive()
108 }
109}
110
111impl StandardObserver {
112 #[must_use]
114 pub fn builder() -> StandardObserverBuilder {
115 StandardObserverBuilder::default()
116 }
117
118 pub fn dev() -> Result<Self, BuildError> {
125 Self::builder()
126 .service("dev", env!("CARGO_PKG_VERSION"))
127 .sink_fallback(Arc::new(StdoutSink::default()))
128 .build()
129 }
130
131 #[must_use]
133 pub fn registry(&self) -> Arc<SchemaRegistry> {
134 Arc::clone(&self.registry)
135 }
136
137 #[must_use]
141 pub fn callsites(&self) -> Arc<ObsCallsiteRegistry> {
142 Arc::clone(&self.callsites)
143 }
144
145 #[must_use]
147 pub fn config(&self) -> arc_swap::Guard<Arc<EventsConfig>> {
148 self.config.load()
149 }
150
151 pub fn set_resource_attrs(&self, attrs: ResourceAttrs) {
155 self.resource.store(Arc::new(attrs));
156 }
157
158 pub fn reload_config(&self, new_config: EventsConfig) -> Result<(), BuildError> {
166 if let Err(e) = new_config.validate() {
167 crate::self_events::emit_config_reload_failed(&format!("validate: {e}"));
168 return Err(BuildError::InvalidConfig(e));
169 }
170 if let Some(spec) = new_config.filter.as_deref() {
171 match Filter::parse(spec) {
172 Ok(parsed) => self.filter.store(Arc::new(parsed)),
173 Err(e) => {
174 crate::self_events::emit_config_reload_failed(&format!("filter: {e}"));
175 return Err(BuildError::InvalidConfig(
176 crate::config::ConfigError::invalid_range("filter", format!("{e}")),
177 ));
178 }
179 }
180 } else {
181 self.filter.store(Arc::new(Filter::new()));
182 }
183 let cfg_hash = config_hash(&new_config);
187 self.config.store(Arc::new(new_config));
188 self.generation.fetch_add(1, Ordering::Release);
189 crate::self_events::emit_config_reloaded(cfg_hash);
190 Ok(())
191 }
192
193 #[must_use]
195 pub fn filter(&self) -> Arc<Filter> {
196 self.filter.load_full()
197 }
198
199 #[must_use]
201 pub fn counters(&self) -> Arc<WorkerCounters> {
202 Arc::clone(&self.counters)
203 }
204
205 fn fill_identity(&self, env: &mut ObsEnvelope) {
206 if env.service.is_empty() {
207 env.service.clone_from(&self.service);
208 }
209 if env.instance.is_empty() {
210 env.instance.clone_from(&self.instance);
211 }
212 if env.version.is_empty() {
213 env.version.clone_from(&self.version);
214 }
215 }
216
217 fn dispatch_sync(&self, env: ObsEnvelope, tier: Tier) {
218 let _g = self.sync_dispatch_lock.lock();
223 let Some(sink) = self.router.for_tier(tier) else {
224 return;
225 };
226 let mut scratch = bytes::BytesMut::with_capacity(env.payload.len());
227 let scrubbed = match ScrubbedEnvelope::scrub(&env, &self.registry, &mut scratch) {
228 Ok(s) => s,
229 Err(_) => return,
230 };
231 sink.deliver(scrubbed);
232 }
233
234 fn dispatch_async(&self, env: ObsEnvelope, tier: Tier) {
235 let worker = match tier {
236 Tier::Log => self.workers.log.as_ref(),
237 Tier::Metric => self.workers.metric.as_ref(),
238 Tier::Trace => self.workers.trace.as_ref(),
239 Tier::Audit => self.workers.audit.as_ref(),
240 _ => None,
241 };
242 let Some(worker) = worker else {
243 self.dispatch_sync(env, tier);
245 return;
246 };
247 if tier == Tier::Audit {
248 self.dispatch_audit(worker, env);
249 } else {
250 match worker.try_send(env) {
251 Ok(()) => {}
252 Err(_dropped) => {
253 note_channel_full(&self.counters, tier);
254 }
255 }
256 }
257 }
258
259 fn dispatch_audit(&self, worker: &TierWorker, env: ObsEnvelope) {
260 let cfg = self.config.load();
261 let block_ms = u64::from(cfg.audit.block_ms_max);
262 let mut env_unsent = match worker.try_send(env) {
264 Ok(()) => return,
265 Err(env) => env,
266 };
267 let started = std::time::Instant::now();
275 let interval = std::time::Duration::from_millis(2);
276 while started.elapsed().as_millis() < u128::from(block_ms) {
277 match worker.try_send(env_unsent) {
278 Ok(()) => return,
279 Err(env) => env_unsent = env,
280 }
281 std::thread::sleep(interval);
282 }
283 if let Some(spool) = self.spool.as_ref() {
285 match spool.append(&env_unsent) {
286 Ok(()) => {
287 note_channel_full(&self.counters, Tier::Audit);
288 crate::self_events::emit_audit_spooled(env_unsent.full_name.as_str());
289 }
290 Err(e) => {
291 crate::self_events::emit_audit_spool_failed(&e.to_string());
292 self.handle_spool_failure();
293 }
294 }
295 } else {
296 crate::self_events::emit_audit_spool_failed("no spool configured");
297 self.handle_spool_failure();
298 }
299 }
300
301 fn handle_spool_failure(&self) {
302 #[allow(clippy::panic)]
308 {
309 let cfg = self.config.load();
310 match cfg.audit.on_failure {
311 AuditFailureMode::Panic => {
312 panic!("audit spool unwritable; compliance failure")
313 }
314 AuditFailureMode::Abort => std::process::abort(),
315 AuditFailureMode::WarnOnly => {
316 eprintln!("[obs] AUDIT spool unwritable; envelope dropped (warn_only)");
317 }
318 }
319 }
320 }
321
322 fn recover_audit_spool(&self) {
328 let cfg = self.config.load();
329 let dir = cfg.audit.spool_dir.clone();
330 if !dir.exists() {
331 return;
332 }
333 let mut total: u64 = 0;
334 let report = crate::audit_spool::recover(&dir, |env| {
335 total += 1;
336 if let Some(worker) = self.workers.audit.as_ref() {
338 let _ = worker.try_send(env);
339 } else {
340 self.dispatch_sync(env, Tier::Audit);
341 }
342 Ok(())
343 });
344 if total == 0 {
345 let _ = report;
346 return;
347 }
348 let mut env = ObsEnvelope {
349 full_name: "obs.runtime.v1.ObsAuditSpoolRecovered".to_string(),
350 tier: ::buffa::EnumValue::Known(obs_proto::obs::v1::Tier::TIER_LOG),
351 sev: ::buffa::EnumValue::Known(obs_proto::obs::v1::Severity::SEVERITY_INFO),
352 ..Default::default()
353 };
354 env.labels
355 .insert("record_count".to_string(), total.to_string());
356 self.fill_identity(&mut env);
359 self.dispatch_sync(env, Tier::Log);
360 }
361
362 fn run_emit_pipeline(&self, env: &mut ObsEnvelope, sev: obs_types::Severity) -> bool {
367 auto_fill_envelope(env);
369 let cfg_pre = self.config.load();
374 let max_bytes = u64::from(cfg_pre.limits.max_payload_bytes);
375 let payload_size = env.payload.len() as u64;
376 if max_bytes > 0 && payload_size > max_bytes {
377 crate::self_events::emit_oversized_dropped(env.full_name.as_str(), payload_size);
378 return false;
379 }
380 let max_label_bytes = u64::from(cfg_pre.limits.max_label_value_bytes);
388 if max_label_bytes > 0 {
389 for (k, v) in &env.labels {
390 if v.len() as u64 > max_label_bytes {
391 crate::self_events::emit_oversized_label_dropped(
392 env.full_name.as_str(),
393 k,
394 v.len() as u64,
395 );
396 return false;
397 }
398 }
399 }
400 let filter = self.filter.load();
405 if !filter.event_allowed(env, sev) {
406 return false;
407 }
408 let bypass_sampler = matches!(
411 env.sampling_reason,
412 ::buffa::EnumValue::Known(
413 PSamplingReason::SAMPLING_REASON_FORENSIC
414 | PSamplingReason::SAMPLING_REASON_AUDIT
415 | PSamplingReason::SAMPLING_REASON_OVERRIDE,
416 )
417 );
418 if bypass_sampler {
419 return true;
420 }
421 let cfg = self.config.load();
422 let inbound = inbound_traceparent_sampled();
423 match sample_decide(&cfg.sampling, env.full_name.as_str(), sev, inbound) {
424 SamplingDecision::Drop => {
425 return false;
426 }
427 SamplingDecision::Keep => {}
428 SamplingDecision::ParentSet { sampled: true } => {
429 env.sampling_reason =
430 ::buffa::EnumValue::Known(PSamplingReason::SAMPLING_REASON_OVERRIDE);
431 }
432 SamplingDecision::ParentSet { sampled: false } => {
433 return false;
434 }
435 }
436 if matches!(sev, obs_types::Severity::Trace | obs_types::Severity::Debug) {
438 push_tail_buffer(env);
439 } else if sev >= obs_types::Severity::Error {
440 crate::scope::mark_error_on_active_scopes();
441 }
442 true
443 }
444}
445
446impl Observer for StandardObserver {
447 fn emit_envelope(&self, mut env: ObsEnvelope) {
448 self.fill_identity(&mut env);
449 let sev = match env.sev {
450 ::buffa::EnumValue::Known(s) => proto_sev_to_native(s),
451 ::buffa::EnumValue::Unknown(_) => obs_types::Severity::Unspecified,
452 };
453 if !self.run_emit_pipeline(&mut env, sev) {
454 return;
455 }
456 let tier = match env.tier {
457 ::buffa::EnumValue::Known(t) => proto_tier_to_native(t),
458 ::buffa::EnumValue::Unknown(_) => Tier::Unspecified,
459 };
460 if let Ok(_h) = tokio::runtime::Handle::try_current() {
461 self.dispatch_async(env, tier);
462 } else {
463 self.dispatch_sync(env, tier);
464 }
465 }
466
467 fn enabled(&self, callsite: &ObsCallsite) -> bool {
468 let filter = self.filter.load();
475 filter.callsite_interest(callsite) != crate::callsite::Interest::Never
476 }
477
478 fn generation(&self) -> u32 {
479 self.generation.load(Ordering::Acquire)
480 }
481
482 fn reload_filter(&self) {
483 self.generation.fetch_add(1, Ordering::Release);
484 }
485
486 fn flush(&self) -> SinkFut<'_> {
487 Box::pin(async move {
488 for w in [
489 self.workers.log.as_ref(),
490 self.workers.metric.as_ref(),
491 self.workers.trace.as_ref(),
492 self.workers.audit.as_ref(),
493 ]
494 .iter()
495 .flatten()
496 {
497 w.flush().await;
498 }
499 })
500 }
501
502 fn shutdown(&self) -> SinkFut<'_> {
503 Box::pin(async move {
504 for w in [
505 self.workers.log.as_ref(),
506 self.workers.metric.as_ref(),
507 self.workers.trace.as_ref(),
508 self.workers.audit.as_ref(),
509 ]
510 .iter()
511 .flatten()
512 {
513 w.shutdown().await;
514 }
515 if let Some(spool) = self.spool.as_ref() {
516 spool.close();
517 }
518 })
519 }
520
521 fn shutdown_blocking(&self, timeout: std::time::Duration) {
522 match tokio::runtime::Handle::try_current() {
532 Err(_) => {
533 if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
534 .enable_all()
535 .build()
536 {
537 let _ = rt.block_on(tokio::time::timeout(timeout, self.shutdown()));
538 }
539 }
540 Ok(handle) => {
541 if matches!(
542 handle.runtime_flavor(),
543 tokio::runtime::RuntimeFlavor::MultiThread
544 ) {
545 tokio::task::block_in_place(|| {
546 let _ = handle.block_on(tokio::time::timeout(timeout, self.shutdown()));
547 });
548 } else {
549 eprintln!(
554 "obs: shutdown_blocking called from a current-thread tokio runtime; use \
555 `Observer::shutdown().await` instead"
556 );
557 }
558 }
559 }
560 }
561
562 fn callsites(&self) -> Option<Arc<ObsCallsiteRegistry>> {
563 Some(Arc::clone(&self.callsites))
564 }
565
566 fn schema_registry(&self) -> Option<Arc<SchemaRegistry>> {
567 Some(Arc::clone(&self.registry))
568 }
569
570 fn resource_attrs(&self) -> Arc<ResourceAttrs> {
571 self.resource.load_full()
572 }
573}
574
575#[allow(non_snake_case, non_upper_case_globals)]
576fn proto_tier_to_native(t: obs_proto::obs::v1::Tier) -> Tier {
577 use obs_proto::obs::v1::Tier as P;
578 match t {
579 P::TIER_UNSPECIFIED => Tier::Unspecified,
580 P::TIER_LOG => Tier::Log,
581 P::TIER_METRIC => Tier::Metric,
582 P::TIER_TRACE => Tier::Trace,
583 P::TIER_AUDIT => Tier::Audit,
584 }
585}
586
587#[allow(non_snake_case, non_upper_case_globals)]
588fn proto_sev_to_native(s: obs_proto::obs::v1::Severity) -> obs_types::Severity {
589 use obs_proto::obs::v1::Severity as P;
590 match s {
591 P::SEVERITY_UNSPECIFIED => obs_types::Severity::Unspecified,
592 P::SEVERITY_TRACE => obs_types::Severity::Trace,
593 P::SEVERITY_DEBUG => obs_types::Severity::Debug,
594 P::SEVERITY_INFO => obs_types::Severity::Info,
595 P::SEVERITY_WARN => obs_types::Severity::Warn,
596 P::SEVERITY_ERROR => obs_types::Severity::Error,
597 P::SEVERITY_FATAL => obs_types::Severity::Fatal,
598 }
599}
600
601pub struct StandardObserverBuilder {
603 router: SinkRouter,
604 registry: Option<Arc<SchemaRegistry>>,
605 config: Option<EventsConfig>,
606 filter_spec: Option<String>,
607 service: Option<String>,
608 instance: Option<String>,
609 version: Option<String>,
610 spawn_workers: bool,
611}
612
613impl Default for StandardObserverBuilder {
614 fn default() -> Self {
615 Self {
616 router: SinkRouter::default(),
617 registry: None,
618 config: None,
619 filter_spec: None,
620 service: None,
621 instance: None,
622 version: None,
623 spawn_workers: true,
624 }
625 }
626}
627
628impl std::fmt::Debug for StandardObserverBuilder {
629 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
630 f.debug_struct("StandardObserverBuilder")
631 .field("service", &self.service)
632 .field("version", &self.version)
633 .field("spawn_workers", &self.spawn_workers)
634 .finish_non_exhaustive()
635 }
636}
637
638impl StandardObserverBuilder {
639 #[must_use]
641 pub fn service(mut self, name: impl Into<String>, version: impl Into<String>) -> Self {
642 self.service = Some(name.into());
643 self.version = Some(version.into());
644 self
645 }
646
647 #[must_use]
649 pub fn instance(mut self, instance: impl Into<String>) -> Self {
650 self.instance = Some(instance.into());
651 self
652 }
653
654 #[must_use]
657 pub fn sink_for(mut self, tier: Tier, sink: Arc<dyn Sink>) -> Self {
658 match tier {
659 Tier::Log => self.router.log = Some(sink),
660 Tier::Metric => self.router.metric = Some(sink),
661 Tier::Trace => self.router.trace = Some(sink),
662 Tier::Audit => self.router.audit = Some(sink),
663 _ => {}
664 }
665 self
666 }
667
668 #[must_use]
670 pub fn sink_fallback(mut self, sink: Arc<dyn Sink>) -> Self {
671 self.router.fallback = Some(sink);
672 self
673 }
674
675 #[must_use]
677 pub fn config(mut self, cfg: EventsConfig) -> Self {
678 self.config = Some(cfg);
679 self
680 }
681
682 #[must_use]
684 pub fn filter(mut self, spec: impl Into<String>) -> Self {
685 self.filter_spec = Some(spec.into());
686 self
687 }
688
689 #[must_use]
691 pub fn registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
692 self.registry = Some(registry);
693 self
694 }
695
696 #[must_use]
700 pub fn spawn_workers(mut self, yes: bool) -> Self {
701 self.spawn_workers = yes;
702 self
703 }
704
705 pub fn build(self) -> Result<StandardObserver, BuildError> {
712 let mut cfg = self.config.unwrap_or_default();
713 if !cfg.dev_mode
717 && let Ok(v) = std::env::var("OBS_DEV")
718 {
719 let on = matches!(
720 v.trim().to_ascii_lowercase().as_str(),
721 "1" | "true" | "yes" | "on"
722 );
723 cfg.dev_mode = on;
724 }
725 cfg.validate().map_err(BuildError::InvalidConfig)?;
726
727 let filter_spec = self
728 .filter_spec
729 .or_else(|| cfg.filter.clone())
730 .or_else(|| std::env::var("OBS_FILTER").ok());
731 let filter = match filter_spec.as_deref() {
732 Some(spec) => Filter::parse(spec).map_err(|e| {
733 BuildError::InvalidConfig(crate::config::ConfigError::invalid_range(
734 "filter",
735 format!("{e}"),
736 ))
737 })?,
738 None => Filter::new(),
739 };
740
741 let registry = self
742 .registry
743 .unwrap_or_else(|| Arc::new(SchemaRegistry::from_link_section()));
744
745 let service = self
747 .service
748 .or_else(|| std::env::var("OTEL_SERVICE_NAME").ok())
749 .unwrap_or_else(|| "obs".to_string());
750 let version = self
751 .version
752 .unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string());
753 let instance = self.instance.unwrap_or_default();
754
755 let counters = Arc::new(WorkerCounters::default());
756 let spool = if self.router.audit.is_some() {
757 Some(Arc::new(
758 SpoolWriter::open_with_fsync(
759 cfg.audit.spool_dir.clone(),
760 cfg.audit.spool_max_bytes,
761 cfg.audit.on_failure,
762 cfg.audit.fsync_mode,
763 )
764 .map_err(BuildError::SpoolOpen)?,
765 ))
766 } else {
767 None
768 };
769 let workers = if self.spawn_workers {
770 spawn_pool(&self.router, ®istry, &counters, &cfg.queues)
771 } else {
772 WorkerPool::default()
773 };
774
775 let resource = build_resource_from_env(&service, &version, &instance);
776 let observer = StandardObserver {
777 router: self.router,
778 workers,
779 spool,
780 registry,
781 callsites: Arc::new(ObsCallsiteRegistry::new()),
782 config: ArcSwap::from_pointee(cfg),
783 filter: ArcSwap::from_pointee(filter),
784 resource: ArcSwap::from_pointee(resource),
785 counters,
786 generation: AtomicU32::new(1),
787 service,
788 instance,
789 version,
790 sync_dispatch_lock: Mutex::new(()),
791 };
792 observer.recover_audit_spool();
798 let schema_count = observer.registry.len() as u64;
802 crate::self_events::emit_registry_initialized(schema_count, 0);
803 Ok(observer)
804 }
805}
806
807fn build_resource_from_env(service: &str, version: &str, instance: &str) -> ResourceAttrs {
811 let mut r = ResourceAttrs {
812 service_name: service.to_string(),
813 service_version: version.to_string(),
814 service_instance_id: instance.to_string(),
815 ..Default::default()
816 };
817 if let Ok(name) = std::env::var("OTEL_SERVICE_NAME")
818 && !name.is_empty()
819 {
820 r.service_name = name;
821 }
822 if let Ok(extras) = std::env::var("OTEL_RESOURCE_ATTRIBUTES") {
823 for pair in extras.split(',') {
824 let pair = pair.trim();
825 if pair.is_empty() {
826 continue;
827 }
828 if let Some((k, v)) = pair.split_once('=') {
829 let key = k.trim();
830 let val = v.trim().to_string();
831 match key {
832 "service.name" => r.service_name = val,
833 "service.version" => r.service_version = val,
834 "service.namespace" => r.service_namespace = val,
835 "service.instance.id" => r.service_instance_id = val,
836 "deployment.environment" => r.deployment_environment = val,
837 "host.name" => r.host_name = val,
838 "host.arch" => r.host_arch = val,
839 _ => {
840 r.extra.insert(key.to_string(), val);
841 }
842 }
843 }
844 }
845 }
846 if r.host_arch.is_empty() {
847 r.host_arch = match std::env::consts::ARCH {
848 "x86_64" => "amd64".to_string(),
849 "aarch64" => "arm64".to_string(),
850 other => other.to_string(),
851 };
852 }
853 if r.host_name.is_empty()
854 && let Ok(host) = std::env::var("HOSTNAME")
855 {
856 r.host_name = host;
857 }
858 r
859}
860
861fn config_hash(cfg: &EventsConfig) -> u64 {
862 let bytes = match serde_yaml::to_string(cfg) {
863 Ok(s) => s.into_bytes(),
864 Err(_) => return 0,
865 };
866 let h = blake3::hash(&bytes);
867 let arr: [u8; 8] = match <[u8; 8]>::try_from(&h.as_bytes()[..8]) {
868 Ok(a) => a,
869 Err(_) => return 0,
870 };
871 u64::from_le_bytes(arr)
872}
873
874fn spawn_pool(
875 router: &SinkRouter,
876 registry: &Arc<SchemaRegistry>,
877 counters: &Arc<WorkerCounters>,
878 queues: &crate::config::QueuesConfig,
879) -> WorkerPool {
880 let mut pool = WorkerPool::default();
881 if let Some(sink) = router.log.as_ref().or(router.fallback.as_ref()) {
882 pool.log = spawn_tier_worker(
883 Tier::Log,
884 queues,
885 Arc::clone(sink),
886 Arc::clone(registry),
887 Arc::clone(counters),
888 );
889 }
890 if let Some(sink) = router.metric.as_ref().or(router.fallback.as_ref()) {
891 pool.metric = spawn_tier_worker(
892 Tier::Metric,
893 queues,
894 Arc::clone(sink),
895 Arc::clone(registry),
896 Arc::clone(counters),
897 );
898 }
899 if let Some(sink) = router.trace.as_ref().or(router.fallback.as_ref()) {
900 pool.trace = spawn_tier_worker(
901 Tier::Trace,
902 queues,
903 Arc::clone(sink),
904 Arc::clone(registry),
905 Arc::clone(counters),
906 );
907 }
908 if let Some(sink) = router.audit.as_ref() {
909 pool.audit = spawn_tier_worker(
910 Tier::Audit,
911 queues,
912 Arc::clone(sink),
913 Arc::clone(registry),
914 Arc::clone(counters),
915 );
916 }
917 pool
918}
919
920#[derive(Debug, thiserror::Error)]
922#[non_exhaustive]
923pub enum BuildError {
924 #[error("invalid config: {0}")]
926 InvalidConfig(#[from] crate::config::ConfigError),
927 #[error("audit spool open failed: {0}")]
929 SpoolOpen(#[source] std::io::Error),
930}
931
932#[allow(dead_code)]
933fn _ensure_noop_compiles() {
934 let _: Arc<dyn Sink> = Arc::new(NoopSink);
935}
936
937#[cfg(test)]
938mod tests {
939 use super::*;
940 use crate::resource::ResourceAttrs;
941
942 #[test]
943 fn test_oversized_label_value_drops_envelope() {
944 use obs_proto::obs::v1::{
948 ObsEnvelope, SamplingReason as PSamplingReason, Severity as PSev, Tier as PTier,
949 };
950 let observer = StandardObserver::builder()
951 .service("test", "1.0.0")
952 .sink_fallback(Arc::new(NoopSink))
953 .spawn_workers(false)
954 .build()
955 .expect("build");
956 let mut env = ObsEnvelope {
958 full_name: "test.v1.ObsBig".to_string(),
959 tier: ::buffa::EnumValue::Known(PTier::TIER_LOG),
960 sev: ::buffa::EnumValue::Known(PSev::SEVERITY_INFO),
961 sampling_reason: ::buffa::EnumValue::Known(PSamplingReason::SAMPLING_REASON_HEAD_RATE),
962 ..Default::default()
963 };
964 env.labels.insert("ua".to_string(), "x".repeat(2048));
965 let kept = observer.run_emit_pipeline(&mut env, obs_types::Severity::Info);
966 assert!(!kept, "envelope with oversize label value must be dropped");
967 }
968
969 #[test]
970 fn test_set_resource_attrs_is_visible_to_observer_callers() {
971 let observer = StandardObserver::builder()
974 .service("test", "1.0.0")
975 .sink_fallback(Arc::new(NoopSink))
976 .spawn_workers(false)
977 .build()
978 .expect("build");
979 let before = observer.resource_attrs();
980 assert_eq!(before.service_name, "test");
981 observer.set_resource_attrs(ResourceAttrs {
982 service_name: "rotated".to_string(),
983 deployment_environment: "prod".to_string(),
984 ..Default::default()
985 });
986 let after = observer.resource_attrs();
987 assert_eq!(after.service_name, "rotated");
988 assert_eq!(after.deployment_environment, "prod");
989 }
990}