use asynq::backend::RedisConnectionType;
use asynq::components::aggregator::{Aggregator, AggregatorConfig};
use asynq::components::forwarder::{Forwarder, ForwarderConfig};
use asynq::components::healthcheck::{Healthcheck, HealthcheckConfig};
use asynq::components::heartbeat::{Heartbeat, HeartbeatMeta};
use asynq::components::janitor::{Janitor, JanitorConfig};
use asynq::components::recoverer::{Recoverer, RecovererConfig};
use asynq::components::subscriber::{Subscriber, SubscriberConfig};
use asynq::components::ComponentLifecycle;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
#[tokio::test]
#[ignore] async fn test_lifecycle_trait_usage() {
use asynq::backend::RedisBroker;
use asynq::backend::RedisConnectionType;
let redis_config = RedisConnectionType::single("redis://localhost:6379").unwrap();
let broker = Arc::new(RedisBroker::new(redis_config.clone()).await.unwrap());
let components: Vec<Arc<dyn ComponentLifecycle>> = vec![
Arc::new(Aggregator::new(broker.clone(), AggregatorConfig::default())),
Arc::new(Forwarder::new(broker.clone(), ForwarderConfig::default())),
Arc::new(Healthcheck::new(
broker.clone(),
HealthcheckConfig::default(),
)),
Arc::new(Janitor::new(broker.clone(), JanitorConfig::default())),
Arc::new(Recoverer::new(broker.clone(), RecovererConfig::default())),
Arc::new(Subscriber::new(broker.clone(), SubscriberConfig::default())),
{
let (heartbeat, _sender) = Heartbeat::new(
broker.clone(),
Duration::from_secs(5),
HeartbeatMeta {
host: "localhost".to_string(),
pid: 1234,
server_uuid: "test-server".to_string(),
concurrency: 10,
queues: HashMap::from([("default".to_string(), 1)]),
strict_priority: false,
started: SystemTime::now(),
acl_tenant: None,
},
);
Arc::new(heartbeat) as Arc<dyn ComponentLifecycle>
},
];
let mut handles = Vec::new();
for component in &components {
assert!(!component.is_done());
let handle = component.clone().start();
handles.push(handle);
}
tokio::time::sleep(Duration::from_millis(100)).await;
for component in &components {
component.shutdown();
}
for component in &components {
assert!(component.is_done());
}
for handle in handles {
let _ = handle.await;
}
}
#[tokio::test]
#[ignore] async fn test_janitor_lifecycle() {
use asynq::backend::RedisBroker;
let redis_config = RedisConnectionType::single("redis://localhost:6379").unwrap();
let broker = Arc::new(RedisBroker::new(redis_config).await.unwrap());
let janitor: Arc<dyn ComponentLifecycle> =
Arc::new(Janitor::new(broker, JanitorConfig::default()));
assert!(!janitor.is_done());
let handle = janitor.clone().start();
tokio::time::sleep(Duration::from_millis(50)).await;
assert!(!janitor.is_done());
janitor.shutdown();
assert!(janitor.is_done());
handle.await.unwrap();
}
#[tokio::test]
#[ignore] async fn test_generic_component_management() {
use asynq::backend::RedisBroker;
let redis_config = RedisConnectionType::single("redis://localhost:6379").unwrap();
let broker = Arc::new(RedisBroker::new(redis_config).await.unwrap());
async fn manage_component(component: Arc<dyn ComponentLifecycle>, run_duration: Duration) {
let handle = component.clone().start();
tokio::time::sleep(run_duration).await;
component.shutdown();
handle.await.unwrap();
}
let forwarder: Arc<dyn ComponentLifecycle> =
Arc::new(Forwarder::new(broker.clone(), ForwarderConfig::default()));
manage_component(forwarder, Duration::from_millis(50)).await;
let recoverer: Arc<dyn ComponentLifecycle> =
Arc::new(Recoverer::new(broker.clone(), RecovererConfig::default()));
manage_component(recoverer, Duration::from_millis(50)).await;
}