obs-core 0.2.1

Runtime engine for the obs SDK: Observer, Sink, schema registry, sampling, config.
Documentation
//! Async scope regression tests.

use std::sync::Arc;

use bytes::BytesMut;
use obs_core::{
    Cardinality, Classification, Emit, EventSchema, FieldMeta, FieldRole, InMemorySink,
    SamplingReason, ScopeFrameBuilder, Severity, StandardObserver, Tier, with_observer_task,
};

#[derive(Debug, Default)]
struct ObsAsyncDebug {
    request_id: String,
}

impl EventSchema for ObsAsyncDebug {
    const FULL_NAME: &'static str = "test.v1.ObsAsyncDebug";
    const TIER: Tier = Tier::Log;
    const DEFAULT_SEV: Severity = Severity::Debug;
    const FIELDS: &'static [FieldMeta] = &[FieldMeta::new(
        "request_id",
        1,
        FieldRole::Label,
        Cardinality::Low,
        Classification::Internal,
    )];
    const SCHEMA_HASH: u64 = 0xA551_C0DE_0000_0001;

    fn encode_payload(&self, _buf: &mut BytesMut) {}

    fn project(&self, env: &mut obs_core::ObsEnvelope) {
        env.labels
            .insert("request_id".to_string(), self.request_id.clone());
    }
}

#[derive(Debug, Default)]
struct ObsAsyncError;

impl EventSchema for ObsAsyncError {
    const FULL_NAME: &'static str = "test.v1.ObsAsyncError";
    const TIER: Tier = Tier::Log;
    const DEFAULT_SEV: Severity = Severity::Error;
    const FIELDS: &'static [FieldMeta] = &[];
    const SCHEMA_HASH: u64 = 0xA551_C0DE_0000_0002;

    fn encode_payload(&self, _buf: &mut BytesMut) {}

    fn project(&self, _env: &mut obs_core::ObsEnvelope) {}
}

#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn instrumented_scope_should_preserve_tail_buffer_across_awaits()
-> Result<(), Box<dyn std::error::Error>> {
    use obs_core::Instrument as _;

    let sink = InMemorySink::new();
    let handle = sink.handle();
    let observer = StandardObserver::builder()
        .service("async-scope-test", "0")
        .filter("trace")
        .spawn_workers(false)
        .sink_fallback(Arc::new(sink))
        .build()?;
    let observer = Arc::new(observer);

    let frame = ScopeFrameBuilder::new()
        .label("request_id", "req-1")
        .into_frame();

    with_observer_task(observer, async move {
        async move {
            ObsAsyncDebug {
                request_id: "req-1".to_string(),
            }
            .emit();
            tokio::task::yield_now().await;
            ObsAsyncError.emit();
        }
        .instrument(frame)
        .await;
    })
    .await;

    let emitted = handle.drain();
    let tail_replays = emitted
        .iter()
        .filter(|env| {
            env.full_name == ObsAsyncDebug::FULL_NAME
                && matches!(
                    env.sampling_reason,
                    buffa::EnumValue::Known(SamplingReason::TailError)
                )
        })
        .count();
    assert_eq!(
        tail_replays, 1,
        "debug event emitted before await must be replayed by tail-on-error once; got {emitted:#?}"
    );
    Ok(())
}