roboticus-api 0.11.3

HTTP routes, WebSocket, auth, rate limiting, and dashboard for the Roboticus agent runtime
Documentation
//! Generic channel polling loop and platform-specific wrappers.

use std::sync::Arc;
use std::time::Duration;

use roboticus_channels::ChannelAdapter;

use super::AppState;

/// Maximum concurrent channel messages processed per platform.
///
/// Each platform wrapper in this file builds its own semaphore with this many
/// permits, so the limit applies to every platform independently rather than
/// being shared across them.
const CHANNEL_CONCURRENCY: usize = 8;

/// Reply text sent back to the originating chat by the poll loop when
/// processing an inbound message fails and the platform's
/// `PollConfig::reply_on_error` flag is set.
pub(crate) const CHANNEL_PROCESSING_ERROR_REPLY: &str =
    "I hit an internal processing error while handling that message. Please retry in a moment.";

/// Configuration for channel-specific poll loop behavior.
///
/// One value is built by each platform wrapper and handed to
/// `channel_poll_loop`, which reads the fields on every iteration.
struct PollConfig {
    /// Platform name used as the `platform` field in log events and passed to
    /// the channel router when recording receipts, recording errors, and
    /// sending replies.
    platform: &'static str,
    /// Sleep duration when recv() returns Ok(None) (no message available).
    /// A zero duration skips the sleep entirely and polls again immediately.
    idle_sleep: Duration,
    /// Whether to send an error reply (`CHANNEL_PROCESSING_ERROR_REPLY`) to
    /// the user on processing failure.
    reply_on_error: bool,
}

/// Generic poll loop for any `ChannelAdapter`. Handles recv → spawn → process
/// with semaphore-bounded concurrency and error recovery.
///
/// Each iteration awaits `adapter.recv()` and reacts to its outcome:
///
/// * `Ok(Some(msg))` — records the receipt on the channel router, then spawns
///   a task that waits for a permit from `semaphore` before calling
///   `process_channel_message`. Because the permit is acquired inside the
///   spawned task, the receive loop itself is never blocked by slow handlers;
///   only the number of messages being *processed* at once is bounded.
/// * `Ok(None)` — sleeps for `cfg.idle_sleep` unless that duration is zero.
/// * `Err(e)` — logs the error and backs off for five seconds.
///
/// On a processing failure the error is recorded on the router, an error
/// reply is sent when `cfg.reply_on_error` is set, and the failure is logged.
///
/// The outer loop has no exit path, so this future only ends when dropped.
async fn channel_poll_loop(
    state: AppState,
    adapter: Arc<dyn ChannelAdapter>,
    cfg: PollConfig,
    semaphore: &'static std::sync::LazyLock<Arc<tokio::sync::Semaphore>>,
) {
    tracing::info!(platform = cfg.platform, "inbound poll loop started");

    loop {
        match adapter.recv().await {
            Ok(Some(inbound)) => {
                state.channel_router.record_received(cfg.platform).await;
                let state = state.clone();
                let semaphore = Arc::clone(semaphore);
                let platform = cfg.platform;
                // The original message is moved into processing, so a copy is
                // needed to address the failure reply. Only pay for that copy
                // on platforms that actually send failure replies.
                let inbound_for_error = cfg.reply_on_error.then(|| inbound.clone());
                tokio::spawn(async move {
                    // Held until the task finishes; dropping it frees the slot.
                    let _permit = match semaphore.acquire_owned().await {
                        Ok(p) => p,
                        Err(_) => {
                            // Acquire only fails once the semaphore is closed;
                            // surface the dropped message instead of vanishing.
                            tracing::warn!(
                                platform,
                                "channel semaphore closed, dropping inbound message"
                            );
                            return;
                        }
                    };
                    if let Err(e) = super::process_channel_message(&state, inbound).await {
                        state
                            .channel_router
                            .record_processing_error(platform, e.clone())
                            .await;
                        // `Some` exactly when `reply_on_error` was set above.
                        if let Some(failed) = inbound_for_error.as_ref() {
                            let chat_id = super::resolve_channel_chat_id(failed);
                            if let Err(send_err) = state
                                .channel_router
                                .send_reply(
                                    platform,
                                    &chat_id,
                                    CHANNEL_PROCESSING_ERROR_REPLY.to_string(),
                                )
                                .await
                            {
                                tracing::warn!(
                                    error = %send_err,
                                    platform,
                                    "failed to send processing failure reply"
                                );
                            }
                        }
                        tracing::error!(error = %e, platform, "message processing failed");
                    }
                });
            }
            Ok(None) => {
                // Zero means "poll again immediately" (no sleep at all).
                if !cfg.idle_sleep.is_zero() {
                    tokio::time::sleep(cfg.idle_sleep).await;
                }
            }
            Err(e) => {
                tracing::error!(
                    error = %e,
                    platform = cfg.platform,
                    "poll error, backing off 5s"
                );
                tokio::time::sleep(Duration::from_secs(5)).await;
            }
        }
    }
}

/// Runs the inbound poll loop for the Telegram adapter.
///
/// Returns immediately when `state.telegram` is `None`. Otherwise delegates
/// to `channel_poll_loop` with no idle sleep and error replies enabled, using
/// a semaphore private to this function with `CHANNEL_CONCURRENCY` permits.
pub async fn telegram_poll_loop(state: AppState) {
    static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));

    // Nothing to poll when the adapter is not present.
    let Some(adapter) = state.telegram.as_ref().cloned() else {
        return;
    };

    // Telegram uses the generic loop with error replies enabled.
    // Auth-failure escalation is handled in the adapter's recv() implementation
    // via HTTP 401/404 detection.
    let cfg = PollConfig {
        platform: "telegram",
        idle_sleep: Duration::ZERO,
        reply_on_error: true,
    };

    channel_poll_loop(state, adapter, cfg, &SEMAPHORE).await;
}

/// Runs the inbound poll loop for the Discord adapter.
///
/// Returns immediately when `state.discord` is `None`. Otherwise delegates to
/// `channel_poll_loop` with a 300 ms idle sleep and error replies disabled,
/// using a semaphore private to this function with `CHANNEL_CONCURRENCY`
/// permits.
pub async fn discord_poll_loop(state: AppState) {
    static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));

    // Nothing to poll when the adapter is not present.
    let Some(adapter) = state.discord.as_ref().cloned() else {
        return;
    };

    let cfg = PollConfig {
        platform: "discord",
        idle_sleep: Duration::from_millis(300),
        reply_on_error: false,
    };

    channel_poll_loop(state, adapter, cfg, &SEMAPHORE).await;
}

/// Runs the inbound poll loop for the Signal adapter.
///
/// Returns immediately when `state.signal` is `None`. Otherwise delegates to
/// `channel_poll_loop` with a 300 ms idle sleep and error replies disabled,
/// using a semaphore private to this function with `CHANNEL_CONCURRENCY`
/// permits.
pub async fn signal_poll_loop(state: AppState) {
    static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));

    // Nothing to poll when the adapter is not present.
    let Some(adapter) = state.signal.as_ref().cloned() else {
        return;
    };

    let cfg = PollConfig {
        platform: "signal",
        idle_sleep: Duration::from_millis(300),
        reply_on_error: false,
    };

    channel_poll_loop(state, adapter, cfg, &SEMAPHORE).await;
}

/// Runs the inbound poll loop for the email adapter.
///
/// Returns immediately when `state.email` is `None`. Otherwise delegates to
/// `channel_poll_loop` with a one-second idle sleep and error replies
/// disabled, using a semaphore private to this function with
/// `CHANNEL_CONCURRENCY` permits.
pub async fn email_poll_loop(state: AppState) {
    static SEMAPHORE: std::sync::LazyLock<Arc<tokio::sync::Semaphore>> =
        std::sync::LazyLock::new(|| Arc::new(tokio::sync::Semaphore::new(CHANNEL_CONCURRENCY)));

    // Nothing to poll when the adapter is not present.
    let Some(adapter) = state.email.as_ref().cloned() else {
        return;
    };

    let cfg = PollConfig {
        platform: "email",
        idle_sleep: Duration::from_secs(1),
        reply_on_error: false,
    };

    channel_poll_loop(state, adapter, cfg, &SEMAPHORE).await;
}