use std::sync::Arc;
use bytes::BytesMut;
use obs_core::{
Cardinality, Classification, Emit, EventSchema, FieldMeta, FieldRole, InMemorySink,
SamplingReason, ScopeFrameBuilder, Severity, StandardObserver, Tier, with_observer_task,
};
/// Test-only event: a log-tier, debug-severity schema with a single
/// `request_id` label.
///
/// The async-scope test in this file emits it before an await point.
#[derive(Debug, Default)]
struct ObsAsyncDebug {
    /// Copied into the envelope's `request_id` label by `project`.
    request_id: String,
}

impl EventSchema for ObsAsyncDebug {
    // Identity of the schema.
    const FULL_NAME: &'static str = "test.v1.ObsAsyncDebug";
    const SCHEMA_HASH: u64 = 0xA551_C0DE_0000_0001;

    // Routing defaults.
    const TIER: Tier = Tier::Log;
    const DEFAULT_SEV: Severity = Severity::Debug;

    // One label field; the `1` is presumably the field number — TODO confirm
    // against `FieldMeta::new`.
    const FIELDS: &'static [FieldMeta] = &[FieldMeta::new(
        "request_id",
        1,
        FieldRole::Label,
        Cardinality::Low,
        Classification::Internal,
    )];

    /// Intentionally writes nothing: this fixture has an empty payload.
    fn encode_payload(&self, _buf: &mut BytesMut) {}

    /// Stores this event's `request_id` under the `request_id` label key of
    /// the envelope.
    fn project(&self, env: &mut obs_core::ObsEnvelope) {
        let key = String::from("request_id");
        let value = self.request_id.clone();
        env.labels.insert(key, value);
    }
}
/// Test-only event: a field-less, log-tier schema whose default severity is
/// `Error`.
///
/// The async-scope test in this file emits it after an await point.
#[derive(Debug, Default)]
struct ObsAsyncError;

impl EventSchema for ObsAsyncError {
    // Identity of the schema.
    const FULL_NAME: &'static str = "test.v1.ObsAsyncError";
    const SCHEMA_HASH: u64 = 0xA551_C0DE_0000_0002;

    // Routing defaults.
    const TIER: Tier = Tier::Log;
    const DEFAULT_SEV: Severity = Severity::Error;

    // This event declares no fields.
    const FIELDS: &'static [FieldMeta] = &[];

    /// Intentionally writes nothing: this fixture has an empty payload.
    fn encode_payload(&self, _buf: &mut BytesMut) {}

    /// Intentionally leaves the envelope untouched: there is nothing to project.
    fn project(&self, _env: &mut obs_core::ObsEnvelope) {}
}
/// Emits a debug event, yields to the scheduler, then emits an error event,
/// all inside one instrumented future running under `with_observer_task`.
///
/// Afterwards the sink handle is drained, and the test expects exactly one
/// `ObsAsyncDebug` envelope whose sampling reason is
/// `SamplingReason::TailError`.
///
/// # Errors
///
/// Propagates any error returned while building the observer.
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn instrumented_scope_should_preserve_tail_buffer_across_awaits()
-> Result<(), Box<dyn std::error::Error>> {
    use obs_core::Instrument as _;

    // Shared by the scope-frame label and the debug event's field.
    const REQUEST_ID: &str = "req-1";

    // Take the handle first: the sink itself is moved into the observer below.
    let sink = InMemorySink::new();
    let handle = sink.handle();

    let observer = Arc::new(
        StandardObserver::builder()
            .service("async-scope-test", "0")
            .filter("trace")
            .spawn_workers(false)
            .sink_fallback(Arc::new(sink))
            .build()?,
    );
    let frame = ScopeFrameBuilder::new()
        .label("request_id", REQUEST_ID)
        .into_frame();

    with_observer_task(observer, async move {
        // The two emits sit on either side of an await point inside the same
        // instrumented future.
        let scoped = async move {
            ObsAsyncDebug {
                request_id: REQUEST_ID.to_string(),
            }
            .emit();
            tokio::task::yield_now().await;
            ObsAsyncError.emit();
        }
        .instrument(frame);
        scoped.await;
    })
    .await;

    let emitted = handle.drain();

    // Count debug envelopes that were marked with the tail-error sampling reason.
    let replayed_debug_events = emitted
        .iter()
        .filter(|env| env.full_name == ObsAsyncDebug::FULL_NAME)
        .filter(|env| {
            matches!(
                env.sampling_reason,
                buffa::EnumValue::Known(SamplingReason::TailError)
            )
        })
        .count();

    assert_eq!(
        replayed_debug_events, 1,
        "debug event emitted before await must be replayed by tail-on-error once; got {emitted:#?}"
    );
    Ok(())
}