use kaish_kernel::Kernel;
use kaish_types::ExecuteOptions;
use opentelemetry::trace::TracerProvider as _;
use opentelemetry_sdk::trace::{InMemorySpanExporter, SdkTracerProvider};
use tracing_subscriber::prelude::*;
const TRACEPARENT: &str = "00-4bf92f3577b34da6a3ce929d0e0e4736-00f067aa0ba902b7-01";
const TRACE_ID: &str = "4bf92f3577b34da6a3ce929d0e0e4736";
const PARENT_SPAN_ID: &str = "00f067aa0ba902b7";
#[tokio::test(flavor = "multi_thread", worker_threads = 2)]
async fn embedder_trace_context_reaches_foreground_and_forked_spans() {
let exporter = InMemorySpanExporter::default();
let provider = SdkTracerProvider::builder()
.with_simple_exporter(exporter.clone())
.build();
let tracer = provider.tracer("kaish-kernel-test");
let subscriber =
tracing_subscriber::registry().with(tracing_opentelemetry::layer().with_tracer(tracer));
tracing::subscriber::set_global_default(subscriber).expect("install global subscriber once");
let kernel = Kernel::transient().expect("build transient kernel");
let opts = ExecuteOptions::new()
.with_traceparent(TRACEPARENT)
.with_baggage_entry("owner", "atobey");
let result = kernel
.execute_with_options("true", opts)
.await
.expect("execute should succeed");
assert_eq!(result.code, 0, "`true` should exit 0");
assert_eq!(
result.baggage.get("owner").map(String::as_str),
Some("atobey"),
"embedder baggage must be echoed back onto ExecResult.baggage",
);
let scatter = kernel
.execute_with_options(
r#"seq 1 3 | scatter | echo "$ITEM" | gather"#,
ExecuteOptions::new().with_traceparent(TRACEPARENT),
)
.await
.expect("scatter should succeed");
assert_eq!(scatter.code, 0, "scatter/gather should exit 0");
provider.force_flush().expect("flush spans");
let spans = exporter.get_finished_spans().expect("collect finished spans");
let exec = spans
.iter()
.find(|s| s.name.as_ref() == "execute_with_options_inner")
.expect("the kernel execution span should have been exported");
assert_eq!(
exec.span_context.trace_id().to_string(),
TRACE_ID,
"execution span must inherit the embedder's trace id",
);
assert_eq!(
exec.parent_span_id.to_string(),
PARENT_SPAN_ID,
"execution span must parent directly onto the embedder's span id",
);
let worker_spans: Vec<_> = spans
.iter()
.filter(|s| s.name.as_ref() == "scatter_worker")
.collect();
assert!(
!worker_spans.is_empty(),
"expected scatter_worker spans to be exported",
);
let local_span_ids: std::collections::HashSet<String> = spans
.iter()
.map(|s| s.span_context.span_id().to_string())
.collect();
for worker in &worker_spans {
assert_eq!(
worker.span_context.trace_id().to_string(),
TRACE_ID,
"forked scatter worker span must stay in the embedder's trace",
);
assert!(
worker.span_context.is_sampled(),
"worker span should inherit the sampled decision from the remote parent",
);
assert_ne!(
worker.parent_span_id.to_string(),
PARENT_SPAN_ID,
"forked worker must nest under a kaish span, not the remote parent",
);
assert!(
local_span_ids.contains(&worker.parent_span_id.to_string()),
"worker's parent must be one of kaish's own exported spans",
);
}
}