ticked_async_executor 0.2.0

Local executor that runs woken async tasks when it is ticked
Documentation
#[cfg(test)]
mod tests {
    use super::*;
    use opentelemetry::trace::TracerProvider;
    use opentelemetry_otlp::WithExportConfig;
    use std::time::{Duration, Instant};
    use tracing::{info, Instrument};
    use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};

    const DELTA: f64 = 1000.0 / 60.0;

    #[must_use]
    pub struct TracingGuard {
        opentelemetry_trace_guard: opentelemetry_sdk::trace::SdkTracerProvider,
        chrome_guard: tracing_chrome::FlushGuard,
    }

    impl Drop for TracingGuard {
        fn drop(&mut self) {
            if let Err(err) = self.opentelemetry_trace_guard.shutdown() {
                eprintln!("{err:?}");
            }
        }
    }

    fn resource() -> opentelemetry_sdk::Resource {
        opentelemetry_sdk::Resource::builder()
            .with_service_name("my-test-service")
            .with_schema_url(
                [
                    opentelemetry::KeyValue::new(
                        opentelemetry_semantic_conventions::resource::SERVICE_NAME,
                        env!("CARGO_PKG_NAME"),
                    ),
                    opentelemetry::KeyValue::new(
                        opentelemetry_semantic_conventions::resource::SERVICE_VERSION,
                        env!("CARGO_PKG_VERSION"),
                    ),
                    // opentelemetry::KeyValue::new(DEPLOYMENT_ENVIRONMENT_NAME, "develop"),
                ],
                opentelemetry_semantic_conventions::SCHEMA_URL,
            )
            .build()
    }

    pub fn test_init_opentelemetry_layer() -> opentelemetry_sdk::trace::SdkTracerProvider {
        let exporter = opentelemetry_otlp::SpanExporter::builder()
            .with_tonic()
            .with_endpoint("http://127.0.0.1:4317")
            .build()
            .unwrap();

        opentelemetry_sdk::trace::SdkTracerProvider::builder()
            .with_resource(resource())
            .with_batch_exporter(exporter)
            .build()
    }

    pub fn test_init_tracing() -> TracingGuard {
        let otlp_tracer_provider = test_init_opentelemetry_layer();
        let otlp_tracer = otlp_tracer_provider.tracer("my-tracer");

        let (chrome_layer, _guard) = tracing_chrome::ChromeLayerBuilder::new()
            .trace_style(tracing_chrome::TraceStyle::Async)
            .include_args(true)
            .build();

        let tracy_layer = tracing_tracy::TracyLayer::default();

        let forest_layer = tracing_forest::ForestLayer::default();

        let registry = tracing_subscriber::registry()
            // Write to console - Bad with async, console only
            // .with(tracing_subscriber::fmt::layer())
            // Chrome tracing - Bad with async, has UI
            .with(chrome_layer)
            // Tracy tracing - Bad with async, has UI
            .with(tracy_layer)
            // Write contextual traces to console - Good with async, console only
            .with(forest_layer)
            // Opentelemtry Jaeger - Good with async, has UI
            .with(tracing_opentelemetry::OpenTelemetryLayer::new(otlp_tracer));
        registry.init();

        // tracing::subscriber::set_global_default(registry).expect("setup tracy layer");
        TracingGuard {
            opentelemetry_trace_guard: otlp_tracer_provider,
            chrome_guard: _guard,
        }
    }

    #[tracing::instrument(ret)]
    async fn add_async(a: i32, b: i32) -> i32 {
        a + b
    }

    #[tracing::instrument(skip_all)]
    fn run_test(future: impl Future<Output = ()> + 'static) {
        let executor = TickedAsyncExecutor::new(|state| {
            info!("State: {state:?}");
        });

        executor
            .spawn_local(
                "MyIdentifier",
                future.instrument(tracing::info_span!("MyIdentifier")),
            )
            .detach();

        // Make sure to tick your executor to run the tasks
        const DELTA: f64 = 1000.0 / 60.0;
        executor.wait_till_completed(DELTA);
        assert_eq!(executor.num_tasks(), 0);
    }

    #[test]
    fn test_one_task() {
        let runtime = tokio::runtime::Builder::new_multi_thread()
            .worker_threads(4)
            .enable_all()
            .build()
            .unwrap();

        let _guard = runtime.block_on(async move {
            let _guard = test_init_tracing();
            _guard
        });
        std::thread::sleep(Duration::from_millis(500));

        run_test(async move {
            tracing::info!("Start");
            let r = add_async(1, 2).await;
            tracing::info!("End: {r}");
        });
    }

    #[test]
    fn test_multiple_tasks() {
        let executor = TickedAsyncExecutor::default();
        executor
            .spawn_local("A", async move {
                tokio::task::yield_now().await;
            })
            .detach();

        executor
            .spawn_local(format!("B"), async move {
                tokio::task::yield_now().await;
            })
            .detach();

        executor.tick(DELTA, None);
        assert_eq!(executor.num_tasks(), 2);

        executor.tick(DELTA, None);
        assert_eq!(executor.num_tasks(), 0);
    }

    #[test]
    fn test_task_cancellation() {
        let executor = TickedAsyncExecutor::new(|_state| println!("{_state:?}"));
        let task1 = executor.spawn_local("A", async move {
            loop {
                tokio::task::yield_now().await;
            }
        });

        let task2 = executor.spawn_local(format!("B"), async move {
            loop {
                tokio::task::yield_now().await;
            }
        });
        assert_eq!(executor.num_tasks(), 2);
        executor.tick(DELTA, None);

        executor
            .spawn_local("CancelTasks", async move {
                let (t1, t2) = tokio::join!(task1.cancel(), task2.cancel());
                assert_eq!(t1, None);
                assert_eq!(t2, None);
            })
            .detach();
        assert_eq!(executor.num_tasks(), 3);

        // Since we have cancelled the tasks above, the loops should eventually end
        executor.wait_till_completed(DELTA);
    }

    #[test]
    fn test_ticked_timer() {
        let executor = TickedAsyncExecutor::default();

        for _ in 0..10 {
            let timer = executor.create_timer();
            executor
                .spawn_local("LocalTimer", async move {
                    timer.sleep_for(256.0).await;
                })
                .detach();
        }

        let now = Instant::now();
        let mut instances = vec![];
        while executor.num_tasks() != 0 {
            let current = Instant::now();
            executor.tick(DELTA, None);
            instances.push(current.elapsed());
            std::thread::sleep(Duration::from_millis(16));
        }
        let elapsed = now.elapsed();
        println!("Elapsed: {:?}", elapsed);
        println!("Total: {:?}", instances);
        println!(
            "Min: {:?}, Max: {:?}",
            instances.iter().min(),
            instances.iter().max()
        );

        // Test Timer cancellation
        let timer = executor.create_timer();
        executor
            .spawn_local("LocalFuture1", async move {
                timer.sleep_for(1000.0).await;
            })
            .detach();

        let timer = executor.create_timer();
        executor
            .spawn_local("LocalFuture2", async move {
                timer.sleep_for(1000.0).await;
            })
            .detach();

        let mut tick_event = executor.tick_channel();
        executor
            .spawn_local("LocalTickFuture1", async move {
                loop {
                    let _r = tick_event.changed().await;
                    if _r.is_err() {
                        break;
                    }
                }
            })
            .detach();

        let mut tick_event = executor.tick_channel();
        executor
            .spawn_local("LocalTickFuture2", async move {
                loop {
                    let _r = tick_event.changed().await;
                    if _r.is_err() {
                        break;
                    }
                }
            })
            .detach();

        executor.tick(DELTA, None);
        assert_eq!(executor.num_tasks(), 4);
        drop(executor);
    }

    #[test]
    fn test_limit() {
        let executor = TickedAsyncExecutor::default();
        for i in 0..10 {
            executor
                .spawn_local(format!("{i}"), async move {
                    println!("Finish {i}");
                })
                .detach();
        }

        for i in 0..10 {
            let num_tasks = executor.num_tasks();
            assert_eq!(num_tasks, 10 - i);
            executor.tick(0.1, Some(1));
        }

        assert_eq!(executor.num_tasks(), 0);
    }
}