use blueprint_core::error::BoxError;
use blueprint_core::job::call::JobCall;
use blueprint_router::Router;
use blueprint_runner::config::BlueprintEnvironment;
use blueprint_runner::error::RunnerError;
use blueprint_runner::{BackgroundService, BlueprintRunner};
use futures::Stream;
use std::pin::Pin;
use std::task::{Context, Poll};
use std::time::Duration;
use tokio::sync::oneshot;
use tokio::time::timeout;
/// Builds the environment shared by the tests in this file: the default
/// `BlueprintEnvironment` with its `test_mode` flag switched on.
fn test_env() -> BlueprintEnvironment {
    let mut environment = BlueprintEnvironment::default();
    environment.test_mode = true;
    environment
}
/// Producer stream that never yields an item and never terminates.
struct PendingProducer;

impl Stream for PendingProducer {
    type Item = Result<JobCall, BoxError>;

    /// Always reports `Poll::Pending`. The task context is deliberately
    /// ignored, so no wake-up is ever scheduled by this stream.
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Pending
    }
}
/// Producer stream that yields exactly one error and then stays pending.
struct ErrorProducer {
    /// Set once the single error has been handed out.
    error_sent: bool,
}

impl ErrorProducer {
    /// Creates a producer that has not emitted its error yet.
    fn new() -> Self {
        Self { error_sent: false }
    }
}

impl Stream for ErrorProducer {
    type Item = Result<JobCall, BoxError>;

    /// The first poll returns `Err("producer error")`; every later poll
    /// returns `Poll::Pending` (the context is ignored, so no wake-up is
    /// scheduled).
    fn poll_next(mut self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        if self.error_sent {
            return Poll::Pending;
        }

        self.error_sent = true;
        Poll::Ready(Some(Err("producer error".into())))
    }
}
/// Producer stream that is already exhausted: it ends on the first poll.
struct EndingProducer;

impl Stream for EndingProducer {
    type Item = Result<JobCall, BoxError>;

    /// Always reports end-of-stream (`Poll::Ready(None)`).
    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        Poll::Ready(None)
    }
}
/// Minimal background service used to exercise the builder API.
#[derive(Clone)]
struct TestBackgroundService;

impl BackgroundService for TestBackgroundService {
    /// Starts the service by spawning a task that sleeps for one hour and
    /// then reports `Ok(())` through the returned oneshot receiver.
    async fn start(&self) -> Result<oneshot::Receiver<Result<(), RunnerError>>, RunnerError> {
        let (sender, receiver) = oneshot::channel();

        tokio::spawn(async move {
            let one_hour = Duration::from_secs(3600);
            tokio::time::sleep(one_hour).await;
            // The receiver may already be gone; a failed send is not an error here.
            let _ = sender.send(Ok(()));
        });

        Ok(receiver)
    }
}
/// Running a builder that has a producer but no router must fail with
/// `RunnerError::NoRouter`.
#[tokio::test]
async fn builder_without_router_returns_no_router_error() {
    let outcome = BlueprintRunner::builder((), test_env())
        .producer(PendingProducer)
        .run()
        .await;

    assert!(
        matches!(outcome, Err(RunnerError::NoRouter)),
        "Expected NoRouter error, got: {:?}",
        outcome
    );
}
/// Running a builder that has a router but no producer must fail with
/// `RunnerError::NoProducers`.
#[tokio::test]
async fn builder_without_producers_returns_no_producers_error() {
    let env = test_env();
    let router = Router::new().route(0u32, || async { "test" });

    let outcome = BlueprintRunner::builder((), env).router(router).run().await;

    assert!(
        matches!(outcome, Err(RunnerError::NoProducers)),
        "Expected NoProducers error, got: {:?}",
        outcome
    );
}
/// A builder with both a router and a producer can be started.
///
/// The runner is spawned on a background task, given a short moment to start
/// and then aborted. The task is joined afterwards so that a panic raised
/// inside the runner fails this test instead of being silently discarded
/// together with the join handle.
#[tokio::test]
async fn builder_with_all_components_is_valid() {
    let env = test_env();
    let router = Router::new().route(0u32, || async { "test" });

    let handle = tokio::spawn(async move {
        BlueprintRunner::builder((), env)
            .router(router)
            .producer(PendingProducer)
            .run()
            .await
    });

    tokio::time::sleep(Duration::from_millis(10)).await;
    handle.abort();

    // Cancellation (or a run that already finished) is acceptable here;
    // a panic inside the runner task is not.
    if let Err(join_error) = handle.await {
        assert!(
            !join_error.is_panic(),
            "runner task panicked: {}",
            join_error
        );
    }
}
/// Blueprint configuration that reports no registration is required and that
/// the runner should not exit after registration.
struct ContinueRunningConfig;

impl blueprint_runner::BlueprintConfig for ContinueRunningConfig {
    /// Always answers `Ok(false)`, regardless of the environment.
    async fn requires_registration(
        &self,
        _env: &BlueprintEnvironment,
    ) -> Result<bool, RunnerError> {
        Ok(false)
    }

    /// Always answers `false`.
    fn should_exit_after_registration(&self) -> bool {
        false
    }
}
/// An error yielded by a producer must surface from `run()` as
/// `RunnerError::Producer` within the 500 ms timeout.
#[tokio::test]
async fn producer_error_propagates() {
    let env = test_env();
    let router = Router::new().route(0u32, || async { "test" });

    let run = BlueprintRunner::builder(ContinueRunningConfig, env)
        .router(router)
        .producer(ErrorProducer::new())
        .run();
    let outcome = timeout(Duration::from_millis(500), run).await;

    assert!(
        matches!(outcome, Ok(Err(RunnerError::Producer(_)))),
        "Expected Producer error, got: {:?}",
        outcome
    );
}
/// A producer stream that ends must make `run()` return
/// `RunnerError::Producer` within the 500 ms timeout.
#[tokio::test]
async fn producer_stream_ending_returns_error() {
    let env = test_env();
    let router = Router::new().route(0u32, || async { "test" });

    let run = BlueprintRunner::builder(ContinueRunningConfig, env)
        .router(router)
        .producer(EndingProducer)
        .run();
    let outcome = timeout(Duration::from_millis(500), run).await;

    assert!(
        matches!(outcome, Ok(Err(RunnerError::Producer(_)))),
        "Expected Producer error (stream ended), got: {:?}",
        outcome
    );
}
/// The builder accepts more than one producer. Only construction is
/// exercised; the runner is never started.
#[tokio::test]
async fn builder_accepts_multiple_producers() {
    let env = test_env();
    let router = Router::new().route(0u32, || async { "test" });

    let with_router = BlueprintRunner::builder((), env).router(router);
    let _with_two_producers = with_router
        .producer(PendingProducer)
        .producer(PendingProducer);
}
/// The builder accepts a background service. Only construction is
/// exercised; the runner is never started.
#[tokio::test]
async fn builder_accepts_background_service() {
    let env = test_env();
    let router = Router::new().route(0u32, || async { "test" });

    let with_producer = BlueprintRunner::builder((), env)
        .router(router)
        .producer(PendingProducer);
    let _with_service = with_producer.background_service(TestBackgroundService);
}
/// The builder accepts a shutdown handler future. Only construction is
/// exercised; the runner is never started, so the handler is never awaited
/// by this test.
#[tokio::test]
async fn builder_accepts_shutdown_handler() {
    let env = test_env();
    let router = Router::new().route(0u32, || async { "test" });

    let on_shutdown = async {
        println!("Shutdown!");
    };

    let _with_handler = BlueprintRunner::builder((), env)
        .router(router)
        .producer(PendingProducer)
        .with_shutdown_handler(on_shutdown);
}
/// A runner built with a custom `BlueprintConfig` implementation can be
/// started.
///
/// The runner is spawned on a background task, given a short moment to start
/// and then aborted. The task is joined afterwards so that a panic raised
/// inside the runner fails this test instead of being silently discarded
/// together with the join handle.
#[tokio::test]
async fn custom_config_is_accepted() {
    let env = test_env();
    let router = Router::new().route(0u32, || async { "test" });

    let handle = tokio::spawn(async move {
        BlueprintRunner::builder(ContinueRunningConfig, env)
            .router(router)
            .producer(PendingProducer)
            .run()
            .await
    });

    tokio::time::sleep(Duration::from_millis(10)).await;
    handle.abort();

    // Cancellation (or a run that already finished) is acceptable here;
    // a panic inside the runner task is not.
    if let Err(join_error) = handle.await {
        assert!(
            !join_error.is_panic(),
            "runner task panicked: {}",
            join_error
        );
    }
}
/// The `Display` output of selected `RunnerError` variants contains the
/// expected keyword or payload.
#[test]
fn runner_error_display() {
    let no_router = RunnerError::NoRouter.to_string();
    assert!(no_router.contains("router"));

    let no_producers = RunnerError::NoProducers.to_string();
    assert!(no_producers.contains("producer"));

    let background = RunnerError::BackgroundService(String::from("test error")).to_string();
    assert!(background.contains("test error"));
}
/// The `Display` output of `JobCallError::JobFailed` mentions "failed".
#[test]
fn job_call_error_display() {
    use blueprint_runner::error::JobCallError;

    let rendered = JobCallError::JobFailed("test".into()).to_string();
    assert!(rendered.contains("failed"));
}
/// The `Display` output of the `ProducerError` variants contains the
/// expected keyword ("ended" / "failed").
#[test]
fn producer_error_display() {
    use blueprint_runner::error::ProducerError;

    let stream_ended = ProducerError::StreamEnded.to_string();
    assert!(stream_ended.contains("ended"));

    let failed = ProducerError::Failed("test".into()).to_string();
    assert!(failed.contains("failed"));
}