Skip to main content

rain_engine_channels/
slack.rs

//! Slack adapter built on the Events API (inbound) and the Web API (outbound).
//!
//! Requires `SLACK_BOT_TOKEN` and `SLACK_SIGNING_SECRET`.
//! This adapter exposes an HTTP endpoint for Slack to POST events to,
//! and uses the Web API to send replies.
6
7use crate::{ChannelAdapter, ChannelConfig};
8use async_trait::async_trait;
9use axum::{Json, Router, extract::State, http::StatusCode, routing::post};
10use rain_engine_client::RainEngineClient;
11use serde::Deserialize;
12use serde_json::json;
13use std::sync::Arc;
14use tracing::{error, info, warn};
15
16#[derive(Debug, Clone)]
17pub struct SlackAdapter {
18    bot_token: String,
19    #[allow(dead_code)]
20    signing_secret: String,
21    client: reqwest::Client,
22    engine_client: RainEngineClient,
23    config: ChannelConfig,
24    /// Port to listen on for Slack Events API.
25    listen_port: u16,
26}
27
28impl SlackAdapter {
29    pub fn new(
30        bot_token: String,
31        signing_secret: String,
32        listen_port: u16,
33        config: ChannelConfig,
34    ) -> Self {
35        Self {
36            engine_client: RainEngineClient::new(&config.runtime_url)
37                .expect("failed to init client"),
38            client: reqwest::Client::new(),
39            bot_token,
40            signing_secret,
41            listen_port,
42            config,
43        }
44    }
45
46    fn session_id(&self, channel: &str) -> String {
47        format!("{}-slack-{}", self.config.default_session_prefix, channel)
48    }
49
50    async fn send_message(&self, channel: &str, text: &str) -> Result<(), reqwest::Error> {
51        self.client
52            .post("https://slack.com/api/chat.postMessage")
53            .header("Authorization", format!("Bearer {}", self.bot_token))
54            .json(&serde_json::json!({
55                "channel": channel,
56                "text": text,
57            }))
58            .send()
59            .await?;
60        Ok(())
61    }
62
63    pub async fn handle_event_message(&self, channel: &str, user: &str, text: &str) {
64        let actor_id = format!("slack:{user}");
65        let session_id = self.session_id(channel);
66
67        info!(channel, actor = %actor_id, "Slack message received");
68
69        match self
70            .engine_client
71            .send_human_input(&actor_id, &session_id, text)
72            .await
73        {
74            Ok(result) => {
75                let reply = result
76                    .outcome
77                    .response
78                    .as_deref()
79                    .unwrap_or("_(no response)_");
80                if let Err(err) = self.send_message(channel, reply).await {
81                    error!("Failed to send Slack reply: {err}");
82                }
83            }
84            Err(err) => {
85                error!("Engine request failed: {err}");
86                let _ = self
87                    .send_message(channel, "⚠️ Engine error, please try again.")
88                    .await;
89            }
90        }
91    }
92}
93
94#[derive(Debug, Deserialize)]
95#[allow(dead_code)]
96struct SlackEventPayload {
97    #[serde(default)]
98    r#type: String,
99    #[serde(default)]
100    challenge: Option<String>,
101    #[serde(default)]
102    event: Option<SlackEvent>,
103}
104
105#[derive(Debug, Deserialize)]
106#[allow(dead_code)]
107struct SlackEvent {
108    r#type: String,
109    #[serde(default)]
110    channel: Option<String>,
111    #[serde(default)]
112    user: Option<String>,
113    #[serde(default)]
114    text: Option<String>,
115    #[serde(default)]
116    bot_id: Option<String>,
117}
118
119#[async_trait]
120impl ChannelAdapter for SlackAdapter {
121    fn name(&self) -> &str {
122        "slack"
123    }
124
125    async fn run(&self, cancel: tokio_util::sync::CancellationToken) {
126        info!(
127            port = self.listen_port,
128            "Slack adapter started — listening for Events API"
129        );
130        warn!("Slack request signature verification is not enforced yet");
131
132        let app = Router::new()
133            .route("/slack/events", post(handle_events))
134            .with_state(Arc::new(self.clone()));
135
136        let listener = match tokio::net::TcpListener::bind(("0.0.0.0", self.listen_port)).await {
137            Ok(listener) => listener,
138            Err(err) => {
139                error!("Slack adapter failed to bind: {err}");
140                return;
141            }
142        };
143
144        if let Err(err) = axum::serve(listener, app)
145            .with_graceful_shutdown(async move { cancel.cancelled().await })
146            .await
147        {
148            error!("Slack adapter listener error: {err}");
149        }
150
151        info!("Slack adapter shutting down");
152    }
153}
154
155async fn handle_events(
156    State(adapter): State<Arc<SlackAdapter>>,
157    Json(payload): Json<SlackEventPayload>,
158) -> Result<Json<serde_json::Value>, StatusCode> {
159    match payload.r#type.as_str() {
160        "url_verification" => {
161            let challenge = payload.challenge.unwrap_or_default();
162            Ok(Json(json!({ "challenge": challenge })))
163        }
164        "event_callback" => {
165            if let Some(event) = payload.event {
166                let is_supported_message =
167                    event.r#type == "message" || event.r#type == "app_mention";
168                if is_supported_message
169                    && let (Some(channel), Some(user), Some(text)) = (
170                        event.channel.as_deref(),
171                        event.user.as_deref(),
172                        event.text.as_deref(),
173                    )
174                    && event.bot_id.is_none()
175                {
176                    adapter.handle_event_message(channel, user, text).await;
177                }
178            }
179            Ok(Json(json!({ "ok": true })))
180        }
181        _ => Ok(Json(json!({ "ok": true }))),
182    }
183}