use std::sync::{
Arc,
atomic::{AtomicU32, Ordering},
};
use arc_swap::ArcSwap;
use bytes;
use obs_proto::obs::v1::{ObsEnvelope, SamplingReason as PSamplingReason, Tier};
use parking_lot::Mutex;
use super::{
Observer,
workers::{TierWorker, WorkerCounters, note_channel_full, spawn_tier_worker},
};
use crate::{
audit_spool::SpoolWriter,
callsite::ObsCallsite,
config::{AuditFailureMode, EventsConfig},
filter::Filter,
registry::{ObsCallsiteRegistry, SchemaRegistry, ScrubbedEnvelope},
resource::ResourceAttrs,
sampling::{SamplingDecision, decide as sample_decide},
scope::{auto_fill_envelope, inbound_traceparent_sampled, push_tail_buffer},
sink::{NoopSink, Sink, SinkFut, StdoutSink},
};
/// Routing table from envelope tier to the sink that receives it.
///
/// Each tier has an optional dedicated sink. `fallback` is the sink
/// `SinkRouter::for_tier` returns when the tier's own slot is empty.
#[derive(Default)]
struct SinkRouter {
/// Dedicated sink for `Tier::Log`, if any.
log: Option<Arc<dyn Sink>>,
/// Dedicated sink for `Tier::Metric`, if any.
metric: Option<Arc<dyn Sink>>,
/// Dedicated sink for `Tier::Trace`, if any.
trace: Option<Arc<dyn Sink>>,
/// Dedicated sink for `Tier::Audit`, if any.
audit: Option<Arc<dyn Sink>>,
/// Sink used when the requested tier has no dedicated sink.
fallback: Option<Arc<dyn Sink>>,
}
impl std::fmt::Debug for SinkRouter {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("SinkRouter")
.field("log", &self.log.as_ref().map(|_| "..."))
.field("metric", &self.metric.as_ref().map(|_| "..."))
.field("trace", &self.trace.as_ref().map(|_| "..."))
.field("audit", &self.audit.as_ref().map(|_| "..."))
.field("fallback", &self.fallback.as_ref().map(|_| "..."))
.finish()
}
}
impl SinkRouter {
    /// Picks the sink that should receive envelopes of `tier`.
    ///
    /// The tier's dedicated sink wins. When that slot is empty, or `tier` is not one
    /// of log/metric/trace/audit, the fallback sink is returned instead. `None` means
    /// neither a dedicated nor a fallback sink is configured.
    fn for_tier(&self, tier: Tier) -> Option<&Arc<dyn Sink>> {
        let dedicated = match tier {
            Tier::Log => &self.log,
            Tier::Metric => &self.metric,
            Tier::Trace => &self.trace,
            Tier::Audit => &self.audit,
            // Any other tier has no dedicated slot at all.
            _ => return self.fallback.as_ref(),
        };
        match dedicated {
            Some(sink) => Some(sink),
            None => self.fallback.as_ref(),
        }
    }
}
/// Optional background worker per tier.
///
/// A `None` slot means no worker exists for that tier; the default value has no
/// workers at all.
#[derive(Debug, Default)]
struct WorkerPool {
/// Worker for `Tier::Log` envelopes.
log: Option<TierWorker>,
/// Worker for `Tier::Metric` envelopes.
metric: Option<TierWorker>,
/// Worker for `Tier::Trace` envelopes.
trace: Option<TierWorker>,
/// Worker for `Tier::Audit` envelopes.
audit: Option<TierWorker>,
}
/// Observer implementation that filters, samples and routes envelopes to per-tier
/// sinks, either through background workers or synchronously on the calling thread.
pub struct StandardObserver {
/// Tier-to-sink routing used by synchronous dispatch.
router: SinkRouter,
/// Per-tier background workers; all empty when workers were not spawned.
workers: WorkerPool,
/// Audit spool writer; only opened at build time when an audit sink is configured.
spool: Option<Arc<SpoolWriter>>,
/// Schema registry passed to envelope scrubbing and to the tier workers.
registry: Arc<SchemaRegistry>,
/// Callsite registry exposed through `callsites()`.
callsites: Arc<ObsCallsiteRegistry>,
/// Active configuration; replaced atomically by `reload_config`.
config: ArcSwap<EventsConfig>,
/// Active filter; replaced atomically by `reload_config`.
filter: ArcSwap<Filter>,
/// Resource attributes; replaced atomically by `set_resource_attrs`.
resource: ArcSwap<ResourceAttrs>,
/// Counters shared with the tier workers (e.g. channel-full notes).
counters: Arc<WorkerCounters>,
/// Starts at 1 and is incremented by `reload_config` and `reload_filter`.
generation: AtomicU32,
/// Service name written into envelopes whose `service` field is empty.
service: String,
/// Instance id written into envelopes whose `instance` field is empty.
instance: String,
/// Version written into envelopes whose `version` field is empty.
version: String,
/// Held for the duration of every synchronous dispatch.
sync_dispatch_lock: Mutex<()>,
}
/// Compact debug view: schema count, identity strings and the current generation.
/// Every other field is omitted (the output is marked non-exhaustive).
impl std::fmt::Debug for StandardObserver {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let schema_count = self.registry.len();
        // A relaxed load is enough for a diagnostic snapshot.
        let generation = self.generation.load(Ordering::Relaxed);

        let mut out = f.debug_struct("StandardObserver");
        out.field("schemas", &schema_count);
        out.field("service", &self.service);
        out.field("instance", &self.instance);
        out.field("version", &self.version);
        out.field("generation", &generation);
        out.finish_non_exhaustive()
    }
}
impl StandardObserver {
    /// Returns a builder with default settings.
    #[must_use]
    pub fn builder() -> StandardObserverBuilder {
        StandardObserverBuilder::default()
    }

    /// Builds an observer for local development.
    ///
    /// Uses the service name `"dev"`, this crate's package version, and a default
    /// `StdoutSink` as the fallback sink.
    ///
    /// # Errors
    ///
    /// Propagates any [`BuildError`] produced by the builder.
    pub fn dev() -> Result<Self, BuildError> {
        Self::builder()
            .service("dev", env!("CARGO_PKG_VERSION"))
            .sink_fallback(Arc::new(StdoutSink::default()))
            .build()
    }

    /// Returns a shared handle to the schema registry.
    #[must_use]
    pub fn registry(&self) -> Arc<SchemaRegistry> {
        Arc::clone(&self.registry)
    }

    /// Returns a shared handle to the callsite registry.
    #[must_use]
    pub fn callsites(&self) -> Arc<ObsCallsiteRegistry> {
        Arc::clone(&self.callsites)
    }

    /// Returns a guard over the currently active configuration.
    #[must_use]
    pub fn config(&self) -> arc_swap::Guard<Arc<EventsConfig>> {
        self.config.load()
    }

    /// Replaces the resource attributes returned by later reads.
    pub fn set_resource_attrs(&self, attrs: ResourceAttrs) {
        self.resource.store(Arc::new(attrs));
    }

    /// Validates `new_config` and, if it is acceptable, makes it the active config.
    ///
    /// The filter is rebuilt from `new_config.filter` (an empty `Filter` when the
    /// field is absent), the generation counter is incremented, and a
    /// "config reloaded" self-event carrying the config hash is emitted.
    ///
    /// # Errors
    ///
    /// Returns [`BuildError::InvalidConfig`] when validation fails or the filter spec
    /// does not parse. In both cases a "reload failed" self-event is emitted and the
    /// previously active config and filter are left in place.
    pub fn reload_config(&self, new_config: EventsConfig) -> Result<(), BuildError> {
        if let Err(e) = new_config.validate() {
            crate::self_events::emit_config_reload_failed(&format!("validate: {e}"));
            return Err(BuildError::InvalidConfig(e));
        }

        // Parse before storing anything so a bad spec cannot leave a half-applied reload.
        let filter = match new_config.filter.as_deref() {
            Some(spec) => match Filter::parse(spec) {
                Ok(parsed) => parsed,
                Err(e) => {
                    crate::self_events::emit_config_reload_failed(&format!("filter: {e}"));
                    return Err(BuildError::InvalidConfig(
                        crate::config::ConfigError::invalid_range("filter", format!("{e}")),
                    ));
                }
            },
            None => Filter::new(),
        };
        self.filter.store(Arc::new(filter));

        let cfg_hash = config_hash(&new_config);
        self.config.store(Arc::new(new_config));
        self.generation.fetch_add(1, Ordering::Release);
        crate::self_events::emit_config_reloaded(cfg_hash);
        Ok(())
    }

    /// Returns the currently active filter.
    #[must_use]
    pub fn filter(&self) -> Arc<Filter> {
        self.filter.load_full()
    }

    /// Returns a shared handle to the worker counters.
    #[must_use]
    pub fn counters(&self) -> Arc<WorkerCounters> {
        Arc::clone(&self.counters)
    }

    /// Fills `service`, `instance` and `version` on `env` from this observer,
    /// but only for fields the producer left empty.
    fn fill_identity(&self, env: &mut ObsEnvelope) {
        if env.service.is_empty() {
            env.service.clone_from(&self.service);
        }
        if env.instance.is_empty() {
            env.instance.clone_from(&self.instance);
        }
        if env.version.is_empty() {
            env.version.clone_from(&self.version);
        }
    }

    /// Scrubs `env` and delivers it to the sink for `tier` on the calling thread.
    ///
    /// `sync_dispatch_lock` is held for the whole call, so concurrent synchronous
    /// dispatches are serialized. The envelope is dropped without any signal when
    /// no sink is routable for `tier` or when scrubbing fails.
    fn dispatch_sync(&self, env: ObsEnvelope, tier: Tier) {
        let _g = self.sync_dispatch_lock.lock();
        let Some(sink) = self.router.for_tier(tier) else {
            return;
        };
        let mut scratch = bytes::BytesMut::with_capacity(env.payload.len());
        let scrubbed = match ScrubbedEnvelope::scrub(&env, &self.registry, &mut scratch) {
            Ok(s) => s,
            // NOTE(review): scrub failures are swallowed here; no self-event is emitted.
            Err(_) => return,
        };
        sink.deliver(scrubbed);
    }

    /// Hands `env` to the background worker for `tier`.
    ///
    /// Falls back to [`Self::dispatch_sync`] when the tier has no worker. Audit
    /// envelopes take the blocking/spooling path in [`Self::dispatch_audit`]; for
    /// every other tier a full queue drops the envelope and bumps the channel-full
    /// counter.
    fn dispatch_async(&self, env: ObsEnvelope, tier: Tier) {
        let worker = match tier {
            Tier::Log => self.workers.log.as_ref(),
            Tier::Metric => self.workers.metric.as_ref(),
            Tier::Trace => self.workers.trace.as_ref(),
            Tier::Audit => self.workers.audit.as_ref(),
            _ => None,
        };
        let Some(worker) = worker else {
            self.dispatch_sync(env, tier);
            return;
        };
        if tier == Tier::Audit {
            self.dispatch_audit(worker, env);
        } else if let Err(_dropped) = worker.try_send(env) {
            note_channel_full(&self.counters, tier);
        }
    }

    /// Enqueues an audit envelope, blocking briefly and then spooling if the queue
    /// stays full.
    ///
    /// After a failed first attempt the calling thread retries for at most
    /// `audit.block_ms_max` milliseconds, sleeping up to 2 ms before each retry. If
    /// the queue is still full the envelope is appended to the audit spool; when no
    /// spool exists or the append fails, [`Self::handle_spool_failure`] applies the
    /// configured failure mode.
    fn dispatch_audit(&self, worker: &TierWorker, env: ObsEnvelope) {
        // Pause between retries while waiting for the audit worker to drain.
        const RETRY_INTERVAL: std::time::Duration = std::time::Duration::from_millis(2);

        let mut pending = match worker.try_send(env) {
            Ok(()) => return,
            Err(rejected) => rejected,
        };

        // Copy the budget out in a single statement so the config guard is released
        // before this thread starts blocking.
        let budget = std::time::Duration::from_millis(u64::from(
            self.config.load().audit.block_ms_max,
        ));
        let started = std::time::Instant::now();

        // Sleep first, then retry: the previous attempt has only just failed, and
        // every sleep is followed by a retry. The sleep is capped at the time left
        // so the total wait never exceeds the budget by a whole interval.
        while let Some(remaining) = budget
            .checked_sub(started.elapsed())
            .filter(|left| !left.is_zero())
        {
            std::thread::sleep(RETRY_INTERVAL.min(remaining));
            match worker.try_send(pending) {
                Ok(()) => return,
                Err(rejected) => pending = rejected,
            }
        }

        let Some(spool) = self.spool.as_ref() else {
            crate::self_events::emit_audit_spool_failed("no spool configured");
            self.handle_spool_failure();
            return;
        };
        match spool.append(&pending) {
            Ok(()) => {
                note_channel_full(&self.counters, Tier::Audit);
                crate::self_events::emit_audit_spooled(pending.full_name.as_str());
            }
            Err(e) => {
                crate::self_events::emit_audit_spool_failed(&e.to_string());
                self.handle_spool_failure();
            }
        }
    }

    /// Applies `audit.on_failure` after an audit envelope could not be spooled.
    ///
    /// # Panics
    ///
    /// Panics when the mode is `AuditFailureMode::Panic`; aborts the process when it
    /// is `AuditFailureMode::Abort`. `WarnOnly` only writes a line to stderr.
    fn handle_spool_failure(&self) {
        // The panic is a deliberate, configuration-selected failure mode.
        #[allow(clippy::panic)]
        {
            let cfg = self.config.load();
            match cfg.audit.on_failure {
                AuditFailureMode::Panic => {
                    panic!("audit spool unwritable; compliance failure")
                }
                AuditFailureMode::Abort => std::process::abort(),
                AuditFailureMode::WarnOnly => {
                    eprintln!("[obs] AUDIT spool unwritable; envelope dropped (warn_only)");
                }
            }
        }
    }

    /// Replays envelopes found in the audit spool directory.
    ///
    /// Does nothing when the directory does not exist. Each recovered envelope is
    /// offered to the audit worker; if there is no worker, or its queue is full, the
    /// envelope is delivered synchronously instead of being discarded. When at least
    /// one record was replayed, an `ObsAuditSpoolRecovered` log envelope carrying a
    /// `record_count` label is emitted synchronously.
    fn recover_audit_spool(&self) {
        // Clone the path in one statement so the config guard is not held during replay.
        let dir = self.config.load().audit.spool_dir.clone();
        if !dir.exists() {
            return;
        }

        let mut total: u64 = 0;
        // NOTE(review): the recovery report is not inspected here — confirm that
        // failures are surfaced inside `audit_spool::recover` itself.
        let _report = crate::audit_spool::recover(&dir, |env| {
            total += 1;
            match self.workers.audit.as_ref() {
                Some(worker) => {
                    // A full queue hands the envelope back; recovered audit records
                    // must not be lost, so deliver them on this thread.
                    if let Err(env) = worker.try_send(env) {
                        self.dispatch_sync(env, Tier::Audit);
                    }
                }
                None => self.dispatch_sync(env, Tier::Audit),
            }
            Ok(())
        });
        if total == 0 {
            return;
        }

        let mut env = ObsEnvelope {
            full_name: "obs.runtime.v1.ObsAuditSpoolRecovered".to_string(),
            tier: ::buffa::EnumValue::Known(obs_proto::obs::v1::Tier::TIER_LOG),
            sev: ::buffa::EnumValue::Known(obs_proto::obs::v1::Severity::SEVERITY_INFO),
            ..Default::default()
        };
        env.labels
            .insert("record_count".to_string(), total.to_string());
        self.fill_identity(&mut env);
        self.dispatch_sync(env, Tier::Log);
    }

    /// Decides whether `env` should be dispatched, mutating it along the way.
    ///
    /// Steps, in order: auto-fill, payload size limit, label value size limit,
    /// filter, sampler. Envelopes whose sampling reason is already FORENSIC, AUDIT or
    /// OVERRIDE skip the sampler and return `true` immediately, which also skips
    /// the tail-buffer and error-scope bookkeeping at the end.
    ///
    /// Returns `true` when the envelope should be sent on, `false` when it was dropped.
    fn run_emit_pipeline(&self, env: &mut ObsEnvelope, sev: obs_proto::obs::v1::Severity) -> bool {
        auto_fill_envelope(env);

        // One snapshot for the whole pipeline, so limits and sampling always come
        // from the same configuration even if a reload races with this call.
        let cfg = self.config.load();

        let max_bytes = u64::from(cfg.limits.max_payload_bytes);
        let payload_size = env.payload.len() as u64;
        // A limit of 0 disables the check.
        if max_bytes > 0 && payload_size > max_bytes {
            crate::self_events::emit_oversized_dropped(env.full_name.as_str(), payload_size);
            return false;
        }

        let max_label_bytes = u64::from(cfg.limits.max_label_value_bytes);
        if max_label_bytes > 0 {
            for (k, v) in &env.labels {
                if v.len() as u64 > max_label_bytes {
                    crate::self_events::emit_oversized_label_dropped(
                        env.full_name.as_str(),
                        k,
                        v.len() as u64,
                    );
                    return false;
                }
            }
        }

        let filter = self.filter.load();
        if !filter.event_allowed(env, sev) {
            return false;
        }

        let bypass_sampler = matches!(
            env.sampling_reason,
            ::buffa::EnumValue::Known(
                PSamplingReason::SAMPLING_REASON_FORENSIC
                    | PSamplingReason::SAMPLING_REASON_AUDIT
                    | PSamplingReason::SAMPLING_REASON_OVERRIDE,
            )
        );
        if bypass_sampler {
            return true;
        }

        let inbound = inbound_traceparent_sampled();
        match sample_decide(&cfg.sampling, env.full_name.as_str(), sev, inbound) {
            SamplingDecision::Drop => {
                return false;
            }
            SamplingDecision::Keep => {}
            SamplingDecision::ParentSet { sampled: true } => {
                // Record that the parent's decision forced this envelope through.
                env.sampling_reason =
                    ::buffa::EnumValue::Known(PSamplingReason::SAMPLING_REASON_OVERRIDE);
            }
            SamplingDecision::ParentSet { sampled: false } => {
                return false;
            }
        }

        if matches!(
            sev,
            obs_proto::obs::v1::Severity::Trace | obs_proto::obs::v1::Severity::Debug
        ) {
            push_tail_buffer(env);
        } else if sev >= obs_proto::obs::v1::Severity::Error {
            crate::scope::mark_error_on_active_scopes();
        }
        true
    }
}
impl Observer for StandardObserver {
    /// Runs `env` through the emit pipeline and dispatches it if it survives.
    ///
    /// Identity fields are filled first. Unknown severity or tier values are treated
    /// as `Unspecified`. Dispatch goes through the background workers when a tokio
    /// runtime is current on this thread, and synchronously otherwise.
    fn emit_envelope(&self, mut env: ObsEnvelope) {
        self.fill_identity(&mut env);
        let sev = match env.sev {
            ::buffa::EnumValue::Known(s) => s,
            ::buffa::EnumValue::Unknown(_) => obs_proto::obs::v1::Severity::Unspecified,
        };
        if !self.run_emit_pipeline(&mut env, sev) {
            return;
        }
        let tier = match env.tier {
            ::buffa::EnumValue::Known(t) => t,
            ::buffa::EnumValue::Unknown(_) => Tier::Unspecified,
        };
        if tokio::runtime::Handle::try_current().is_ok() {
            self.dispatch_async(env, tier);
        } else {
            self.dispatch_sync(env, tier);
        }
    }

    /// Returns `false` only when the active filter reports `Interest::Never` for
    /// `callsite`.
    fn enabled(&self, callsite: &ObsCallsite) -> bool {
        let filter = self.filter.load();
        filter.callsite_interest(callsite) != crate::callsite::Interest::Never
    }

    /// Returns the current generation counter.
    fn generation(&self) -> u32 {
        self.generation.load(Ordering::Acquire)
    }

    /// Increments the generation counter; the stored filter itself is not touched.
    fn reload_filter(&self) {
        self.generation.fetch_add(1, Ordering::Release);
    }

    /// Flushes every existing tier worker, one after another (log, metric, trace,
    /// audit).
    fn flush(&self) -> SinkFut<'_> {
        Box::pin(async move {
            for w in [
                self.workers.log.as_ref(),
                self.workers.metric.as_ref(),
                self.workers.trace.as_ref(),
                self.workers.audit.as_ref(),
            ]
            .iter()
            .flatten()
            {
                w.flush().await;
            }
        })
    }

    /// Shuts down every existing tier worker in turn, then closes the audit spool
    /// if one is open.
    fn shutdown(&self) -> SinkFut<'_> {
        Box::pin(async move {
            for w in [
                self.workers.log.as_ref(),
                self.workers.metric.as_ref(),
                self.workers.trace.as_ref(),
                self.workers.audit.as_ref(),
            ]
            .iter()
            .flatten()
            {
                w.shutdown().await;
            }
            if let Some(spool) = self.spool.as_ref() {
                spool.close();
            }
        })
    }

    /// Runs [`Observer::shutdown`] to completion from synchronous code, giving up
    /// after `timeout`.
    ///
    /// * No current runtime: a temporary current-thread runtime is built and used.
    ///   If it cannot be built, nothing happens.
    /// * Multi-thread runtime: the shutdown runs under `block_in_place`.
    /// * Current-thread runtime: blocking is not possible, so a hint is printed to
    ///   stderr and the call returns without shutting down.
    ///
    /// A timeout is not reported to the caller.
    fn shutdown_blocking(&self, timeout: std::time::Duration) {
        match tokio::runtime::Handle::try_current() {
            Err(_) => {
                if let Ok(rt) = tokio::runtime::Builder::new_current_thread()
                    .enable_all()
                    .build()
                {
                    // `tokio::time::timeout` creates its timer eagerly and panics when
                    // called outside a runtime context. The `async` block defers the
                    // call until `block_on` has entered the new runtime.
                    let _ = rt.block_on(async {
                        tokio::time::timeout(timeout, self.shutdown()).await
                    });
                }
            }
            Ok(handle) => {
                if matches!(
                    handle.runtime_flavor(),
                    tokio::runtime::RuntimeFlavor::MultiThread
                ) {
                    tokio::task::block_in_place(|| {
                        let _ = handle.block_on(async {
                            tokio::time::timeout(timeout, self.shutdown()).await
                        });
                    });
                } else {
                    eprintln!(
                        "obs: shutdown_blocking called from a current-thread tokio runtime; use \
                         `Observer::shutdown().await` instead"
                    );
                }
            }
        }
    }

    /// Returns the callsite registry (always `Some` for this observer).
    fn callsites(&self) -> Option<Arc<ObsCallsiteRegistry>> {
        Some(Arc::clone(&self.callsites))
    }

    /// Returns the schema registry (always `Some` for this observer).
    fn schema_registry(&self) -> Option<Arc<SchemaRegistry>> {
        Some(Arc::clone(&self.registry))
    }

    /// Returns the current resource attributes.
    fn resource_attrs(&self) -> Arc<ResourceAttrs> {
        self.resource.load_full()
    }
}
/// Builder for `StandardObserver`.
///
/// Every setting is optional; unset values are resolved in `build`.
pub struct StandardObserverBuilder {
/// Tier and fallback sinks collected so far.
router: SinkRouter,
/// Explicit schema registry; `build` creates one from the link section when unset.
registry: Option<Arc<SchemaRegistry>>,
/// Explicit configuration; `build` uses the default config when unset.
config: Option<EventsConfig>,
/// Explicit filter spec; takes priority over the config and environment in `build`.
filter_spec: Option<String>,
/// Service name set through `service`.
service: Option<String>,
/// Instance id set through `instance`.
instance: Option<String>,
/// Service version set through `service`.
version: Option<String>,
/// Whether `build` spawns background tier workers (defaults to `true`).
spawn_workers: bool,
}
impl Default for StandardObserverBuilder {
    /// Creates a builder with no sinks and no explicit settings, with worker
    /// spawning switched on.
    fn default() -> Self {
        Self {
            // The only setting whose default is not "empty".
            spawn_workers: true,
            router: SinkRouter::default(),
            registry: None,
            config: None,
            filter_spec: None,
            service: None,
            instance: None,
            version: None,
        }
    }
}
/// Debug view limited to the service name, version and worker flag; the remaining
/// fields are omitted (the output is marked non-exhaustive).
impl std::fmt::Debug for StandardObserverBuilder {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let mut out = f.debug_struct("StandardObserverBuilder");
        out.field("service", &self.service);
        out.field("version", &self.version);
        out.field("spawn_workers", &self.spawn_workers);
        out.finish_non_exhaustive()
    }
}
impl StandardObserverBuilder {
    /// Sets the service name and version.
    #[must_use]
    pub fn service(mut self, name: impl Into<String>, version: impl Into<String>) -> Self {
        self.service = Some(name.into());
        self.version = Some(version.into());
        self
    }

    /// Sets the service instance id.
    #[must_use]
    pub fn instance(mut self, instance: impl Into<String>) -> Self {
        self.instance = Some(instance.into());
        self
    }

    /// Registers `sink` as the dedicated sink for `tier`.
    ///
    /// Only log, metric, trace and audit tiers have a slot; any other tier is
    /// ignored and the sink is discarded.
    #[must_use]
    pub fn sink_for(mut self, tier: Tier, sink: Arc<dyn Sink>) -> Self {
        match tier {
            Tier::Log => self.router.log = Some(sink),
            Tier::Metric => self.router.metric = Some(sink),
            Tier::Trace => self.router.trace = Some(sink),
            Tier::Audit => self.router.audit = Some(sink),
            _ => {}
        }
        self
    }

    /// Registers the sink used for tiers that have no dedicated sink.
    #[must_use]
    pub fn sink_fallback(mut self, sink: Arc<dyn Sink>) -> Self {
        self.router.fallback = Some(sink);
        self
    }

    /// Supplies the configuration to start from.
    #[must_use]
    pub fn config(mut self, cfg: EventsConfig) -> Self {
        self.config = Some(cfg);
        self
    }

    /// Supplies a filter spec; it takes priority over the config's filter and the
    /// `OBS_FILTER` environment variable.
    #[must_use]
    pub fn filter(mut self, spec: impl Into<String>) -> Self {
        self.filter_spec = Some(spec.into());
        self
    }

    /// Supplies the schema registry to use instead of building one from the link section.
    #[must_use]
    pub fn registry(mut self, registry: Arc<SchemaRegistry>) -> Self {
        self.registry = Some(registry);
        self
    }

    /// Chooses whether background tier workers are spawned by `build`.
    #[must_use]
    pub fn spawn_workers(mut self, yes: bool) -> Self {
        self.spawn_workers = yes;
        self
    }

    /// Resolves every setting and constructs the observer.
    ///
    /// Resolution rules:
    /// * `dev_mode`: when not already enabled in the config, `OBS_DEV` set to
    ///   `1`/`true`/`yes`/`on` (case-insensitive, trimmed) enables it.
    /// * filter: builder spec, then the config's `filter`, then `OBS_FILTER`, else
    ///   an empty filter.
    /// * service: builder value, then `OTEL_SERVICE_NAME`, else `"obs"`.
    /// * version: builder value, else this crate's package version.
    /// * instance: builder value, else the empty string.
    ///
    /// The audit spool is opened only when an audit sink is configured. After
    /// construction any existing audit spool is replayed and a
    /// "registry initialized" self-event is emitted.
    ///
    /// # Errors
    ///
    /// * [`BuildError::InvalidConfig`] when the config fails validation or the
    ///   filter spec does not parse.
    /// * [`BuildError::SpoolOpen`] when the audit spool cannot be opened.
    pub fn build(self) -> Result<StandardObserver, BuildError> {
        let mut cfg = self.config.unwrap_or_default();
        if !cfg.dev_mode
            && let Ok(v) = std::env::var("OBS_DEV")
        {
            let on = matches!(
                v.trim().to_ascii_lowercase().as_str(),
                "1" | "true" | "yes" | "on"
            );
            cfg.dev_mode = on;
        }
        cfg.validate().map_err(BuildError::InvalidConfig)?;

        let filter_spec = self
            .filter_spec
            .or_else(|| cfg.filter.clone())
            .or_else(|| std::env::var("OBS_FILTER").ok());
        let filter = match filter_spec.as_deref() {
            Some(spec) => Filter::parse(spec).map_err(|e| {
                BuildError::InvalidConfig(crate::config::ConfigError::invalid_range(
                    "filter",
                    format!("{e}"),
                ))
            })?,
            None => Filter::new(),
        };

        let registry = self
            .registry
            .unwrap_or_else(|| Arc::new(SchemaRegistry::from_link_section()));
        let service = self
            .service
            .or_else(|| std::env::var("OTEL_SERVICE_NAME").ok())
            .unwrap_or_else(|| "obs".to_string());
        let version = self
            .version
            .unwrap_or_else(|| env!("CARGO_PKG_VERSION").to_string());
        let instance = self.instance.unwrap_or_default();
        let counters = Arc::new(WorkerCounters::default());

        // A spool is only useful when there is an audit sink whose queue can back up.
        let spool = if self.router.audit.is_some() {
            Some(Arc::new(
                SpoolWriter::open_with_fsync(
                    cfg.audit.spool_dir.clone(),
                    cfg.audit.spool_max_bytes,
                    cfg.audit.on_failure,
                    cfg.audit.fsync_mode,
                )
                .map_err(BuildError::SpoolOpen)?,
            ))
        } else {
            None
        };

        let workers = if self.spawn_workers {
            spawn_pool(&self.router, &registry, &counters, &cfg.queues)
        } else {
            WorkerPool::default()
        };
        let resource = build_resource_from_env(&service, &version, &instance);

        let observer = StandardObserver {
            router: self.router,
            workers,
            spool,
            registry,
            callsites: Arc::new(ObsCallsiteRegistry::new()),
            config: ArcSwap::from_pointee(cfg),
            filter: ArcSwap::from_pointee(filter),
            resource: ArcSwap::from_pointee(resource),
            counters,
            generation: AtomicU32::new(1),
            service,
            instance,
            version,
            sync_dispatch_lock: Mutex::new(()),
        };
        observer.recover_audit_spool();
        let schema_count = observer.registry.len() as u64;
        crate::self_events::emit_registry_initialized(schema_count, 0);
        Ok(observer)
    }
}
/// Builds the initial resource attributes from the resolved identity and the
/// OpenTelemetry environment variables.
///
/// Precedence for each attribute, lowest to highest:
/// 1. the `service` / `version` / `instance` arguments;
/// 2. `OTEL_RESOURCE_ATTRIBUTES` (comma-separated `key=value` pairs; well-known keys
///    fill the typed fields, everything else lands in `extra`);
/// 3. `OTEL_SERVICE_NAME`, for `service_name` only, when it is non-empty.
///
/// `host_arch` falls back to the compile-time architecture (`x86_64` is reported as
/// `amd64`, `aarch64` as `arm64`) and `host_name` falls back to `HOSTNAME`, when
/// still empty.
///
/// Values are taken verbatim after trimming whitespace; no percent-decoding is applied.
fn build_resource_from_env(service: &str, version: &str, instance: &str) -> ResourceAttrs {
    let mut r = ResourceAttrs {
        service_name: service.to_string(),
        service_version: version.to_string(),
        service_instance_id: instance.to_string(),
        ..Default::default()
    };

    if let Ok(extras) = std::env::var("OTEL_RESOURCE_ATTRIBUTES") {
        for pair in extras.split(',') {
            // Entries without `=` (including blank ones) carry no value and are skipped.
            let Some((k, v)) = pair.trim().split_once('=') else {
                continue;
            };
            let key = k.trim();
            // Attribute keys must be non-empty; `=value` is malformed input.
            if key.is_empty() {
                continue;
            }
            let val = v.trim().to_string();
            match key {
                "service.name" => r.service_name = val,
                "service.version" => r.service_version = val,
                "service.namespace" => r.service_namespace = val,
                "service.instance.id" => r.service_instance_id = val,
                "deployment.environment" => r.deployment_environment = val,
                "host.name" => r.host_name = val,
                "host.arch" => r.host_arch = val,
                _ => {
                    r.extra.insert(key.to_string(), val);
                }
            }
        }
    }

    // Applied after OTEL_RESOURCE_ATTRIBUTES on purpose: the OpenTelemetry spec gives
    // OTEL_SERVICE_NAME precedence over a `service.name` entry in that list.
    if let Ok(name) = std::env::var("OTEL_SERVICE_NAME")
        && !name.is_empty()
    {
        r.service_name = name;
    }

    if r.host_arch.is_empty() {
        r.host_arch = match std::env::consts::ARCH {
            "x86_64" => "amd64".to_string(),
            "aarch64" => "arm64".to_string(),
            other => other.to_string(),
        };
    }
    if r.host_name.is_empty()
        && let Ok(host) = std::env::var("HOSTNAME")
    {
        r.host_name = host;
    }
    r
}
/// Computes a 64-bit fingerprint of `cfg`.
///
/// The config is serialized to YAML, hashed with BLAKE3, and the first eight bytes
/// of the digest are read as a little-endian `u64`. Returns `0` when the config
/// cannot be serialized.
fn config_hash(cfg: &EventsConfig) -> u64 {
    let Ok(yaml) = serde_yaml::to_string(cfg) else {
        return 0;
    };
    let digest = blake3::hash(yaml.as_bytes());

    let mut prefix = [0u8; 8];
    prefix.copy_from_slice(&digest.as_bytes()[..8]);
    u64::from_le_bytes(prefix)
}
fn spawn_pool(
router: &SinkRouter,
registry: &Arc<SchemaRegistry>,
counters: &Arc<WorkerCounters>,
queues: &crate::config::QueuesConfig,
) -> WorkerPool {
let mut pool = WorkerPool::default();
if let Some(sink) = router.log.as_ref().or(router.fallback.as_ref()) {
pool.log = spawn_tier_worker(
Tier::Log,
queues,
Arc::clone(sink),
Arc::clone(registry),
Arc::clone(counters),
);
}
if let Some(sink) = router.metric.as_ref().or(router.fallback.as_ref()) {
pool.metric = spawn_tier_worker(
Tier::Metric,
queues,
Arc::clone(sink),
Arc::clone(registry),
Arc::clone(counters),
);
}
if let Some(sink) = router.trace.as_ref().or(router.fallback.as_ref()) {
pool.trace = spawn_tier_worker(
Tier::Trace,
queues,
Arc::clone(sink),
Arc::clone(registry),
Arc::clone(counters),
);
}
if let Some(sink) = router.audit.as_ref() {
pool.audit = spawn_tier_worker(
Tier::Audit,
queues,
Arc::clone(sink),
Arc::clone(registry),
Arc::clone(counters),
);
}
pool
}
/// Errors returned when building or reconfiguring a `StandardObserver`.
#[derive(Debug, thiserror::Error)]
#[non_exhaustive]
pub enum BuildError {
/// The configuration failed validation, or its filter spec could not be parsed.
#[error("invalid config: {0}")]
InvalidConfig(#[from] crate::config::ConfigError),
/// The audit spool could not be opened.
#[error("audit spool open failed: {0}")]
SpoolOpen(#[source] std::io::Error),
}
/// Compile-time check that `NoopSink` can be used as an `Arc<dyn Sink>`.
/// Never called, hence the `dead_code` allowance.
#[allow(dead_code)]
fn _ensure_noop_compiles() {
    let noop: Arc<dyn Sink> = Arc::new(NoopSink);
    drop(noop);
}
#[cfg(test)]
mod tests {
use super::*;
use crate::resource::ResourceAttrs;
// An envelope carrying a 2048-byte label value must be rejected by the emit
// pipeline. The observer is built with the default config and no workers.
// NOTE(review): relies on the default `max_label_value_bytes` being non-zero and
// below 2048 — confirm against `EventsConfig::default()`.
#[test]
fn test_oversized_label_value_drops_envelope() {
use obs_proto::obs::v1::{
ObsEnvelope, SamplingReason as PSamplingReason, Severity as PSev, Tier as PTier,
};
let observer = StandardObserver::builder()
.service("test", "1.0.0")
.sink_fallback(Arc::new(NoopSink))
.spawn_workers(false)
.build()
.expect("build");
let mut env = ObsEnvelope {
full_name: "test.v1.ObsBig".to_string(),
tier: ::buffa::EnumValue::Known(PTier::TIER_LOG),
sev: ::buffa::EnumValue::Known(PSev::SEVERITY_INFO),
// HEAD_RATE is not one of the sampler-bypass reasons.
sampling_reason: ::buffa::EnumValue::Known(PSamplingReason::SAMPLING_REASON_HEAD_RATE),
..Default::default()
};
env.labels.insert("ua".to_string(), "x".repeat(2048));
let kept = observer.run_emit_pipeline(&mut env, obs_proto::obs::v1::Severity::Info);
assert!(!kept, "envelope with oversize label value must be dropped");
}
// Attributes stored with `set_resource_attrs` must be what `resource_attrs`
// returns afterwards.
// NOTE(review): the initial `service_name == "test"` assertion assumes neither
// `OTEL_SERVICE_NAME` nor `OTEL_RESOURCE_ATTRIBUTES` overrides it in the test
// environment.
#[test]
fn test_set_resource_attrs_is_visible_to_observer_callers() {
let observer = StandardObserver::builder()
.service("test", "1.0.0")
.sink_fallback(Arc::new(NoopSink))
.spawn_workers(false)
.build()
.expect("build");
let before = observer.resource_attrs();
assert_eq!(before.service_name, "test");
observer.set_resource_attrs(ResourceAttrs {
service_name: "rotated".to_string(),
deployment_environment: "prod".to_string(),
..Default::default()
});
let after = observer.resource_attrs();
assert_eq!(after.service_name, "rotated");
assert_eq!(after.deployment_environment, "prod");
}
}