use std::{
sync::{
Arc,
atomic::{AtomicBool, AtomicUsize, Ordering},
},
time::Duration,
};
use rs_zero::core::{CoreError, CoreResult, Service, ServiceFuture, ServiceGroup, ShutdownToken};
use tokio::sync::{Barrier, Notify};
/// Registers two `TestService`s that rendezvous with the test body on a
/// three-party barrier, stops the group through its handle, and checks that
/// `stop` ran twice in total (once per registered service).
#[tokio::test]
async fn service_group_starts_services_concurrently_and_stops_once() {
    // Three parties: services "a" and "b" plus this test body.
    let rendezvous = Arc::new(Barrier::new(3));
    let stops = Arc::new(AtomicUsize::new(0));

    let mut group = ServiceGroup::new();
    for name in ["a", "b"] {
        group.add(TestService::new(
            name,
            Arc::clone(&rendezvous),
            Arc::clone(&stops),
            None,
        ));
    }
    let handle = group.handle();

    // The external shutdown future never resolves; only the handle stops the group.
    let runner = tokio::spawn(async move {
        let never = async { std::future::pending::<()>().await };
        group.start_with_shutdown(never).await
    });

    // The barrier only releases once both services are inside `start` at the same time.
    let started = tokio::time::timeout(Duration::from_secs(1), rendezvous.wait()).await;
    started.expect("services started concurrently");

    handle.stop();
    let outcome = runner.await.expect("join");
    outcome.expect("service group");

    assert_eq!(stops.load(Ordering::SeqCst), 2);
}
/// Drives shutdown through the external future (a oneshot receiver) instead of
/// the group handle, and checks the single registered service was stopped once.
#[tokio::test]
async fn service_group_external_shutdown_stops_services() {
    let (shutdown_tx, shutdown_rx) = tokio::sync::oneshot::channel();
    let stops = Arc::new(AtomicUsize::new(0));
    // Two parties: the "api" service and this test body.
    let rendezvous = Arc::new(Barrier::new(2));

    let mut group = ServiceGroup::new();
    let api = TestService::new("api", Arc::clone(&rendezvous), Arc::clone(&stops), None);
    group.add(api);

    let runner = tokio::spawn(async move {
        // Resolves when the sender fires (or is dropped); the result is ignored.
        let external = async move {
            let _ = shutdown_rx.await;
        };
        group.start_with_shutdown(external).await
    });

    let started = tokio::time::timeout(Duration::from_secs(1), rendezvous.wait()).await;
    started.expect("service started");

    // Trigger the external shutdown signal; a failed send is deliberately ignored.
    let _ = shutdown_tx.send(());

    let outcome = runner.await.expect("join");
    outcome.expect("service group");

    assert_eq!(stops.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn service_error_stops_the_group() {
let stop_count = Arc::new(AtomicUsize::new(0));
let barrier = Arc::new(Barrier::new(2));
let failing = FailingService::new("failing", barrier.clone());
let mut group = ServiceGroup::new();
group.add(TestService::new(
"worker",
barrier.clone(),
stop_count.clone(),
None,
));
group.add(failing);
let error = group
.start_with_shutdown(async { std::future::pending::<()>().await })
.await
.expect_err("service group should fail");
assert!(error.to_string().contains("failing"));
assert_eq!(stop_count.load(Ordering::SeqCst), 1);
}
#[tokio::test]
async fn add_fn_service_runs_until_shutdown() {
let started = Arc::new(Notify::new());
let stopped = Arc::new(AtomicBool::new(false));
let mut group = ServiceGroup::new();
let handle = group.handle();
let started_for_service = started.clone();
let stopped_for_service = stopped.clone();
group.add_fn("fn-service", move |shutdown| {
let started = started_for_service.clone();
let stopped = stopped_for_service.clone();
async move {
started.notify_one();
shutdown.cancelled().await;
stopped.store(true, Ordering::SeqCst);
Ok(())
}
});
let runner = tokio::spawn(async move {
group
.start_with_shutdown(async { std::future::pending::<()>().await })
.await
});
tokio::time::timeout(Duration::from_secs(1), started.notified())
.await
.expect("service started");
handle.stop();
runner.await.expect("join").expect("service group");
assert!(stopped.load(Ordering::SeqCst));
}
#[tokio::test]
async fn service_group_reports_shutdown_timeout() {
let config = rs_zero::core::ServiceGroupConfig {
shutdown_timeout: Duration::from_millis(20),
..rs_zero::core::ServiceGroupConfig::default()
};
let mut group = ServiceGroup::with_config(config);
let handle = group.handle();
group.add_fn("stubborn", |shutdown| async move {
shutdown.cancelled().await;
std::future::pending::<CoreResult<()>>().await
});
let runner = tokio::spawn(async move {
group
.start_with_shutdown(async { std::future::pending::<()>().await })
.await
});
tokio::time::sleep(Duration::from_millis(10)).await;
handle.stop();
let error = runner
.await
.expect("join")
.expect_err("service group should time out");
assert!(error.to_string().contains("shutdown timed out"));
}
/// Test double implementing `Service`: waits on a shared barrier when started
/// and counts how many times `stop` is invoked.
struct TestService {
/// Name returned from `Service::name`.
name: String,
/// Barrier awaited at the beginning of `start`.
barrier: Arc<Barrier>,
/// Counter incremented on every `stop` call.
stop_count: Arc<AtomicUsize>,
/// When `Some`, `stop` returns `CoreError::Service` carrying this message.
stop_error: Option<String>,
}
impl TestService {
    /// Builds a test service from its name, the barrier awaited in `start`,
    /// the counter bumped in `stop`, and an optional message that makes
    /// `stop` fail.
    fn new(
        name: impl Into<String>,
        barrier: Arc<Barrier>,
        stop_count: Arc<AtomicUsize>,
        stop_error: Option<String>,
    ) -> Self {
        let name = name.into();
        Self {
            name,
            barrier,
            stop_count,
            stop_error,
        }
    }
}
impl Service for TestService {
    /// Returns the name supplied at construction.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Waits at the shared barrier, then idles until the shutdown token is
    /// cancelled, and finishes successfully.
    fn start(&self, shutdown: ShutdownToken) -> ServiceFuture<'_> {
        Box::pin(async move {
            let _ = self.barrier.wait().await;
            shutdown.cancelled().await;
            Ok(())
        })
    }

    /// Records the stop call, then fails with the configured error message if
    /// one was provided.
    fn stop(&self) -> ServiceFuture<'_> {
        Box::pin(async move {
            // Count the call before deciding the outcome so failures are counted too.
            self.stop_count.fetch_add(1, Ordering::SeqCst);
            match self.stop_error.as_ref() {
                Some(message) => Err(CoreError::Service(message.clone())),
                None => Ok(()),
            }
        })
    }
}
/// Test double implementing `Service` whose `start` waits on a shared barrier
/// and then returns an error.
struct FailingService {
/// Name returned from `Service::name`.
name: String,
/// Barrier awaited in `start` before the error is returned.
barrier: Arc<Barrier>,
}
impl FailingService {
    /// Builds a failing service from its name and the barrier awaited in `start`.
    fn new(name: impl Into<String>, barrier: Arc<Barrier>) -> Self {
        let name = name.into();
        Self {
            name,
            barrier,
        }
    }
}
impl Service for FailingService {
    /// Returns the name supplied at construction.
    fn name(&self) -> &str {
        self.name.as_str()
    }

    /// Waits at the shared barrier and then fails with a `CoreError::Service`
    /// carrying the message `boom`; the shutdown token is never consulted.
    fn start(&self, _shutdown: ShutdownToken) -> ServiceFuture<'_> {
        Box::pin(async move {
            let _ = self.barrier.wait().await;
            Err(CoreError::Service(String::from("boom")))
        })
    }
}