//! RTM (Real-Time Messaging) API
//!
//! Methods for real-time messaging via WebSocket.
use crate::client::SlackClient;
use crate::error::{Result, SlackError};
use crate::types::RtmConnectResponse;
use futures_util::{SinkExt, StreamExt};
use serde::{Deserialize, Serialize};
use tokio_tungstenite::{connect_async, tungstenite::Message as WsMessage};
use tracing::{debug, error, info, warn};
/// RTM API client
///
/// Holds the [`SlackClient`] through which the RTM methods issue their
/// requests. Constructed inside the crate via `RtmApi::new`.
pub struct RtmApi {
// Client used to perform the `rtm.connect` call.
client: SlackClient,
}
/// Generic RTM event.
///
/// Captures the event's `type` discriminator and keeps every other key of the
/// JSON object in [`RtmEvent::data`], so events of any shape can be
/// round-tripped through (de)serialization.
#[derive(Debug, Clone, Deserialize, Serialize)]
pub struct RtmEvent {
    /// Value of the JSON `type` key.
    #[serde(rename = "type")]
    pub event_type: String,
    /// All remaining keys of the event object, flattened into one JSON value.
    #[serde(flatten)]
    pub data: serde_json::Value,
}
/// RTM message event
///
/// Only `type` is required when deserializing; every other field is optional
/// and is `None` when the key is absent from the incoming JSON. This means
/// non-message events that carry a `type` also deserialize into this struct,
/// so callers should check [`RtmMessageEvent::event_type`].
#[derive(Debug, Clone, Deserialize)]
pub struct RtmMessageEvent {
    /// Value of the JSON `type` key (e.g. `"message"`).
    #[serde(rename = "type")]
    pub event_type: String,
    /// Channel ID the event refers to, if present.
    pub channel: Option<String>,
    /// Presumably the ID of the user who sent the message — confirm against
    /// Slack's `message` event reference.
    pub user: Option<String>,
    /// Message text, if present.
    pub text: Option<String>,
    /// Presumably Slack's message timestamp string — TODO confirm.
    pub ts: Option<String>,
    /// Presumably the timestamp of the parent message when the event belongs
    /// to a thread — TODO confirm.
    pub thread_ts: Option<String>,
    /// Presumably the ID of the bot integration that posted the message —
    /// TODO confirm.
    pub bot_id: Option<String>,
}
/// Callback type for RTM message handlers
///
/// A boxed closure bounded by `Send + Sync` that receives each
/// [`RtmMessageEvent`] by value and returns nothing.
pub type MessageHandler = Box<dyn Fn(RtmMessageEvent) + Send + Sync>;
impl RtmApi {
    /// Creates an RTM API handle that issues its requests through `client`.
    pub(crate) fn new(client: SlackClient) -> Self {
        Self { client }
    }

    /// Connect to the RTM API
    ///
    /// Calls `rtm.connect` through the client with no parameters.
    ///
    /// Returns the WebSocket URL and self information
    ///
    /// # Errors
    ///
    /// Propagates whatever error the underlying client request returns.
    pub async fn connect(&self) -> Result<RtmConnectResponse> {
        // `rtm.connect` is called without arguments; the typed empty array
        // gives the client's generic parameter a concrete type.
        let params: [(&str, &str); 0] = [];
        self.client.get("rtm.connect", &params).await
    }

    /// Start an RTM connection and listen for events
    ///
    /// Calls [`RtmApi::connect`], opens a WebSocket to the returned URL and
    /// then processes incoming frames until the socket is closed or fails.
    /// Text frames that deserialize into an [`RtmMessageEvent`] whose type is
    /// `"message"` are handed to `on_message`, except for messages that come
    /// from the connected identity itself. Ping frames are answered with a
    /// pong; all other frames are ignored.
    ///
    /// # Arguments
    ///
    /// * `on_message` - Callback function for message events
    ///
    /// # Errors
    ///
    /// Returns an error if `rtm.connect` fails, if the WebSocket connection
    /// cannot be established, or if reading from the socket yields an error.
    /// A close frame or the end of the stream ends the loop with `Ok(())`.
    ///
    /// # Example
    ///
    /// ```no_run
    /// # use slacko::{SlackClient, AuthConfig};
    /// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
    /// # let client = SlackClient::new(AuthConfig::oauth("token"))?;
    /// client.rtm().start(|msg| {
    ///     println!("Received: {:?}", msg.text);
    /// }).await?;
    /// # Ok(())
    /// # }
    /// ```
    pub async fn start<F>(&self, on_message: F) -> Result<()>
    where
        F: Fn(RtmMessageEvent) + Send + Sync + 'static,
    {
        let rtm_info = self.connect().await?;
        let ws_url = &rtm_info.url;
        let self_id = &rtm_info.self_info.id;

        info!("Connecting to RTM WebSocket: {}", ws_url);
        let (ws_stream, _) = connect_async(ws_url)
            .await
            .map_err(|e| SlackError::websocket_error(format!("Failed to connect: {}", e)))?;
        info!("RTM WebSocket connected");

        let (mut write, mut read) = ws_stream.split();

        // Message processing loop
        while let Some(frame) = read.next().await {
            match frame {
                Ok(WsMessage::Text(text)) => {
                    debug!("RTM received: {}", text);

                    // Frames that are not valid events (e.g. missing `type`)
                    // are skipped on purpose, but leave a trace for debugging.
                    let event = match serde_json::from_str::<RtmMessageEvent>(&text) {
                        Ok(event) => event,
                        Err(e) => {
                            debug!("RTM ignoring unparsable frame: {}", e);
                            continue;
                        }
                    };

                    // Only process message events
                    if event.event_type != "message" {
                        continue;
                    }

                    // Skip messages from ourselves. NOTE(review): Slack's
                    // `rtm.connect` reference shows `self.id` as a user ID, so
                    // comparing it against `bot_id` alone would presumably
                    // never match; check `user` as well — confirm against the
                    // `RtmConnectResponse` definition.
                    let from_self = event.user.as_ref() == Some(self_id)
                        || event.bot_id.as_ref() == Some(self_id);
                    if from_self {
                        continue;
                    }

                    // Call the message handler
                    on_message(event);
                }
                Ok(WsMessage::Close(_)) => {
                    warn!("RTM WebSocket closed");
                    break;
                }
                Ok(WsMessage::Ping(data)) => {
                    debug!("RTM received ping");
                    // Best effort: a failed pong is logged and the loop keeps
                    // reading.
                    if let Err(e) = write.send(WsMessage::Pong(data)).await {
                        error!("Failed to send pong: {}", e);
                    }
                }
                Ok(_) => {
                    debug!("RTM received other message type");
                }
                Err(e) => {
                    error!("RTM WebSocket error: {}", e);
                    return Err(SlackError::websocket_error(format!(
                        "WebSocket error: {}",
                        e
                    )));
                }
            }
        }

        warn!("RTM connection closed");
        Ok(())
    }

    /// Start RTM and filter messages by channel
    ///
    /// Behaves like [`RtmApi::start`], but `on_message` is only invoked for
    /// message events whose `channel` equals `channel`. Events without a
    /// channel are dropped.
    ///
    /// # Arguments
    ///
    /// * `channel` - Channel ID to filter messages
    /// * `on_message` - Callback function for message events
    ///
    /// # Errors
    ///
    /// Returns the same errors as [`RtmApi::start`].
    pub async fn start_with_channel<F>(&self, channel: &str, on_message: F) -> Result<()>
    where
        F: Fn(RtmMessageEvent) + Send + Sync + 'static,
    {
        // The closure must be `'static`, so it owns a copy of the channel ID.
        let channel_filter = channel.to_owned();
        self.start(move |event| {
            if event.channel.as_deref() == Some(channel_filter.as_str()) {
                on_message(event);
            }
        })
        .await
    }
}