athena_rs 3.26.1

Hyper performant polyglot Database driver
Documentation
//! Chat runtime wiring: tenant pool resolution, WSS event publishing, and outbox worker.

use std::collections::HashSet;
use std::sync::Arc;
use std::time::Duration;

use actix_web::web::Data;
use async_trait::async_trait;
use athena_chat::events::ChatEventEnvelope;
use athena_chat::ids::RoomId;
use athena_chat::{ChatError, ChatEventPublisher, ChatPoolResolver, ChatResult};
use athena_wss::WsGateway;
use sqlx::PgPool;
use tracing::{info, warn};

use crate::AppState;
use crate::drivers::postgresql::sqlx_driver::PostgresClientRegistry;

use super::auth::AuthResolver;

/// Placeholder for future chat storage integration hooks.
#[derive(Debug, Default)]
pub struct StorageFacade;

/// Placeholder for chat observability metrics.
#[derive(Debug, Default)]
pub struct ChatMetrics;

/// Resolves per-tenant PostgreSQL pools from the app registry, excluding configured clients.
pub struct AppChatPoolResolver {
    registry: Arc<PostgresClientRegistry>,
    excluded_clients: HashSet<String>,
}

impl AppChatPoolResolver {
    /// Creates a resolver that skips `excluded_clients` when listing pools for outbox drain.
    pub fn new(
        registry: Arc<PostgresClientRegistry>,
        excluded_clients: impl IntoIterator<Item = String>,
    ) -> Self {
        Self {
            registry,
            excluded_clients: excluded_clients
                .into_iter()
                .map(|client_name| normalize_client_name(&client_name))
                .collect(),
        }
    }

    fn is_chat_data_pool(&self, client_name: &str) -> bool {
        !self
            .excluded_clients
            .contains(&normalize_client_name(client_name))
    }
}

fn normalize_client_name(client_name: &str) -> String {
    client_name.trim().to_ascii_lowercase()
}

#[async_trait]
impl ChatPoolResolver for AppChatPoolResolver {
    async fn resolve_pool(&self, client_name: &str) -> ChatResult<PgPool> {
        self.registry.get_pool(client_name).ok_or_else(|| {
            ChatError::Unavailable(format!(
                "chat storage pool for client '{}' is not connected",
                client_name
            ))
        })
    }

    async fn list_pools(&self) -> ChatResult<Vec<(String, PgPool)>> {
        Ok(self
            .registry
            .list_clients()
            .into_iter()
            .filter(|client_name| self.is_chat_data_pool(client_name))
            .filter_map(|client_name| {
                self.registry
                    .get_pool(&client_name)
                    .map(|pool| (client_name, pool))
            })
            .collect())
    }
}

/// [`ChatEventPublisher`] adapter that forwards committed events to [`athena_wss::WsHub`].
pub struct WsHubPublisher {
    hub: Arc<athena_wss::WsHub>,
}

impl WsHubPublisher {
    /// Wraps `hub` as the chat outbox realtime publisher.
    pub fn new(hub: Arc<athena_wss::WsHub>) -> Self {
        Self { hub }
    }
}

#[async_trait]
impl ChatEventPublisher for WsHubPublisher {
    async fn publish_room_event(
        &self,
        client_name: &str,
        room_id: RoomId,
        event: ChatEventEnvelope,
    ) {
        self.hub
            .publish_room_event(client_name, room_id, event)
            .await;
    }
}

/// Spawns a background task that periodically drains pending chat outbox rows across tenants.
pub fn spawn_chat_outbox_worker(state: Data<AppState>) {
    let app = state.chat_app.clone();
    tokio::spawn(async move {
        info!("Chat outbox worker started");
        loop {
            match app.publish_pending_outbox(100).await {
                Ok(_) => tokio::time::sleep(Duration::from_millis(750)).await,
                Err(err) => {
                    warn!(error = %err, "chat outbox worker publish failed");
                    tokio::time::sleep(Duration::from_secs(2)).await;
                }
            }
        }
    });
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use super::AppChatPoolResolver;
    use crate::drivers::postgresql::sqlx_driver::PostgresClientRegistry;

    #[test]
    fn chat_pool_exclusions_are_case_insensitive() {
        let resolver = AppChatPoolResolver::new(
            Arc::new(PostgresClientRegistry::empty()),
            ["Athena_Logging".to_string(), "ATHENA_BENCHMARK".to_string()],
        );

        assert!(!resolver.is_chat_data_pool("athena_logging"));
        assert!(!resolver.is_chat_data_pool("athena_benchmark"));
        assert!(resolver.is_chat_data_pool("suits_formations"));
    }
}