athena_rs 3.23.0

Hyper performant polyglot Database driver
Documentation
use std::sync::Arc;
use std::time::Duration;

use actix_web::web::Data;
use async_trait::async_trait;
use athena_chat::events::ChatEventEnvelope;
use athena_chat::ids::RoomId;
use athena_chat::{ChatError, ChatEventPublisher, ChatPoolResolver, ChatResult};
use athena_wss::WsGateway;
use sqlx::PgPool;
use tracing::{info, warn};

use crate::AppState;
use crate::drivers::postgresql::sqlx_driver::PostgresClientRegistry;

use super::auth::AuthResolver;

#[derive(Debug, Default)]
pub struct StorageFacade;

#[derive(Debug, Default)]
pub struct ChatMetrics;

pub struct AppChatPoolResolver {
    registry: Arc<PostgresClientRegistry>,
}

impl AppChatPoolResolver {
    pub fn new(registry: Arc<PostgresClientRegistry>) -> Self {
        Self { registry }
    }
}

#[async_trait]
impl ChatPoolResolver for AppChatPoolResolver {
    async fn resolve_pool(&self, client_name: &str) -> ChatResult<PgPool> {
        self.registry.get_pool(client_name).ok_or_else(|| {
            ChatError::Unavailable(format!(
                "chat storage pool for client '{}' is not connected",
                client_name
            ))
        })
    }

    async fn list_pools(&self) -> ChatResult<Vec<(String, PgPool)>> {
        Ok(self
            .registry
            .list_registered_clients()
            .into_iter()
            .filter_map(|client| {
                self.registry
                    .get_pool(&client.client_name)
                    .map(|pool| (client.client_name, pool))
            })
            .collect())
    }
}

pub struct WsHubPublisher {
    hub: Arc<athena_wss::WsHub>,
}

impl WsHubPublisher {
    pub fn new(hub: Arc<athena_wss::WsHub>) -> Self {
        Self { hub }
    }
}

#[async_trait]
impl ChatEventPublisher for WsHubPublisher {
    async fn publish_room_event(
        &self,
        client_name: &str,
        room_id: RoomId,
        event: ChatEventEnvelope,
    ) {
        self.hub
            .publish_room_event(client_name, room_id, event)
            .await;
    }
}

pub fn spawn_chat_outbox_worker(state: Data<AppState>) {
    let app = state.chat_app.clone();
    tokio::spawn(async move {
        info!("Chat outbox worker started");
        loop {
            match app.publish_pending_outbox(100).await {
                Ok(_) => tokio::time::sleep(Duration::from_millis(750)).await,
                Err(err) => {
                    warn!(error = %err, "chat outbox worker publish failed");
                    tokio::time::sleep(Duration::from_secs(2)).await;
                }
            }
        }
    });
}