rs-zero 0.2.8

Rust-first microservice framework inspired by go-zero engineering practices
Documentation
use std::{
    sync::{
        Arc,
        atomic::{AtomicBool, AtomicUsize, Ordering},
    },
    time::Duration,
};

use rs_zero::core::{CoreError, CoreResult, Service, ServiceFuture, ServiceGroup, ShutdownToken};
use tokio::sync::{Barrier, Notify};

/// Checks that both registered services reach a shared barrier together and that, after a manual
/// stop through the group handle, the shared stop counter reads exactly 2.
#[tokio::test]
async fn service_group_starts_services_concurrently_and_stops_once() {
    // Three barrier parties: services "a" and "b" plus this test body. The barrier can only
    // release if both services are running at the same time.
    let rendezvous = Arc::new(Barrier::new(3));
    let stops = Arc::new(AtomicUsize::new(0));

    let mut group = ServiceGroup::new();
    for name in ["a", "b"] {
        group.add(TestService::new(
            name,
            Arc::clone(&rendezvous),
            Arc::clone(&stops),
            None,
        ));
    }
    let handle = group.handle();

    // The external shutdown future never resolves; only `handle.stop()` ends the run.
    let task = tokio::spawn(async move {
        let never = async { std::future::pending::<()>().await };
        group.start_with_shutdown(never).await
    });

    let arrived = tokio::time::timeout(Duration::from_secs(1), rendezvous.wait()).await;
    arrived.expect("services started concurrently");
    handle.stop();

    let outcome = task.await.expect("join");
    outcome.expect("service group");
    assert_eq!(stops.load(Ordering::SeqCst), 2);
}

/// Verifies that resolving the externally supplied shutdown future stops the group.
///
/// A single service ("api") is registered. Once it has reached the barrier, the test fires a
/// oneshot signal and expects the group to finish with `Ok` and the stop counter to read 1.
#[tokio::test]
async fn service_group_external_shutdown_stops_services() {
    let stop_count = Arc::new(AtomicUsize::new(0));
    // Two barrier parties: the service and this test body.
    let barrier = Arc::new(Barrier::new(2));
    let (tx, rx) = tokio::sync::oneshot::channel();

    let mut group = ServiceGroup::new();
    group.add(TestService::new(
        "api",
        barrier.clone(),
        stop_count.clone(),
        None,
    ));

    let runner = tokio::spawn(async move {
        group
            .start_with_shutdown(async {
                // Either a sent value or a dropped sender ends the wait; the payload is unused.
                let _ = rx.await;
            })
            .await
    });

    tokio::time::timeout(Duration::from_secs(1), barrier.wait())
        .await
        .expect("service started");

    // The receiver lives inside the shutdown future. A failed send would mean that future was
    // already dropped, so the test would no longer be exercising external shutdown at all.
    tx.send(())
        .expect("group should still be waiting for the external shutdown signal");

    runner.await.expect("join").expect("service group");
    assert_eq!(stop_count.load(Ordering::SeqCst), 1);
}

/// Checks that an error returned from one service's `start` makes the whole group fail, that the
/// reported error text mentions the failing service's name, and that the healthy service's stop
/// counter reads 1 afterwards.
#[tokio::test]
async fn service_error_stops_the_group() {
    // Two barrier parties: the healthy worker and the failing service.
    let rendezvous = Arc::new(Barrier::new(2));
    let stops = Arc::new(AtomicUsize::new(0));

    let worker = TestService::new("worker", Arc::clone(&rendezvous), Arc::clone(&stops), None);
    let failing = FailingService::new("failing", Arc::clone(&rendezvous));

    let mut group = ServiceGroup::new();
    group.add(worker);
    group.add(failing);

    // No external shutdown ever arrives; the run can only end because of the service error.
    let outcome = group
        .start_with_shutdown(async { std::future::pending::<()>().await })
        .await;
    let error = outcome.expect_err("service group should fail");

    let message = error.to_string();
    assert!(message.contains("failing"));
    assert_eq!(stops.load(Ordering::SeqCst), 1);
}

#[tokio::test]
async fn add_fn_service_runs_until_shutdown() {
    let started = Arc::new(Notify::new());
    let stopped = Arc::new(AtomicBool::new(false));

    let mut group = ServiceGroup::new();
    let handle = group.handle();
    let started_for_service = started.clone();
    let stopped_for_service = stopped.clone();
    group.add_fn("fn-service", move |shutdown| {
        let started = started_for_service.clone();
        let stopped = stopped_for_service.clone();
        async move {
            started.notify_one();
            shutdown.cancelled().await;
            stopped.store(true, Ordering::SeqCst);
            Ok(())
        }
    });

    let runner = tokio::spawn(async move {
        group
            .start_with_shutdown(async { std::future::pending::<()>().await })
            .await
    });

    tokio::time::timeout(Duration::from_secs(1), started.notified())
        .await
        .expect("service started");
    handle.stop();
    runner.await.expect("join").expect("service group");

    assert!(stopped.load(Ordering::SeqCst));
}

/// Checks that a service which never returns after cancellation makes the group fail with an
/// error whose text contains "shutdown timed out".
#[tokio::test]
async fn service_group_reports_shutdown_timeout() {
    use rs_zero::core::ServiceGroupConfig;

    // Only the shutdown timeout is overridden; every other setting keeps its default.
    let config = ServiceGroupConfig {
        shutdown_timeout: Duration::from_millis(20),
        ..ServiceGroupConfig::default()
    };

    let mut group = ServiceGroup::with_config(config);
    let handle = group.handle();
    group.add_fn("stubborn", |shutdown| async move {
        // Observe the cancellation, then refuse to ever complete.
        shutdown.cancelled().await;
        let never = std::future::pending::<CoreResult<()>>();
        never.await
    });

    let task = tokio::spawn(async move {
        let no_external_shutdown = async { std::future::pending::<()>().await };
        group.start_with_shutdown(no_external_shutdown).await
    });

    // Brief pause before stopping — presumably to let the spawned group begin running.
    tokio::time::sleep(Duration::from_millis(10)).await;
    handle.stop();

    let outcome = task.await.expect("join");
    let error = outcome.expect_err("service group should time out");
    let message = error.to_string();
    assert!(message.contains("shutdown timed out"));
}

/// Test double for `Service`: its `start` waits on a shared barrier and then idles until
/// shutdown is signalled, and its `stop` bumps a shared counter.
struct TestService {
    /// Name returned from `Service::name`.
    name: String,
    /// Barrier awaited at the beginning of `start`.
    barrier: Arc<Barrier>,
    /// Counter incremented by one on every `stop` call.
    stop_count: Arc<AtomicUsize>,
    /// When `Some`, `stop` returns this message wrapped in `CoreError::Service`.
    stop_error: Option<String>,
}

impl TestService {
    fn new(
        name: impl Into<String>,
        barrier: Arc<Barrier>,
        stop_count: Arc<AtomicUsize>,
        stop_error: Option<String>,
    ) -> Self {
        Self {
            name: name.into(),
            barrier,
            stop_count,
            stop_error,
        }
    }
}

impl Service for TestService {
    /// Returns the name given at construction.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Waits on the shared barrier, then stays pending until `shutdown` is cancelled and
    /// finishes with `Ok(())`.
    fn start(&self, shutdown: ShutdownToken) -> ServiceFuture<'_> {
        let barrier = &self.barrier;
        Box::pin(async move {
            barrier.wait().await;
            shutdown.cancelled().await;
            Ok(())
        })
    }

    /// Increments the shared stop counter, then returns the configured stop error if one was
    /// supplied and `Ok(())` otherwise.
    fn stop(&self) -> ServiceFuture<'_> {
        Box::pin(async move {
            // The counter is bumped before the error check, so failing stops are counted too.
            self.stop_count.fetch_add(1, Ordering::SeqCst);
            match &self.stop_error {
                Some(message) => Err(CoreError::Service(message.clone())),
                None => Ok(()),
            }
        })
    }
}

/// Test double for `Service` whose `start` waits on a shared barrier and then fails with a
/// `CoreError::Service("boom")` error.
struct FailingService {
    /// Name returned from `Service::name`.
    name: String,
    /// Barrier awaited in `start` before the error is returned.
    barrier: Arc<Barrier>,
}

impl FailingService {
    fn new(name: impl Into<String>, barrier: Arc<Barrier>) -> Self {
        Self {
            name: name.into(),
            barrier,
        }
    }
}

impl Service for FailingService {
    /// Returns the name given at construction.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Waits on the shared barrier and then fails with `CoreError::Service("boom")`; the
    /// shutdown token is never consulted.
    fn start(&self, _shutdown: ShutdownToken) -> ServiceFuture<'_> {
        let barrier = &self.barrier;
        Box::pin(async move {
            barrier.wait().await;
            Err(CoreError::Service(String::from("boom")))
        })
    }
}