#[cfg(test)]
mod tests {
use super::*;
use opentelemetry::trace::TracerProvider;
use opentelemetry_otlp::WithExportConfig;
use std::time::{Duration, Instant};
use tracing::{info, Instrument};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
/// Delta value handed to the executor on every tick in these tests.
// NOTE(review): presumably the length of one frame at 60 FPS in milliseconds — confirm the unit
// against the executor's `tick` documentation.
const DELTA: f64 = 1000.0 / 60.0;
/// Bundles the handles returned while setting up tracing so a test can keep them alive.
///
/// Dropping the guard shuts down the OpenTelemetry tracer provider (see the `Drop` impl);
/// the Chrome flush guard is stored only so that it is dropped together with this struct.
#[must_use]
pub struct TracingGuard {
/// Tracer provider that is shut down when the guard is dropped.
opentelemetry_trace_guard: opentelemetry_sdk::trace::SdkTracerProvider,
/// Flush guard produced when the Chrome tracing layer is built; never read directly.
chrome_guard: tracing_chrome::FlushGuard,
}
impl Drop for TracingGuard {
    /// Shuts down the OpenTelemetry tracer provider, reporting any failure on stderr
    /// instead of panicking inside `drop`.
    fn drop(&mut self) {
        match self.opentelemetry_trace_guard.shutdown() {
            Ok(_) => {}
            Err(err) => eprintln!("{err:?}"),
        }
    }
}
fn resource() -> opentelemetry_sdk::Resource {
opentelemetry_sdk::Resource::builder()
.with_service_name("my-test-service")
.with_schema_url(
[
opentelemetry::KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_NAME,
env!("CARGO_PKG_NAME"),
),
opentelemetry::KeyValue::new(
opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
env!("CARGO_PKG_VERSION"),
),
// opentelemetry::KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"),
],
opentelemetry_semantic_conventions::SCHEMA_URL,
)
.build()
}
/// Creates the tracer provider used by the tests.
///
/// Spans are sent through a tonic-based OTLP exporter pointed at `http://127.0.0.1:4317`
/// and batched before export.
///
/// # Panics
///
/// Panics if the span exporter cannot be built.
pub fn test_init_opentelemetry_layer() -> opentelemetry_sdk::trace::SdkTracerProvider {
    let span_exporter = opentelemetry_otlp::SpanExporter::builder()
        .with_tonic()
        .with_endpoint("http://127.0.0.1:4317")
        .build()
        .unwrap();

    let provider_builder = opentelemetry_sdk::trace::SdkTracerProvider::builder()
        .with_resource(resource())
        .with_batch_exporter(span_exporter);

    provider_builder.build()
}
/// Installs the global tracing subscriber used by the tests and returns the guard that
/// keeps its backends alive.
///
/// Layers are stacked in this order:
/// 1. Chrome tracing — bad with async, has a UI.
/// 2. Tracy tracing — bad with async, has a UI.
/// 3. Forest (contextual traces on the console) — good with async, console only.
/// 4. OpenTelemetry (Jaeger) — good with async, has a UI.
///
/// A plain `tracing_subscriber::fmt::layer()` (console only, bad with async) is
/// intentionally not part of the stack.
///
/// # Panics
///
/// Panics if the exporter cannot be built or if a global subscriber is already installed.
pub fn test_init_tracing() -> TracingGuard {
    let opentelemetry_trace_guard = test_init_opentelemetry_layer();
    let otlp_tracer = opentelemetry_trace_guard.tracer("my-tracer");

    let (chrome_layer, chrome_guard) = tracing_chrome::ChromeLayerBuilder::new()
        .trace_style(tracing_chrome::TraceStyle::Async)
        .include_args(true)
        .build();
    let tracy_layer = tracing_tracy::TracyLayer::default();
    let forest_layer = tracing_forest::ForestLayer::default();
    let otlp_layer = tracing_opentelemetry::OpenTelemetryLayer::new(otlp_tracer);

    // `init` installs the registry as the global default subscriber.
    tracing_subscriber::registry()
        .with(chrome_layer)
        .with(tracy_layer)
        .with(forest_layer)
        .with(otlp_layer)
        .init();

    TracingGuard {
        opentelemetry_trace_guard,
        chrome_guard,
    }
}
/// Returns `a + b`.
///
/// Exists so the tests have an instrumented async function to await; the `ret` argument
/// asks `tracing` to record the returned value as well.
#[tracing::instrument(ret)]
async fn add_async(a: i32, b: i32) -> i32 {
a + b
}
/// Runs `future` to completion on a fresh `TickedAsyncExecutor`.
///
/// The future is wrapped in an info span named `MyIdentifier`, spawned as a detached local
/// task, and the executor is ticked until it reports completion. Every state reported to
/// the executor's callback is logged at info level.
///
/// # Panics
///
/// Panics if the executor still reports tasks after `wait_till_completed` returns.
#[tracing::instrument(skip_all)]
fn run_test(future: impl Future<Output = ()> + 'static) {
    // Delta passed to the executor while waiting; same value as the module-level `DELTA`.
    const DELTA: f64 = 1000.0 / 60.0;

    let executor = TickedAsyncExecutor::new(|state| {
        info!("State: {state:?}");
    });

    let instrumented = future.instrument(tracing::info_span!("MyIdentifier"));
    let task = executor.spawn_local("MyIdentifier", instrumented);
    task.detach();

    // Make sure to tick your executor to run the tasks
    executor.wait_till_completed(DELTA);
    assert_eq!(executor.num_tasks(), 0);
}
/// Runs a single instrumented task end to end with the full tracing stack installed.
#[test]
fn test_one_task() {
    let mut runtime_builder = tokio::runtime::Builder::new_multi_thread();
    runtime_builder.worker_threads(4).enable_all();
    let runtime = runtime_builder.build().unwrap();

    // Tracing is initialised from inside the runtime.
    // NOTE(review): presumably because the OTLP exporter needs a Tokio context — confirm.
    // Declared after `runtime`, so the guard is dropped before the runtime is.
    let _guard = runtime.block_on(async { test_init_tracing() });

    std::thread::sleep(Duration::from_millis(500));

    run_test(async move {
        tracing::info!("Start");
        let r = add_async(1, 2).await;
        tracing::info!("End: {r}");
    });
}
/// Two tasks that each yield once must survive the first tick and finish on the second.
#[test]
fn test_multiple_tasks() {
    let executor = TickedAsyncExecutor::default();

    // One identifier is a `&str`, the other an owned `String`.
    let task_a = executor.spawn_local("A", async move {
        tokio::task::yield_now().await;
    });
    task_a.detach();

    let task_b = executor.spawn_local(String::from("B"), async move {
        tokio::task::yield_now().await;
    });
    task_b.detach();

    // First tick: both tasks are still counted afterwards.
    executor.tick(DELTA, None);
    assert_eq!(executor.num_tasks(), 2);

    // Second tick: both tasks are gone.
    executor.tick(DELTA, None);
    assert_eq!(executor.num_tasks(), 0);
}
/// Cancelling never-ending tasks from a third task must let the executor drain.
#[test]
fn test_task_cancellation() {
    let executor = TickedAsyncExecutor::new(|_state| println!("{_state:?}"));

    // Two tasks that would spin forever unless cancelled.
    let spinner_a = executor.spawn_local("A", async move {
        loop {
            tokio::task::yield_now().await;
        }
    });
    let spinner_b = executor.spawn_local(String::from("B"), async move {
        loop {
            tokio::task::yield_now().await;
        }
    });
    assert_eq!(executor.num_tasks(), 2);
    executor.tick(DELTA, None);

    // The canceller takes ownership of both handles and cancels them concurrently;
    // each cancellation is expected to yield `None`.
    let canceller = executor.spawn_local("CancelTasks", async move {
        let (outcome_a, outcome_b) = tokio::join!(spinner_a.cancel(), spinner_b.cancel());
        assert_eq!(outcome_a, None);
        assert_eq!(outcome_b, None);
    });
    canceller.detach();
    assert_eq!(executor.num_tasks(), 3);

    // Since we have cancelled the tasks above, the loops should eventually end
    executor.wait_till_completed(DELTA);
}
/// Exercises executor timers and tick channels.
///
/// Part one drives ten timer tasks to completion while printing how long each tick took.
/// Part two leaves two sleeping timers and two tick listeners pending and then drops the
/// executor while they are still outstanding.
#[test]
fn test_ticked_timer() {
    let executor = TickedAsyncExecutor::default();

    for _ in 0..10 {
        let timer = executor.create_timer();
        let task = executor.spawn_local("LocalTimer", async move {
            timer.sleep_for(256.0).await;
        });
        task.detach();
    }

    // Tick roughly once per 16 ms until every timer task has finished,
    // recording the duration of each individual tick.
    let started = Instant::now();
    let mut tick_durations = Vec::new();
    while executor.num_tasks() != 0 {
        let tick_started = Instant::now();
        executor.tick(DELTA, None);
        tick_durations.push(tick_started.elapsed());
        std::thread::sleep(Duration::from_millis(16));
    }
    let elapsed = started.elapsed();

    println!("Elapsed: {:?}", elapsed);
    println!("Total: {:?}", tick_durations);
    println!(
        "Min: {:?}, Max: {:?}",
        tick_durations.iter().min(),
        tick_durations.iter().max()
    );

    // Test Timer cancellation
    for name in ["LocalFuture1", "LocalFuture2"] {
        let timer = executor.create_timer();
        executor
            .spawn_local(name, async move {
                timer.sleep_for(1000.0).await;
            })
            .detach();
    }

    // Tick listeners keep waiting for changes until the channel reports an error.
    for name in ["LocalTickFuture1", "LocalTickFuture2"] {
        let mut tick_event = executor.tick_channel();
        executor
            .spawn_local(name, async move {
                while tick_event.changed().await.is_ok() {}
            })
            .detach();
    }

    executor.tick(DELTA, None);
    assert_eq!(executor.num_tasks(), 4);

    // Dropping the executor with four tasks still pending must not hang or panic.
    drop(executor);
}
/// With a per-tick limit of one, each tick must retire exactly one of the ten tasks.
#[test]
fn test_limit() {
    let executor = TickedAsyncExecutor::default();

    for i in 0..10 {
        let task = executor.spawn_local(format!("{i}"), async move {
            println!("Finish {i}");
        });
        task.detach();
    }

    // Count down from ten: check the remaining task count, then tick with a limit of one.
    for remaining in (1..=10).rev() {
        assert_eq!(executor.num_tasks(), remaining);
        executor.tick(0.1, Some(1));
    }

    assert_eq!(executor.num_tasks(), 0);
}
}