use anyhow::{Context, Result};
use async_trait::async_trait;
use futures::{SinkExt, StreamExt};
use serde_json::{Value, json};
use tokio::sync::mpsc;
use tokio_tungstenite::tungstenite::Message as WsMessage;
use super::adapter::{ChannelId, IncomingCommand, PlatformAdapter};
use super::commands::parse_remote_command;
/// WebSocket stream type used for the Socket Mode connection: a tungstenite
/// stream over a TCP socket that may or may not be wrapped in TLS.
type WsStream =
tokio_tungstenite::WebSocketStream<tokio_tungstenite::MaybeTlsStream<tokio::net::TcpStream>>;
/// Sink (write) half produced by splitting a [`WsStream`]; accepts tungstenite messages.
type WsSink = futures::stream::SplitSink<WsStream, WsMessage>;
/// Slack platform adapter: holds the two tokens and the HTTP client that the
/// adapter's methods use to talk to Slack.
pub struct SlackAdapter {
/// Bearer token attached to Web API calls (`chat.postMessage`, `chat.update`, `files.upload`).
bot_token: String,
/// Bearer token attached to `apps.connections.open` when requesting a Socket Mode URL.
app_token: String,
/// HTTP client reused for every request made by this adapter.
client: reqwest::Client,
}
impl SlackAdapter {
/// Builds an adapter from the two Slack tokens, creating a fresh HTTP client.
///
/// `bot_token` is used for Web API calls; `app_token` is used to open the
/// Socket Mode connection.
pub fn new(bot_token: String, app_token: String) -> Self {
    let client = reqwest::Client::new();
    Self {
        bot_token,
        app_token,
        client,
    }
}
/// POSTs `body` as JSON to the Slack Web API method `method`, authenticated
/// with the bot token, and returns the decoded JSON response.
///
/// # Errors
///
/// Fails when the request cannot be sent, when the response body is not
/// valid JSON, or when the response's `ok` field is anything other than
/// `true` (the `error` field, if present, is included in the message).
async fn api_post(&self, method: &str, body: Value) -> Result<Value> {
    let endpoint = format!("https://slack.com/api/{method}");
    let http_response = self
        .client
        .post(&endpoint)
        .bearer_auth(&self.bot_token)
        .json(&body)
        .send()
        .await
        .with_context(|| format!("slack api request failed: {method}"))?;
    let payload = http_response
        .json::<Value>()
        .await
        .with_context(|| format!("slack api response parse failed: {method}"))?;

    // Slack reports application-level failures inside the JSON body.
    let succeeded = payload.get("ok") == Some(&Value::Bool(true));
    if succeeded {
        return Ok(payload);
    }
    let err = payload["error"].as_str().unwrap_or("unknown");
    anyhow::bail!("slack api {method} error: {err}");
}
/// Builds the base JSON body for a message addressed to `channel`.
///
/// The body always contains `channel`; `thread_ts` is added only when the
/// channel id carries a thread.
fn message_body(channel: &ChannelId) -> Value {
    let base = json!({ "channel": channel.channel });
    match channel.thread {
        Some(ref ts) => {
            let mut threaded = base;
            threaded["thread_ts"] = json!(ts);
            threaded
        }
        None => base,
    }
}
async fn open_socket_url(&self) -> Result<String> {
let resp: Value = self
.client
.post("https://slack.com/api/apps.connections.open")
.bearer_auth(&self.app_token)
.json(&json!({}))
.send()
.await
.context("apps.connections.open request failed")?
.json()
.await
.context("apps.connections.open parse failed")?;
if resp.get("ok") != Some(&Value::Bool(true)) {
let err = resp["error"].as_str().unwrap_or("unknown");
anyhow::bail!("apps.connections.open error: {err}");
}
resp["url"]
.as_str()
.map(String::from)
.context("apps.connections.open: missing url field")
}
async fn ack_envelope(sink: &mut WsSink, envelope_id: &str) -> Result<()> {
let ack = json!({ "envelope_id": envelope_id }).to_string();
sink.send(WsMessage::Text(ack.into())).await?;
Ok(())
}
/// Dispatches one decoded envelope according to its `type` field.
///
/// `events_api`, `interactive` and `slash_commands` envelopes are forwarded
/// to their dedicated handlers; `hello` and `disconnect` are only logged.
/// Envelopes without a string `type` are ignored silently, and any other
/// type is logged at debug level.
fn handle_socket_event(envelope: &Value, command_tx: &mpsc::UnboundedSender<IncomingCommand>) {
    let Some(kind) = envelope["type"].as_str() else {
        return;
    };
    match kind {
        "events_api" => Self::handle_events_api(envelope, command_tx),
        "interactive" => Self::handle_interactive(envelope, command_tx),
        "slash_commands" => Self::handle_slash_command(envelope, command_tx),
        "hello" => {
            tracing::info!("[slack] socket mode connected (hello)");
        }
        "disconnect" => {
            tracing::warn!("[slack] server requested disconnect");
        }
        other => {
            tracing::debug!("[slack] unhandled envelope type: {other}");
        }
    }
}
/// Turns an `events_api` envelope into an [`IncomingCommand`] when it carries
/// a plain user message.
///
/// The event at `payload.event` is dropped unless its `type` is `message`,
/// it has neither a `bot_id` nor a `subtype` field, its `text` is a
/// non-empty string, and it has a string `channel`. A missing `user` is
/// reported as `"unknown"`; a `thread_ts`, when present, is attached to the
/// channel id. A failed send on `command_tx` is ignored.
fn handle_events_api(envelope: &Value, command_tx: &mpsc::UnboundedSender<IncomingCommand>) {
    let event = &envelope["payload"]["event"];

    let is_message = event["type"].as_str() == Some("message");
    let has_bot_or_subtype = event.get("bot_id").is_some() || event.get("subtype").is_some();
    if !is_message || has_bot_or_subtype {
        return;
    }

    let Some(text) = event["text"].as_str().filter(|t| !t.is_empty()) else {
        return;
    };
    let Some(channel_name) = event["channel"].as_str() else {
        return;
    };

    let user_id = event["user"].as_str().unwrap_or("unknown").to_string();
    let channel = match event["thread_ts"].as_str() {
        Some(ts) => ChannelId::new("slack", channel_name).with_thread(ts),
        None => ChannelId::new("slack", channel_name),
    };
    let command = parse_remote_command(text);

    let _ = command_tx.send(IncomingCommand {
        channel,
        user_id,
        command,
    });
}
/// Emits one [`IncomingCommand`] per action found in an `interactive` envelope.
///
/// Requires `payload.actions` to be an array and `payload.channel.id` to be
/// a string; otherwise nothing is sent. Each action's `action_id` is passed
/// to `parse_remote_command`; actions without a string `action_id` are
/// skipped. The user comes from `payload.user.id` (default `"unknown"`) and
/// the thread, if any, from `payload.message.thread_ts`. Failed sends on
/// `command_tx` are ignored.
fn handle_interactive(envelope: &Value, command_tx: &mpsc::UnboundedSender<IncomingCommand>) {
    let payload = &envelope["payload"];
    let (Some(actions), Some(channel_name)) = (
        payload["actions"].as_array(),
        payload["channel"]["id"].as_str(),
    ) else {
        return;
    };

    let user = payload["user"]["id"].as_str().unwrap_or("unknown");
    let channel = match payload["message"]["thread_ts"].as_str() {
        Some(ts) => ChannelId::new("slack", channel_name).with_thread(ts),
        None => ChannelId::new("slack", channel_name),
    };

    let action_ids = actions
        .iter()
        .filter_map(|action| action["action_id"].as_str());
    for action_id in action_ids {
        let command = parse_remote_command(action_id);
        let _ = command_tx.send(IncomingCommand {
            channel: channel.clone(),
            user_id: user.to_string(),
            command,
        });
    }
}
/// Turns a `slash_commands` envelope into an [`IncomingCommand`].
///
/// Requires string `payload.command` and `payload.channel_id` fields;
/// otherwise nothing is sent. The user comes from `payload.user_id`
/// (default `"unknown"`). A failed send on `command_tx` is ignored.
fn handle_slash_command(envelope: &Value, command_tx: &mpsc::UnboundedSender<IncomingCommand>) {
    let payload = &envelope["payload"];
    let (Some(command_name), Some(channel_name)) = (
        payload["command"].as_str(),
        payload["channel_id"].as_str(),
    ) else {
        return;
    };

    // NOTE(review): only the command name is parsed; any `payload.text`
    // arguments typed after the slash command are not forwarded. Confirm
    // this matches what `parse_remote_command` expects.
    let incoming = IncomingCommand {
        channel: ChannelId::new("slack", channel_name),
        user_id: payload["user_id"].as_str().unwrap_or("unknown").to_string(),
        command: parse_remote_command(command_name),
    };
    let _ = command_tx.send(incoming);
}
}
#[async_trait]
impl PlatformAdapter for SlackAdapter {
/// Returns the fixed platform identifier for this adapter, `"slack"`.
fn platform_name(&self) -> &str {
    const NAME: &str = "slack";
    NAME
}
/// Returns the largest text size this adapter sends as a single message.
///
/// `send_long_message` compares this value against the text's byte length.
fn max_message_length(&self) -> usize {
    const LIMIT: usize = 4000;
    LIMIT
}
/// Logs how many commands are available; performs no registration call.
///
/// Nothing is logged for an empty list. Always returns `Ok(())`.
async fn register_commands(&self, commands: &[(&str, &str)]) -> Result<()> {
    let count = commands.len();
    if count > 0 {
        tracing::info!(
            "[slack] {} skill(s) available via /help (Slack requires static app-dashboard registration)",
            count
        );
    }
    Ok(())
}
async fn send_message(&self, channel: &ChannelId, text: &str) -> Result<()> {
let mut body = Self::message_body(channel);
body["text"] = json!(text);
self.api_post("chat.postMessage", body).await?;
Ok(())
}
async fn send_long_message(
&self,
channel: &ChannelId,
text: &str,
filename: Option<&str>,
) -> Result<()> {
if text.len() <= self.max_message_length() {
return self.send_message(channel, text).await;
}
let name = filename.unwrap_or("response.txt");
let filetype = if name.ends_with(".md") {
"markdown"
} else {
"text"
};
let form = reqwest::multipart::Form::new()
.text("channels", channel.channel.clone())
.text("content", text.to_string())
.text("filename", name.to_string())
.text("filetype", filetype.to_string());
let resp: Value = self
.client
.post("https://slack.com/api/files.upload")
.bearer_auth(&self.bot_token)
.multipart(form)
.send()
.await?
.json()
.await?;
if resp.get("ok") != Some(&Value::Bool(true)) {
tracing::warn!(
"[slack] files.upload failed ({}), falling back to send_message",
resp["error"].as_str().unwrap_or("unknown")
);
let truncated = if text.len() > self.max_message_length() {
&text[..self.max_message_length()]
} else {
text
};
return self.send_message(channel, truncated).await;
}
Ok(())
}
async fn send_buttons(
&self,
channel: &ChannelId,
text: &str,
buttons: &[(String, String)],
) -> Result<()> {
let button_elements: Vec<Value> = buttons
.iter()
.map(|(label, data)| {
json!({
"type": "button",
"text": { "type": "plain_text", "text": label },
"action_id": data,
})
})
.collect();
let blocks = json!([
{
"type": "section",
"text": { "type": "mrkdwn", "text": text },
},
{
"type": "actions",
"elements": button_elements,
}
]);
let mut body = Self::message_body(channel);
body["text"] = json!(text);
body["blocks"] = blocks;
self.api_post("chat.postMessage", body).await?;
Ok(())
}
/// Replaces the text of the message identified by `message_id` (sent as
/// `ts`) in `channel` via `chat.update`.
///
/// Returns `Ok(true)` whenever the API call succeeds.
///
/// # Errors
///
/// Propagates any error from the `chat.update` call.
async fn edit_message(
    &self,
    channel: &ChannelId,
    message_id: &str,
    new_text: &str,
) -> Result<bool> {
    let payload = json!({
        "channel": channel.channel,
        "ts": message_id,
        "text": new_text,
    });
    self.api_post("chat.update", payload)
        .await
        .map(|_| true)
}
/// Runs the Socket Mode loop forever: open a connection, pump it until it
/// ends, log the outcome, wait five seconds, and connect again.
///
/// This function never returns under normal operation; both clean closes
/// and errors lead to a reconnect attempt.
async fn run(&self, command_tx: mpsc::UnboundedSender<IncomingCommand>) -> Result<()> {
    let reconnect_delay = std::time::Duration::from_secs(5);
    tracing::info!("[slack] adapter starting with Socket Mode (tokio-tungstenite)");
    loop {
        let outcome = self.run_socket_connection(&command_tx).await;
        if let Err(e) = outcome {
            tracing::warn!("[slack] socket error: {e:#}, reconnecting in 5s...");
        } else {
            tracing::info!("[slack] socket closed gracefully, reconnecting...");
        }
        tokio::time::sleep(reconnect_delay).await;
    }
}
}
impl SlackAdapter {
/// Opens one Socket Mode session and processes frames until it ends.
///
/// For every text frame that decodes as JSON, the envelope is acknowledged
/// first (when it has an `envelope_id`) and then dispatched through
/// `handle_socket_event`. Frames that are not valid JSON are skipped.
///
/// Returns `Ok(())` when the session ends in an orderly way: the stream
/// ends, a close frame arrives, or a `disconnect` envelope is received.
///
/// # Errors
///
/// Fails when the socket URL cannot be obtained, the WebSocket handshake
/// fails, reading a frame fails, or an acknowledgement cannot be sent.
/// Read and ack failures are returned rather than swallowed so the caller
/// does not report a broken session as a graceful close.
async fn run_socket_connection(
    &self,
    command_tx: &mpsc::UnboundedSender<IncomingCommand>,
) -> Result<()> {
    let url = self.open_socket_url().await?;
    tracing::debug!("[slack] connecting to socket mode endpoint");
    let (ws_stream, _) = tokio_tungstenite::connect_async(&url)
        .await
        .context("websocket connect failed")?;
    let (mut sink, mut stream) = ws_stream.split();

    while let Some(frame) = stream.next().await {
        let frame = frame.context("websocket read failed")?;
        match frame {
            WsMessage::Text(text) => {
                let envelope: Value = match serde_json::from_str(&text) {
                    Ok(v) => v,
                    Err(e) => {
                        tracing::debug!("[slack] invalid json envelope: {e}");
                        continue;
                    }
                };
                // Acknowledge before dispatching; an envelope whose ack
                // cannot be delivered is not dispatched.
                if let Some(id) = envelope["envelope_id"].as_str() {
                    Self::ack_envelope(&mut sink, id)
                        .await
                        .context("envelope ack failed")?;
                }
                Self::handle_socket_event(envelope_ref(&envelope), command_tx);
                // A disconnect envelope is logged by the dispatcher above;
                // stop reading so the caller can open a fresh connection.
                if envelope["type"].as_str() == Some("disconnect") {
                    break;
                }
            }
            WsMessage::Close(_) => {
                tracing::info!("[slack] websocket close frame received");
                break;
            }
            WsMessage::Ping(data) => {
                // Best-effort reply; a dead socket surfaces on the next read.
                let _ = sink.send(WsMessage::Pong(data)).await;
            }
            _ => {}
        }
    }
    return Ok(());

    /// Identity helper that keeps the dispatch call on a single line.
    fn envelope_ref(envelope: &Value) -> &Value {
        envelope
    }
}
}