use async_trait::async_trait;
use crate::approval_core::ApprovalCore;
use crate::channel::{CancelSignal, ChannelId, InboundChannel, InboundSink};
use crate::messaging_config::{MessagingConfigStore, PairingOutcome};
pub const SLACK_PRINCIPAL: &str = "slack-transport";
pub const APPROVE_ACTION_ID: &str = "approve_request";
pub const DENY_ACTION_ID: &str = "deny_request";
const APPROVE: &str = "approve";
const DENY: &str = "deny";
pub const SLACK_BOT_TOKEN_KEY: &str = "SLACK_BOT_TOKEN";
pub const SLACK_APP_TOKEN_KEY: &str = "SLACK_APP_TOKEN";
pub fn provision_slack_tokens(
store: &car_secrets::SecretStore,
bot_token: &str,
app_token: &str,
) -> Result<SlackTokenRefs, String> {
use car_secrets::{SecretRef, DEFAULT_SERVICE};
store
.put(&SecretRef::new(DEFAULT_SERVICE, SLACK_BOT_TOKEN_KEY), bot_token)
.map_err(|e| format!("store bot token: {e}"))?;
store
.put(&SecretRef::new(DEFAULT_SERVICE, SLACK_APP_TOKEN_KEY), app_token)
.map_err(|e| format!("store app token: {e}"))?;
Ok(SlackTokenRefs {
bot_token_key: SLACK_BOT_TOKEN_KEY.to_string(),
app_token_key: SLACK_APP_TOKEN_KEY.to_string(),
})
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct SlackTokenRefs {
pub bot_token_key: String,
pub app_token_key: String,
}
pub fn fetch_secret_by_ref(
store: &car_secrets::SecretStore,
key: &str,
) -> Result<String, String> {
use car_secrets::{SecretRef, DEFAULT_SERVICE};
store
.get(&SecretRef::new(DEFAULT_SERVICE, key))
.map_err(|e| format!("fetch secret {key}: {e}"))
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum SlackInboundEvent {
ButtonInteraction {
action_id: String,
value: String,
user: String,
},
PairingDm {
user: String,
text: String,
},
Ignore,
}
#[async_trait]
pub trait SlackTransport: Send + Sync {
async fn post_message(
&self,
channel: &str,
text: &str,
approval_id: &str,
) -> Result<String, String>;
async fn next_event(&self) -> Option<SlackInboundEvent>;
}
pub fn build_block_kit_message(text: &str, approval_id: &str) -> serde_json::Value {
serde_json::json!({
"text": text,
"blocks": [
{
"type": "section",
"text": { "type": "mrkdwn", "text": text }
},
{
"type": "actions",
"block_id": "approval_actions",
"elements": [
{
"type": "button",
"action_id": APPROVE_ACTION_ID,
"text": { "type": "plain_text", "text": "Approve", "emoji": true },
"style": "primary",
"value": approval_id
},
{
"type": "button",
"action_id": DENY_ACTION_ID,
"text": { "type": "plain_text", "text": "Deny", "emoji": true },
"style": "danger",
"value": approval_id
}
]
}
]
})
}
pub struct SlackAdapter {
core: ApprovalCore,
config: MessagingConfigStore,
transport: std::sync::Arc<dyn SlackTransport>,
channel: String,
}
impl SlackAdapter {
pub fn new(
host: std::sync::Arc<crate::host::HostState>,
config: MessagingConfigStore,
transport: std::sync::Arc<dyn SlackTransport>,
channel: impl Into<String>,
) -> Self {
Self {
core: ApprovalCore::new(host),
config,
transport,
channel: channel.into(),
}
}
pub async fn post_prompt(&self, action: &str, approval_id: &str, code: &str) {
if !self.config.is_enabled_for(ChannelId::Slack).unwrap_or(false) {
return;
}
if self.channel.is_empty() {
return;
}
let text = slack_prompt_text(action, code);
if let Err(e) = self
.transport
.post_message(&self.channel, &text, approval_id)
.await
{
tracing::warn!(approval_id = %approval_id, error = %e, "slack approval prompt post failed");
}
}
pub async fn handle_event(&self, event: &SlackInboundEvent) {
if !self.config.is_enabled_for(ChannelId::Slack).unwrap_or(false) {
return;
}
match event {
SlackInboundEvent::ButtonInteraction {
action_id,
value,
user,
} => {
self.handle_button(action_id, value, user).await;
}
SlackInboundEvent::PairingDm { user, text } => {
let _outcome: PairingOutcome = self
.config
.validate_and_consume_pairing_code_for(ChannelId::Slack, user, text)
.unwrap_or(PairingOutcome::Rejected);
}
SlackInboundEvent::Ignore => {}
}
}
async fn handle_button(&self, action_id: &str, approval_id: &str, user: &str) {
let resolution = if action_id == APPROVE_ACTION_ID {
APPROVE
} else if action_id == DENY_ACTION_ID {
DENY
} else {
return;
};
if !self
.config
.is_allowlisted_for(ChannelId::Slack, user)
.unwrap_or(false)
{
return;
}
if !self.core.is_id_eligible_pending(approval_id).await {
return;
}
let _ = self
.core
.resolve(SLACK_PRINCIPAL, approval_id, resolution)
.await;
}
pub fn core(&self) -> &ApprovalCore {
&self.core
}
}
#[async_trait]
impl InboundChannel for SlackAdapter {
fn channel(&self) -> ChannelId {
ChannelId::Slack
}
async fn run(&self, _sink: &dyn InboundSink, mut cancel: CancelSignal) {
loop {
tokio::select! {
_ = cancel.changed() => {
if *cancel.borrow() {
break;
}
}
event = self.transport.next_event() => {
match event {
Some(ev) => self.handle_event(&ev).await,
None => break, }
}
}
}
}
}
pub fn slack_prompt_text(action: &str, code: &str) -> String {
format!("*Approval needed:* {action}\n(ref `{code}`) — use the buttons below.")
}
pub struct RealSlackTransport {
secrets: car_secrets::SecretStore,
bot_token_key: String,
app_token_key: String,
http: reqwest::Client,
inbound_rx: tokio::sync::Mutex<tokio::sync::mpsc::Receiver<SlackInboundEvent>>,
inbound_tx: tokio::sync::mpsc::Sender<SlackInboundEvent>,
}
impl RealSlackTransport {
pub fn new(bot_token_key: impl Into<String>, app_token_key: impl Into<String>) -> Self {
let (inbound_tx, inbound_rx) = tokio::sync::mpsc::channel(64);
Self {
secrets: car_secrets::SecretStore::new(),
bot_token_key: bot_token_key.into(),
app_token_key: app_token_key.into(),
http: reqwest::Client::new(),
inbound_rx: tokio::sync::Mutex::new(inbound_rx),
inbound_tx,
}
}
async fn open_socket_url(&self) -> Result<String, String> {
let app_token = fetch_secret_by_ref(&self.secrets, &self.app_token_key)?;
let resp: serde_json::Value = self
.http
.post("https://slack.com/api/apps.connections.open")
.bearer_auth(app_token)
.header("Content-Length", "0")
.send()
.await
.map_err(|e| format!("apps.connections.open: {e}"))?
.json()
.await
.map_err(|e| format!("apps.connections.open decode: {e}"))?;
parse_socket_url_response(&resp)
}
pub fn spawn_socket_loop(self: std::sync::Arc<Self>, mut cancel: CancelSignal) {
tokio::spawn(async move {
const HEALTHY_UP: std::time::Duration = std::time::Duration::from_secs(5);
let mut backoff = std::time::Duration::from_secs(1);
loop {
if *cancel.borrow() {
break;
}
match self.open_socket_url().await {
Ok(url) => {
let started = std::time::Instant::now();
if let Err(e) = self.run_one_connection(&url, &mut cancel).await {
tracing::warn!(error = %e, "slack socket connection ended");
}
if started.elapsed() >= HEALTHY_UP {
backoff = std::time::Duration::from_secs(1);
} else {
tokio::select! {
_ = tokio::time::sleep(backoff) => {}
_ = cancel.changed() => {}
}
backoff = (backoff * 2).min(std::time::Duration::from_secs(30));
}
}
Err(e) => {
tracing::warn!(error = %e, "slack apps.connections.open failed");
tokio::select! {
_ = tokio::time::sleep(backoff) => {}
_ = cancel.changed() => {}
}
backoff = (backoff * 2).min(std::time::Duration::from_secs(30));
}
}
}
});
}
async fn run_one_connection(
&self,
wss_url: &str,
cancel: &mut CancelSignal,
) -> Result<(), String> {
use futures_util::{SinkExt, StreamExt};
use tokio_tungstenite::{connect_async, tungstenite::Message};
let (ws_stream, _resp) = connect_async(wss_url)
.await
.map_err(|e| format!("ws connect: {e}"))?;
let (mut write, mut read) = ws_stream.split();
loop {
tokio::select! {
_ = cancel.changed() => {
if *cancel.borrow() { return Ok(()); }
}
frame = read.next() => {
let Some(frame) = frame else { return Ok(()); };
let msg = frame.map_err(|e| format!("ws read: {e}"))?;
let text = match msg {
Message::Text(t) => t,
Message::Close(_) => return Ok(()),
_ => continue,
};
let envelope: serde_json::Value = match serde_json::from_str(&text) {
Ok(v) => v,
Err(_) => continue,
};
let ev_type = envelope["type"].as_str().unwrap_or("");
if matches!(ev_type, "events_api" | "interactive" | "slash_commands") {
if let Some(ack) = build_ack_frame(&envelope) {
let _ = write.send(Message::Text(ack.to_string().into())).await;
}
}
if ev_type == "disconnect" {
return Ok(()); }
match parse_socket_frame(&envelope) {
ev @ (SlackInboundEvent::ButtonInteraction { .. }
| SlackInboundEvent::PairingDm { .. }) => {
let _ = self.inbound_tx.send(ev).await;
}
SlackInboundEvent::Ignore => {}
}
}
}
}
}
}
pub fn parse_socket_url_response(resp: &serde_json::Value) -> Result<String, String> {
if resp["ok"].as_bool() != Some(true) {
return Err(format!(
"apps.connections.open failed: {}",
resp["error"].as_str().unwrap_or("unknown")
));
}
resp["url"]
.as_str()
.filter(|u| !u.is_empty())
.map(|u| u.to_string())
.ok_or_else(|| "apps.connections.open returned ok with no url".to_string())
}
pub fn build_ack_frame(envelope: &serde_json::Value) -> Option<serde_json::Value> {
let eid = envelope["envelope_id"].as_str()?;
Some(serde_json::json!({ "envelope_id": eid }))
}
pub fn parse_socket_frame(envelope: &serde_json::Value) -> SlackInboundEvent {
match envelope["type"].as_str() {
Some("interactive") => {
parse_interactive(&envelope["payload"]).unwrap_or(SlackInboundEvent::Ignore)
}
Some("events_api") => {
parse_events_api(&envelope["payload"]).unwrap_or(SlackInboundEvent::Ignore)
}
_ => SlackInboundEvent::Ignore,
}
}
pub fn parse_interactive(payload: &serde_json::Value) -> Option<SlackInboundEvent> {
if payload["type"].as_str()? != "block_actions" {
return None;
}
let action = payload["actions"].as_array()?.first()?;
let action_id = action["action_id"].as_str()?.to_string();
if action_id != APPROVE_ACTION_ID && action_id != DENY_ACTION_ID {
return None;
}
let value = action["value"].as_str()?.to_string();
let user = payload["user"]["id"].as_str().unwrap_or("").to_string();
Some(SlackInboundEvent::ButtonInteraction {
action_id,
value,
user,
})
}
pub fn parse_events_api(payload: &serde_json::Value) -> Option<SlackInboundEvent> {
let event = &payload["event"];
if event["type"].as_str()? != "message" {
return None;
}
if event["channel_type"].as_str() != Some("im") {
return None; }
if event["subtype"].as_str() == Some("bot_message") || event.get("bot_id").is_some() {
return None;
}
let user = event["user"].as_str()?.to_string();
let text = event["text"].as_str().unwrap_or("").trim().to_string();
if user.is_empty() || text.is_empty() {
return None;
}
Some(SlackInboundEvent::PairingDm { user, text })
}
#[async_trait]
impl SlackTransport for RealSlackTransport {
async fn post_message(
&self,
channel: &str,
text: &str,
approval_id: &str,
) -> Result<String, String> {
let bot_token = fetch_secret_by_ref(&self.secrets, &self.bot_token_key)?;
let mut body = build_block_kit_message(text, approval_id);
body["channel"] = serde_json::Value::String(channel.to_string());
let resp: serde_json::Value = self
.http
.post("https://slack.com/api/chat.postMessage")
.bearer_auth(bot_token)
.json(&body)
.send()
.await
.map_err(|e| format!("chat.postMessage: {e}"))?
.json()
.await
.map_err(|e| format!("chat.postMessage decode: {e}"))?;
if resp["ok"].as_bool() != Some(true) {
return Err(format!(
"chat.postMessage failed: {}",
resp["error"].as_str().unwrap_or("unknown")
));
}
Ok(resp["ts"].as_str().unwrap_or("").to_string())
}
async fn next_event(&self) -> Option<SlackInboundEvent> {
self.inbound_rx.lock().await.recv().await
}
}