use async_trait::async_trait;
use log::info;
use pingora_core::server::configuration::Opt;
#[cfg(unix)]
use pingora_core::server::ListenFds;
use pingora_core::server::{Server, ShutdownWatch};
use pingora_core::services::{Service, ServiceWithDependents};
use pingora_core::services::ServiceReadyNotifier;
use std::sync::Arc;
use tokio::sync::Mutex;
use tokio::time::{sleep, Duration};
pub struct DatabaseService {
connection_string: Arc<Mutex<Option<String>>>,
}
impl DatabaseService {
fn new() -> Self {
Self {
connection_string: Arc::new(Mutex::new(None)),
}
}
fn get_connection_string(&self) -> Arc<Mutex<Option<String>>> {
self.connection_string.clone()
}
}
#[async_trait]
impl ServiceWithDependents for DatabaseService {
async fn start_service(
&mut self,
#[cfg(unix)] _fds: Option<ListenFds>,
mut shutdown: ShutdownWatch,
_listeners_per_fd: usize,
ready_notifier: ServiceReadyNotifier,
) {
info!("DatabaseService: Starting initialization...");
sleep(Duration::from_secs(2)).await;
{
let mut conn = self.connection_string.lock().await;
*conn = Some("postgresql://localhost:5432/mydb".to_string());
}
info!("DatabaseService: Initialization complete, signaling ready");
ready_notifier.notify_ready();
shutdown.changed().await.ok();
info!("DatabaseService: Shutting down");
}
fn name(&self) -> &str {
"database"
}
fn threads(&self) -> Option<usize> {
Some(1)
}
}
pub struct CacheService;
#[async_trait]
impl Service for CacheService {
async fn start_service(
&mut self,
#[cfg(unix)] _fds: Option<ListenFds>,
mut shutdown: ShutdownWatch,
_listeners_per_fd: usize,
) {
info!("CacheService: Starting (ready immediately)...");
sleep(Duration::from_secs(1)).await;
info!("CacheService: Warmup complete");
shutdown.changed().await.ok();
info!("CacheService: Shutting down");
}
fn name(&self) -> &str {
"cache"
}
fn threads(&self) -> Option<usize> {
Some(1)
}
}
pub struct ApiService {
db_connection: Arc<Mutex<Option<String>>>,
}
impl ApiService {
fn new(db_connection: Arc<Mutex<Option<String>>>) -> Self {
Self { db_connection }
}
}
#[async_trait]
impl Service for ApiService {
async fn start_service(
&mut self,
#[cfg(unix)] _fds: Option<ListenFds>,
mut shutdown: ShutdownWatch,
_listeners_per_fd: usize,
) {
info!("ApiService: Starting (dependencies should be ready)...");
{
let conn = self.db_connection.lock().await;
if let Some(conn_str) = &*conn {
info!("ApiService: Using database connection: {}", conn_str);
} else {
panic!("ApiService: Database connection not available!");
}
}
info!("ApiService: Ready to serve requests");
shutdown.changed().await.ok();
info!("ApiService: Shutting down");
}
fn name(&self) -> &str {
"api"
}
fn threads(&self) -> Option<usize> {
Some(1)
}
}
fn main() {
env_logger::Builder::from_default_env()
.filter_level(log::LevelFilter::Info)
.init();
info!("Starting server with service dependencies...");
let opt = Opt::parse_args();
let mut server = Server::new(Some(opt)).unwrap();
server.bootstrap();
let db_service = DatabaseService::new();
let db_connection = db_service.get_connection_string();
let cache_service = CacheService;
let api_service = ApiService::new(db_connection);
let db_handle = server.add_service(db_service);
let cache_handle = server.add_service(cache_service);
let api_handle = server.add_service(api_service);
api_handle.add_dependency(db_handle);
api_handle.add_dependency(&cache_handle);
info!("Services configured. Starting server...");
info!("Expected startup order:");
info!(" 1. database (will initialize for 2 seconds)");
info!(" 2. cache (will initialize for 1 second)");
info!(" 3. api (will wait for both, then start)");
info!("");
info!("Press Ctrl+C to shut down");
server.run_forever();
}