opencrabs 0.3.56

The autonomous, self-improving AI agent. Single Rust binary. Every channel. Install with: cargo install opencrabs
Documentation
//! Channel Manager
//!
//! Manages the lifecycle of channel agents (Telegram, WhatsApp, Discord, Slack, Trello).
//! Spawns and stops channels dynamically when the config changes at runtime,
//! so that toggling `channels.*.enabled` in config.toml takes effect without restart.

use std::collections::HashMap;
use std::sync::Arc;

use tokio::task::JoinHandle;

use crate::channels::ChannelFactory;
use crate::config::Config;

/// What reconcile should do with a channel this pass.
///
/// Produced by [`channel_action`] from the channel's desired state and whether
/// its agent task is still alive; each per-channel reconcile method matches on
/// the result to spawn, abort, or leave the agent untouched.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub(crate) enum ChannelAction {
    /// Not running (or its agent task died) and it should be: (re)start it.
    Start,
    /// Running but it should not be: stop it.
    Stop,
    /// Desired state already matches (running and wanted, or not running and
    /// not wanted): do nothing.
    Noop,
}

/// Map a channel's desired state (`should_run`) and its liveness (`alive`) to
/// the action reconcile must take.
///
/// * wanted but not alive  -> [`ChannelAction::Start`]
/// * alive but not wanted  -> [`ChannelAction::Stop`]
/// * both agree            -> [`ChannelAction::Noop`]
///
/// Why liveness and not mere presence in the handle map (issues #239/#240):
/// when a channel agent crashes or exits, its `JoinHandle` lingers in the map.
/// Keying "running" off `contains_key` then treats a dead agent as running, so
/// an enabled channel is never restarted and the WhatsApp pairing QR never
/// reappears. Callers pass `alive = false` for a finished handle, which yields
/// `Start` here, so a dead agent auto-restarts.
pub(crate) fn channel_action(should_run: bool, alive: bool) -> ChannelAction {
    if should_run == alive {
        // Desired and actual state already agree.
        return ChannelAction::Noop;
    }
    // They differ, so `should_run` alone tells us which way to move.
    if should_run {
        ChannelAction::Start
    } else {
        ChannelAction::Stop
    }
}

/// Reports whether the channel `name` has a task that is still executing.
///
/// Returns `false` both when there is no entry for `name` and when the stored
/// handle has already finished: a finished handle is stale (the agent exited)
/// and must not block a restart.
fn handle_alive(handles: &HashMap<String, JoinHandle<()>>, name: &str) -> bool {
    match handles.get(name) {
        Some(task) => !task.is_finished(),
        None => false,
    }
}

/// Manages running channel agents, allowing dynamic spawn/stop on config reload.
///
/// Holds one task handle per spawned channel plus the shared dependencies the
/// agents are constructed from. The per-channel state fields only exist when
/// the matching Cargo feature is enabled.
pub struct ChannelManager {
    /// Task handles of spawned channel agents, keyed by channel name
    /// (`"telegram"`, `"whatsapp"`, `"discord"`, `"slack"`, `"trello"`).
    /// An entry may belong to a task that has already finished; liveness is
    /// checked separately before deciding whether a channel is running.
    handles: tokio::sync::Mutex<HashMap<String, JoinHandle<()>>>,
    /// Factory the reconcile methods call to obtain agent dependencies
    /// (agent service, service context, shared session id, config receiver).
    channel_factory: Arc<ChannelFactory>,
    /// Pool cloned into a `ChannelMessageRepository` for the agents that take one.
    db_pool: deadpool_sqlite::Pool,

    /// Shared state cloned into each Telegram agent that is spawned.
    #[cfg(feature = "telegram")]
    telegram_state: Arc<crate::channels::telegram::TelegramState>,
    /// Shared state cloned into each WhatsApp agent; also queried for pending
    /// restart requests and for the live client when stopping the agent.
    #[cfg(feature = "whatsapp")]
    whatsapp_state: Arc<crate::channels::whatsapp::WhatsAppState>,
    /// Shared state cloned into each Discord agent that is spawned.
    #[cfg(feature = "discord")]
    discord_state: Arc<crate::channels::discord::DiscordState>,
    /// Shared state cloned into each Slack agent that is spawned.
    #[cfg(feature = "slack")]
    slack_state: Arc<crate::channels::slack::SlackState>,
    /// Shared state cloned into each Trello agent that is spawned.
    #[cfg(feature = "trello")]
    trello_state: Arc<crate::channels::trello::TrelloState>,
}

impl ChannelManager {
    /// Builds a manager that has no channel running yet.
    ///
    /// Every argument is stored as-is; the per-channel state arguments are only
    /// part of the signature when the matching feature is enabled. Nothing is
    /// spawned here: the handle map starts empty and is filled by `reconcile`.
    // The argument count grows with the enabled channel features, hence the allow.
    #[allow(clippy::too_many_arguments)]
    pub fn new(
        channel_factory: Arc<ChannelFactory>,
        db_pool: deadpool_sqlite::Pool,
        #[cfg(feature = "telegram")] telegram_state: Arc<crate::channels::telegram::TelegramState>,
        #[cfg(feature = "whatsapp")] whatsapp_state: Arc<crate::channels::whatsapp::WhatsAppState>,
        #[cfg(feature = "discord")] discord_state: Arc<crate::channels::discord::DiscordState>,
        #[cfg(feature = "slack")] slack_state: Arc<crate::channels::slack::SlackState>,
        #[cfg(feature = "trello")] trello_state: Arc<crate::channels::trello::TrelloState>,
    ) -> Self {
        // No agents exist until the first reconcile pass.
        let handles = tokio::sync::Mutex::new(HashMap::new());

        Self {
            handles,
            channel_factory,
            db_pool,
            #[cfg(feature = "telegram")]
            telegram_state,
            #[cfg(feature = "whatsapp")]
            whatsapp_state,
            #[cfg(feature = "discord")]
            discord_state,
            #[cfg(feature = "slack")]
            slack_state,
            #[cfg(feature = "trello")]
            trello_state,
        }
    }

    /// Runs one reconciliation pass: compares the running channels against
    /// `config` and spawns or stops agents so the two agree.
    ///
    /// The handle-map lock is held for the whole pass. Channels compiled into
    /// this build are visited in a fixed order (Telegram, WhatsApp, Discord,
    /// Slack, Trello); each per-channel method decides on its own whether to
    /// start, stop, or leave its agent alone.
    pub async fn reconcile(&self, config: &Config) {
        let mut running = self.handles.lock().await;

        #[cfg(feature = "telegram")]
        self.reconcile_telegram(config, &mut running).await;

        #[cfg(feature = "whatsapp")]
        self.reconcile_whatsapp(config, &mut running).await;

        #[cfg(feature = "discord")]
        self.reconcile_discord(config, &mut running).await;

        #[cfg(feature = "slack")]
        self.reconcile_slack(config, &mut running).await;

        #[cfg(feature = "trello")]
        self.reconcile_trello(config, &mut running).await;
    }

    /// Starts, stops, or leaves alone the Telegram bot so it matches `config`.
    ///
    /// The bot should run only when `channels.telegram.enabled` is set and the
    /// configured token has the shape `<numeric id>:<secret of 30+ bytes>`. A
    /// handle whose task has finished counts as not running, so a dead bot is
    /// respawned on the next pass.
    ///
    /// On start the token lock is acquired first; if that is denied a warning
    /// is logged and nothing is spawned.
    #[cfg(feature = "telegram")]
    async fn reconcile_telegram(
        &self,
        config: &Config,
        handles: &mut HashMap<String, JoinHandle<()>>,
    ) {
        let tg = &config.channels.telegram;
        // Shape check only. `split_once` cuts at the first ':' without
        // allocating and yields `None` for empty or colon-less tokens, so no
        // separate emptiness / `contains` test is needed.
        let has_valid_token = tg.token.as_ref().is_some_and(|t| {
            t.split_once(':').is_some_and(|(bot_id, secret)| {
                bot_id.parse::<u64>().is_ok() && secret.len() >= 30
            })
        });

        let should_run = tg.enabled && has_valid_token;
        match channel_action(should_run, handle_alive(handles, "telegram")) {
            ChannelAction::Start => {
                // `should_run` already implies a token is present; bail out
                // quietly if it somehow is not.
                let Some(token) = tg.token.as_ref() else {
                    return;
                };
                let token_hash = crate::config::profile::hash_token(token);
                if let Err(e) =
                    crate::config::profile::acquire_token_lock("telegram", &token_hash)
                {
                    tracing::warn!("ChannelManager: Telegram token lock denied — {}", e);
                    return;
                }
                let agent = crate::channels::telegram::TelegramAgent::new(
                    self.channel_factory.create_agent_service().await,
                    self.channel_factory.service_context(),
                    self.channel_factory.shared_session_id(),
                    self.telegram_state.clone(),
                    self.channel_factory.config_rx(),
                    crate::db::ChannelMessageRepository::new(self.db_pool.clone()),
                );
                tracing::info!(
                    "ChannelManager: spawning Telegram bot ({} allowed users)",
                    tg.allowed_users.len()
                );
                // Overwrites any stale finished handle.
                handles.insert("telegram".to_string(), agent.start(token.clone()));
            }
            ChannelAction::Stop => {
                if let Some(handle) = handles.remove("telegram") {
                    tracing::info!("ChannelManager: stopping Telegram bot");
                    handle.abort();
                }
            }
            ChannelAction::Noop => {}
        }
    }

    /// Brings the WhatsApp agent in line with `config`, honouring any pending
    /// re-pairing restart request first.
    ///
    /// The agent should run whenever `channels.whatsapp.enabled` is set. A
    /// handle whose task has finished counts as not running, so a dead agent
    /// is respawned on the next pass.
    #[cfg(feature = "whatsapp")]
    async fn reconcile_whatsapp(
        &self,
        config: &Config,
        handles: &mut HashMap<String, JoinHandle<()>>,
    ) {
        let wa = &config.channels.whatsapp;
        let should_run = wa.enabled;

        // A pairing reset wipes session.db and asks for a restart. The request
        // is only consumed while the channel is enabled. Dropping the current
        // agent here lets it start fresh against the wiped session (drops old
        // auth at runtime); the Start arm below then respawns it.
        let restart_requested = should_run && self.whatsapp_state.take_restart_request();
        if restart_requested && let Some(previous) = handles.remove("whatsapp") {
            tracing::info!("ChannelManager: restarting WhatsApp agent for re-pairing");
            self.disconnect_and_abort_whatsapp(previous).await;
        }

        match channel_action(should_run, handle_alive(handles, "whatsapp")) {
            ChannelAction::Noop => {}
            ChannelAction::Stop => {
                if let Some(current) = handles.remove("whatsapp") {
                    tracing::info!("ChannelManager: stopping WhatsApp agent");
                    // Same teardown as the restart path, so the socket does
                    // not linger after the task is aborted.
                    self.disconnect_and_abort_whatsapp(current).await;
                }
            }
            ChannelAction::Start => {
                let agent = crate::channels::whatsapp::WhatsAppAgent::new(
                    self.channel_factory.create_agent_service().await,
                    self.channel_factory.service_context(),
                    self.channel_factory.shared_session_id(),
                    self.whatsapp_state.clone(),
                    self.channel_factory.config_rx(),
                    crate::db::ChannelMessageRepository::new(self.db_pool.clone()),
                );
                tracing::info!(
                    "ChannelManager: spawning WhatsApp agent ({} allowed phones)",
                    wa.allowed_phones.len()
                );
                // Overwrites any stale finished handle (dead agent auto-restarts).
                handles.insert("whatsapp".to_string(), agent.start());
            }
        }
    }

    /// Tears down a WhatsApp agent: disconnects the live client (if any), then
    /// aborts the agent task.
    ///
    /// The disconnect must come BEFORE the abort. Aborting the `JoinHandle`
    /// only drops the `bot.run()` future; whatsapp-rust runs its keepalive and
    /// read loop on independent detached tasks, so the old socket keeps
    /// pinging and lingers as a second companion alongside the freshly-paired
    /// one. Two companions on one session make WhatsApp drop a socket, so
    /// inbound messages land on an orphaned connection and never get a reply.
    /// `disconnect()` tears the transport down and disables auto-reconnect so
    /// it cannot resurrect.
    #[cfg(feature = "whatsapp")]
    async fn disconnect_and_abort_whatsapp(&self, handle: JoinHandle<()>) {
        if let Some(client) = self.whatsapp_state.client().await {
            client.disconnect().await;
        }
        handle.abort();
    }

    /// Starts, stops, or leaves alone the Discord bot so it matches `config`.
    ///
    /// The bot should run when `channels.discord.enabled` is set and a token
    /// longer than 50 bytes is configured. A handle whose task has finished
    /// counts as not running. On start the token lock is acquired first; if
    /// that is denied a warning is logged and nothing is spawned.
    #[cfg(feature = "discord")]
    async fn reconcile_discord(
        &self,
        config: &Config,
        handles: &mut HashMap<String, JoinHandle<()>>,
    ) {
        let dc = &config.channels.discord;
        // A length above 50 already rules out the empty string.
        let token_looks_valid = dc.token.as_ref().is_some_and(|t| t.len() > 50);
        let should_run = dc.enabled && token_looks_valid;

        match channel_action(should_run, handle_alive(handles, "discord")) {
            ChannelAction::Noop => {}
            ChannelAction::Stop => {
                if let Some(task) = handles.remove("discord") {
                    tracing::info!("ChannelManager: stopping Discord bot");
                    task.abort();
                }
            }
            ChannelAction::Start => {
                // `should_run` implies the token is present.
                let Some(token) = dc.token.as_ref() else {
                    return;
                };
                let token_hash = crate::config::profile::hash_token(token);
                if let Err(e) =
                    crate::config::profile::acquire_token_lock("discord", &token_hash)
                {
                    tracing::warn!("ChannelManager: Discord token lock denied — {}", e);
                    return;
                }
                let agent = crate::channels::discord::DiscordAgent::new(
                    self.channel_factory.create_agent_service().await,
                    self.channel_factory.service_context(),
                    self.channel_factory.shared_session_id(),
                    self.discord_state.clone(),
                    self.channel_factory.config_rx(),
                    crate::db::ChannelMessageRepository::new(self.db_pool.clone()),
                );
                tracing::info!(
                    "ChannelManager: spawning Discord bot ({} allowed users)",
                    dc.allowed_users.len()
                );
                // Replaces any stale finished handle left by a dead bot.
                handles.insert("discord".to_string(), agent.start(token.clone()));
            }
        }
    }

    /// Starts, stops, or leaves alone the Slack bot so it matches `config`.
    ///
    /// The bot should run when `channels.slack.enabled` is set, the bot token
    /// starts with `xoxb-`, and the app token starts with `xapp-`. A handle
    /// whose task has finished counts as not running. On start the token lock
    /// (keyed by the bot token) is acquired first; if that is denied a warning
    /// is logged and nothing is spawned.
    #[cfg(feature = "slack")]
    async fn reconcile_slack(
        &self,
        config: &Config,
        handles: &mut HashMap<String, JoinHandle<()>>,
    ) {
        let sl = &config.channels.slack;
        // A non-empty prefix match already rules out the empty string.
        let bot_token_ok = sl.token.as_ref().is_some_and(|t| t.starts_with("xoxb-"));
        let app_token_ok = sl.app_token.as_ref().is_some_and(|t| t.starts_with("xapp-"));
        let should_run = sl.enabled && bot_token_ok && app_token_ok;

        match channel_action(should_run, handle_alive(handles, "slack")) {
            ChannelAction::Noop => {}
            ChannelAction::Stop => {
                if let Some(task) = handles.remove("slack") {
                    tracing::info!("ChannelManager: stopping Slack bot");
                    task.abort();
                }
            }
            ChannelAction::Start => {
                // `should_run` implies both tokens are present.
                let (Some(bot_tok), Some(app_tok)) = (sl.token.as_ref(), sl.app_token.as_ref())
                else {
                    return;
                };
                let token_hash = crate::config::profile::hash_token(bot_tok);
                if let Err(e) = crate::config::profile::acquire_token_lock("slack", &token_hash)
                {
                    tracing::warn!("ChannelManager: Slack token lock denied — {}", e);
                    return;
                }
                let agent = crate::channels::slack::SlackAgent::new(
                    self.channel_factory.create_agent_service().await,
                    self.channel_factory.service_context(),
                    self.channel_factory.shared_session_id(),
                    self.slack_state.clone(),
                    self.channel_factory.config_rx(),
                    crate::db::ChannelMessageRepository::new(self.db_pool.clone()),
                );
                tracing::info!(
                    "ChannelManager: spawning Slack bot ({} allowed users)",
                    sl.allowed_users.len()
                );
                // Replaces any stale finished handle left by a dead bot.
                handles.insert(
                    "slack".to_string(),
                    agent.start(bot_tok.clone(), app_tok.clone()),
                );
            }
        }
    }

    /// Starts, stops, or leaves alone the Trello agent so it matches `config`.
    ///
    /// The agent should run when `channels.trello.enabled` is set, both
    /// `app_token` and `token` are non-empty, and at least one board id is
    /// configured. A handle whose task has finished counts as not running. On
    /// start the token lock (keyed by `token`) is acquired first; if that is
    /// denied a warning is logged and nothing is spawned.
    #[cfg(feature = "trello")]
    async fn reconcile_trello(
        &self,
        config: &Config,
        handles: &mut HashMap<String, JoinHandle<()>>,
    ) {
        let tr = &config.channels.trello;
        let key_present = tr.app_token.as_ref().is_some_and(|k| !k.is_empty());
        let token_present = tr.token.as_ref().is_some_and(|t| !t.is_empty());
        let boards_configured = !tr.board_ids.is_empty();
        let should_run = tr.enabled && key_present && token_present && boards_configured;

        match channel_action(should_run, handle_alive(handles, "trello")) {
            ChannelAction::Noop => {}
            ChannelAction::Stop => {
                if let Some(task) = handles.remove("trello") {
                    tracing::info!("ChannelManager: stopping Trello agent");
                    task.abort();
                }
            }
            ChannelAction::Start => {
                // `should_run` implies both credentials are present. The
                // config's `app_token` is passed on as the API key and `token`
                // as the API token.
                let (Some(api_key), Some(api_token)) = (tr.app_token.as_ref(), tr.token.as_ref())
                else {
                    return;
                };
                let token_hash = crate::config::profile::hash_token(api_token);
                if let Err(e) =
                    crate::config::profile::acquire_token_lock("trello", &token_hash)
                {
                    tracing::warn!("ChannelManager: Trello token lock denied — {}", e);
                    return;
                }
                let agent = crate::channels::trello::TrelloAgent::new(
                    self.channel_factory.create_agent_service().await,
                    self.channel_factory.service_context(),
                    tr.allowed_users.clone(),
                    self.channel_factory.shared_session_id(),
                    self.trello_state.clone(),
                    tr.board_ids.clone(),
                    tr.poll_interval_secs,
                    tr.session_idle_hours,
                );
                tracing::info!(
                    "ChannelManager: spawning Trello agent ({} boards)",
                    tr.board_ids.len()
                );
                // Replaces any stale finished handle left by a dead agent.
                handles.insert(
                    "trello".to_string(),
                    agent.start(api_key.clone(), api_token.clone()),
                );
            }
        }
    }
}