const TEST_DATABASE_URL: &str = "postgres://postgres:postgres@localhost:5432/asynq_test";
#[cfg(feature = "postgres")]
#[tokio::test]
async fn test_postgres_server_registration() -> Result<(), Box<dyn std::error::Error>> {
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use uuid::Uuid;
use asynq::backend::PostgresBroker;
use asynq::backend::PostgresInspector;
use asynq::base::Broker;
use asynq::inspector::InspectorTrait;
use asynq::proto::ServerInfo;
let database_url =
std::env::var("DATABASE_URL").unwrap_or_else(|_| TEST_DATABASE_URL.to_string());
let broker = match PostgresBroker::builder()
.database_url(&database_url)
.build()
.await
{
Ok(b) => Arc::new(b),
Err(e) => {
println!("Skipping test - PostgresSQL not available: {}", e);
return Ok(());
}
};
let inspector = PostgresInspector::from_broker(broker.clone());
let hostname = "test-host";
let pid = 12345;
let server_uuid = Uuid::new_v4().to_string();
let mut queues = HashMap::new();
queues.insert("default".to_string(), 1);
queues.insert("critical".to_string(), 2);
let server_info = ServerInfo {
host: hostname.to_string(),
pid,
server_id: server_uuid.clone(),
concurrency: 10,
queues,
strict_priority: false,
status: "active".to_string(),
start_time: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
active_worker_count: 0,
};
println!("๐ Writing server state to PostgresSQL...");
broker
.write_server_state(&server_info, vec![], Duration::from_secs(300), None)
.await?;
println!("โ
Server state written successfully");
println!("๐ Retrieving servers from database...");
let servers = inspector.get_servers().await?;
println!("๐ Found {} servers", servers.len());
let found_server = servers.iter().find(|s| s.server_id == server_uuid);
assert!(
found_server.is_some(),
"Server should be found in server list"
);
let found_server = found_server.unwrap();
assert_eq!(found_server.host, hostname);
assert_eq!(found_server.pid, pid);
assert_eq!(found_server.concurrency, 10);
assert_eq!(found_server.status, "active");
println!("โ
Server found with correct information:");
println!(" Host: {}", found_server.host);
println!(" PID: {}", found_server.pid);
println!(" Server ID: {}", found_server.server_id);
println!(" Concurrency: {}", found_server.concurrency);
println!("๐งน Cleaning up server state...");
broker
.clear_server_state(hostname, pid, &server_uuid, None)
.await?;
let servers_after_cleanup = inspector.get_servers().await?;
let found_after_cleanup = servers_after_cleanup
.iter()
.find(|s| s.server_id == server_uuid);
assert!(
found_after_cleanup.is_none(),
"Server should be removed after cleanup"
);
println!("โ
All PostgresSQL server registration tests passed!");
Ok(())
}
#[cfg(feature = "postgres")]
#[tokio::test]
async fn test_postgres_worker_registration() -> Result<(), Box<dyn std::error::Error>> {
use std::sync::Arc;
use uuid::Uuid;
use asynq::backend::PostgresBroker;
use asynq::proto::WorkerInfo;
let database_url =
std::env::var("DATABASE_URL").unwrap_or_else(|_| TEST_DATABASE_URL.to_string());
let broker = match PostgresBroker::builder()
.database_url(&database_url)
.build()
.await
{
Ok(b) => Arc::new(b),
Err(e) => {
println!("Skipping test - PostgresSQL not available: {}", e);
return Ok(());
}
};
let hostname = "test-worker-host";
let pid = 54321;
let server_uuid = Uuid::new_v4().to_string();
let task_id = Uuid::new_v4().to_string();
let worker_info = WorkerInfo {
host: hostname.to_string(),
pid,
server_id: server_uuid.clone(),
task_id: task_id.clone(),
task_type: "test:task".to_string(),
task_payload: b"test payload".to_vec(),
queue: "default".to_string(),
start_time: Some(prost_types::Timestamp::from(std::time::SystemTime::now())),
deadline: None,
};
println!("๐ Writing worker state to PostgresSQL...");
broker.write_worker_state(&worker_info).await?;
println!("โ
Worker state written successfully");
println!("๐งน Cleaning up worker state...");
broker
.clear_worker_state(hostname, pid, &server_uuid, &task_id)
.await?;
println!("โ
All PostgresSQL worker registration tests passed!");
Ok(())
}