use std::sync::Arc;
use resolute::{Client, PgListener};
use tokio::sync::watch;
use crate::{build_schema_cache, SchemaCache, SchemaCacheError};
pub async fn start_schema_listener(
addr: &str,
user: &str,
password: &str,
database: &str,
schemas: Vec<String>,
tx: watch::Sender<Arc<SchemaCache>>,
channel_name: &str,
) -> Result<(), SchemaCacheError> {
let mut listener = PgListener::connect(addr, user, password, database)
.await
.map_err(SchemaCacheError::Database)?;
listener
.listen(channel_name)
.await
.map_err(SchemaCacheError::Database)?;
tracing::info!("Schema listener started on channel '{channel_name}'");
loop {
let notification = listener.recv().await.map_err(SchemaCacheError::Database)?;
if notification.channel != channel_name {
continue;
}
tracing::info!("Schema reload notification received");
let client = match Client::connect(addr, user, password, database).await {
Ok(c) => c,
Err(e) => {
tracing::error!("Schema reload connect failed: {e}");
continue;
}
};
match build_schema_cache(&client, &schemas).await {
Ok(cache) => {
tx.send(Arc::new(cache)).ok();
tracing::info!("Schema cache reloaded");
}
Err(e) => {
tracing::error!("Schema cache reload failed: {e}");
}
}
}
}