use crate::{
bot::Bot,
context::Context,
error::FrameworkResult,
message::MessageElement,
types::{Channel, Guild, GuildMember, GuildRole, Login, LoginStatus, Message, User},
};
use async_trait::async_trait;
use futures_util::StreamExt;
use serde::Deserialize;
use std::sync::Arc;
use tokio::net::TcpStream;
use tokio_tungstenite::{MaybeTlsStream, WebSocketStream};
use url::Url;
/// Platform adapter interface: one implementation per chat platform.
///
/// Every operation is async and, apart from `get_name`, `connect` and
/// `disconnect`, reports failure through `FrameworkResult`.
///
/// NOTE(review): this trait only declares signatures. The one-line summaries
/// below are inferred from method names and parameter lists and should be
/// confirmed against a concrete implementation. `next: Option<&str>` is
/// presumably a pagination cursor (`None` for the first page) — TODO confirm.
#[async_trait]
pub trait Adapter: Send + Sync + std::fmt::Debug {
/// Returns the adapter's name; `WSClient::start` uses it to label log lines.
fn get_name(&self) -> String;
/// Presumably brings the adapter online for `bot`.
async fn connect(&self, bot: Arc<Bot>);
/// Presumably takes the adapter offline for `bot`.
async fn disconnect(&self, bot: Arc<Bot>);

// --- Reactions ---

/// Presumably adds `emoji` as a reaction to a message in a channel.
async fn create_reaction(
&self,
message_id: &str,
channel_id: &str,
emoji: &str,
) -> FrameworkResult<()>;
/// Presumably removes the `emoji` reaction made by `user_id` from a message.
async fn delete_reaction(
&self,
message_id: &str,
channel_id: &str,
emoji: &str,
user_id: &str,
) -> FrameworkResult<()>;
/// Presumably removes every `emoji` reaction from a message.
async fn clear_reaction(
&self,
message_id: &str,
channel_id: &str,
emoji: &str,
) -> FrameworkResult<()>;
/// Returns users associated with the `emoji` reaction on a message
/// (presumably those who reacted).
async fn get_reaction_list(
&self,
message_id: &str,
channel_id: &str,
emoji: &str,
next: Option<&str>,
) -> FrameworkResult<Vec<User>>;

// --- Channels ---

/// Returns the `Channel` identified by `channel_id`.
async fn get_channel(&self, channel_id: &str) -> FrameworkResult<Channel>;
/// Returns channels for `guild_id`.
async fn get_channel_list(
&self,
guild_id: &str,
next: Option<&str>,
) -> FrameworkResult<Vec<Channel>>;
/// Presumably creates a channel in `guild_id` from `data` and returns the result.
async fn create_channel(&self, guild_id: &str, data: Channel) -> FrameworkResult<Channel>;
/// Presumably applies `data` to the channel identified by `channel_id`.
async fn update_channel(&self, channel_id: &str, data: Channel) -> FrameworkResult<()>;
/// Presumably deletes the channel identified by `channel_id`.
async fn delete_channel(&self, channel_id: &str) -> FrameworkResult<()>;
/// Presumably opens (or fetches) a direct-message channel with `user_id`.
async fn create_direct_channel(&self, user_id: &str) -> FrameworkResult<Channel>;

// --- Guild roles ---

/// Presumably assigns `role_id` to `user_id` within `guild_id`.
async fn set_guild_member_role(
&self,
guild_id: &str,
user_id: &str,
role_id: &str,
) -> FrameworkResult<()>;
/// Presumably removes `role_id` from `user_id` within `guild_id`.
async fn unset_guild_member_role(
&self,
guild_id: &str,
user_id: &str,
role_id: &str,
) -> FrameworkResult<()>;
/// Returns roles for `guild_id`.
///
/// NOTE(review): despite "member" in the name there is no `user_id`
/// parameter, so this presumably lists the guild's roles — confirm.
async fn get_guild_member_role_list(
&self,
guild_id: &str,
next: Option<&str>,
) -> FrameworkResult<Vec<GuildRole>>;
/// Presumably creates a role named `role_name` in `guild_id` and returns it.
async fn create_guild_role(
&self,
guild_id: &str,
role_name: &str,
) -> FrameworkResult<GuildRole>;
/// Presumably applies `role` to the role identified by `role_id`.
async fn update_guild_role(
&self,
guild_id: &str,
role_id: &str,
role: GuildRole,
) -> FrameworkResult<()>;
/// Presumably deletes the role identified by `role_id` from `guild_id`.
async fn delete_guild_role(&self, guild_id: &str, role_id: &str) -> FrameworkResult<()>;

// --- Messages ---

/// Sends `elements` to `channel_id`; the returned strings are presumably
/// the ids of the messages that were created.
async fn send_message(
&self,
channel_id: &str,
elements: &[MessageElement],
) -> FrameworkResult<Vec<String>>;
/// Sends `elements` privately to `user_id`; the returned strings are
/// presumably the ids of the messages that were created.
async fn send_private_message(
&self,
user_id: &str,
guild_id: &str,
elements: &[MessageElement],
) -> FrameworkResult<Vec<String>>;
/// Returns the `Message` identified by `message_id` in `channel_id`.
async fn get_message(&self, channel_id: &str, message_id: &str) -> FrameworkResult<Message>;
/// Presumably deletes the message identified by `message_id` in `channel_id`.
async fn delete_message(&self, channel_id: &str, message_id: &str) -> FrameworkResult<()>;
/// Presumably replaces the content of an existing message with `elements`.
async fn update_message(
&self,
channel_id: &str,
message_id: &str,
elements: &[MessageElement],
) -> FrameworkResult<()>;
/// Returns messages for `channel_id`.
///
/// NOTE(review): `directory` looks like it may be intended as a paging
/// *direction* — confirm against implementations.
async fn get_message_list(
&self,
channel_id: &str,
next: Option<&str>,
directory: Option<&str>,
) -> FrameworkResult<Vec<Message>>;

// --- Users and friends ---

/// Returns the `User` identified by `user_id`.
async fn get_user(&self, user_id: &str) -> FrameworkResult<User>;
/// Returns a list of users, presumably the logged-in account's friends.
async fn get_friends(&self, next: Option<&str>) -> FrameworkResult<Vec<User>>;
/// Presumably accepts or rejects the friend request carried by `message_id`,
/// with an optional `comment`.
async fn handle_friend_request(
&self,
message_id: &str,
accept: bool,
comment: Option<&str>,
) -> FrameworkResult<()>;

// --- Guilds and members ---

/// Returns the `Guild` identified by `guild_id`.
async fn get_guild(&self, guild_id: &str) -> FrameworkResult<Guild>;
/// Returns a list of guilds, presumably those the logged-in account has joined.
async fn get_guilds(&self, next: Option<&str>) -> FrameworkResult<Vec<Guild>>;
/// Presumably accepts or rejects the guild invitation carried by `message_id`,
/// with an optional `comment`.
async fn handle_guild_invite(
&self,
message_id: &str,
accept: bool,
comment: Option<&str>,
) -> FrameworkResult<()>;
/// Returns the `GuildMember` for `user_id` within `guild_id`.
async fn get_guild_member(&self, guild_id: &str, user_id: &str)
-> FrameworkResult<GuildMember>;
/// Returns members of `guild_id`.
async fn get_guild_members(
&self,
guild_id: &str,
next: Option<&str>,
) -> FrameworkResult<Vec<GuildMember>>;
/// Presumably removes `user_id` from `guild_id`; the meaning of
/// `permanent` (and of `None`) is implementation-defined — TODO confirm.
async fn kick_guild_member(
&self,
guild_id: &str,
user_id: &str,
permanent: Option<bool>,
) -> FrameworkResult<()>;
/// Presumably mutes `user_id` in `guild_id` for `duration`, recording `reason`.
/// The unit of `duration` and the meaning of `None` are not visible here.
async fn mute_guild_member(
&self,
guild_id: &str,
user_id: &str,
duration: Option<u64>,
reason: &str,
) -> FrameworkResult<()>;
/// Presumably accepts or rejects the guild join request carried by
/// `message_id`, with an optional `comment`.
async fn handle_guild_request(
&self,
message_id: &str,
accept: bool,
comment: Option<&str>,
) -> FrameworkResult<()>;

// --- Login ---

/// Returns the `Login` describing this adapter's current session.
async fn get_login(&self) -> FrameworkResult<Login>;
}
/// Reconnection settings consumed by `WSClient::start`.
///
/// `C` is an adapter-defined type carried in `_extend`.
#[derive(Debug, Clone, Deserialize)]
pub struct WSClientConfig<C> {
// Delay in milliseconds between attempts once `retry_times` retries have
// been used up. When this is 0, `WSClient::start` stops retrying instead.
retry_lazy: u64,
// Number of retries that use `retry_interval` before switching to `retry_lazy`.
retry_times: u64,
// Delay in milliseconds between attempts while the retry count is below `retry_times`.
retry_interval: u64,
// Optional adapter-specific value; not read by the code visible here.
_extend: Option<C>,
}
#[async_trait]
pub trait WSClient<C>: Adapter
where
C: for<'de> Deserialize<'de> + Send,
{
fn ctx(&self) -> Context;
fn bot(&self) -> Arc<Bot>;
fn socket(&self) -> Option<WebSocketStream<MaybeTlsStream<TcpStream>>>;
fn config(&self) -> WSClientConfig<C>;
async fn prepare(&self) -> FrameworkResult<(WebSocketStream<MaybeTlsStream<TcpStream>>, Url)>;
async fn accept(&self);
fn set_status(&self, status: LoginStatus);
fn get_active(&self) -> bool;
async fn start(&self) {
let mut retry_count = 0;
let ws_config = self.config();
loop {
if !self.get_active() {
tracing::debug!(
"Adapter {} is not active, stopping connection attempts.",
self.get_name()
);
self.set_status(LoginStatus::Offline);
return;
}
tracing::debug!(
"Adapter {} (attempt {}): Trying to connect...",
self.get_name(),
retry_count + 1
);
let mut socket_stream = match self.prepare().await {
Ok((stream, _url)) => {
self.set_status(LoginStatus::Online);
tracing::info!("Adapter {} connected successfully.", self.get_name());
if retry_count > 0 {
retry_count = 0;
}
self.accept().await;
stream
}
Err(e) => {
tracing::warn!(
"Adapter {} failed to prepare connection: {}",
self.get_name(),
e
);
let timeout = if retry_count >= ws_config.retry_times {
if ws_config.retry_lazy == 0 {
tracing::error!(
"Adapter {} reached max retry attempts ({}) and no lazy retry configured. Stopping.",
self.get_name(),
ws_config.retry_times
);
self.set_status(LoginStatus::Offline);
return;
}
if retry_count == ws_config.retry_times {
tracing::warn!(
"Adapter {} reached max retry attempts. Falling back to lazy retry ({}ms).",
self.get_name(),
ws_config.retry_lazy
);
}
ws_config.retry_lazy
} else {
ws_config.retry_interval
};
retry_count += 1;
self.set_status(LoginStatus::Reconnect);
tracing::info!(
"Adapter {} will retry connection in {}ms (attempt {}).",
self.get_name(),
timeout,
retry_count
);
tokio::time::sleep(tokio::time::Duration::from_millis(timeout)).await;
continue;
}
};
tracing::debug!("Adapter {} listening for messages.", self.get_name());
while let Some(message_result) = socket_stream.next().await {
if !self.get_active() {
tracing::info!(
"Adapter {} became inactive while listening. Closing connection.",
self.get_name()
);
let _ = socket_stream.close(None).await; self.set_status(LoginStatus::Offline);
return;
}
match message_result {
Ok(msg) => {
if msg.is_close() {
tracing::info!(
"Adapter {} received WebSocket Close frame. Connection closed by peer.",
self.get_name(),
);
break;
}
}
Err(e) => {
tracing::error!(
"Adapter {} error while receiving message: {}. Attempting to reconnect.",
self.get_name(),
e
);
break;
}
}
}
if !self.get_active() {
tracing::info!(
"Adapter {} became inactive after message loop. Not reconnecting.",
self.get_name()
);
self.set_status(LoginStatus::Offline);
return;
}
tracing::warn!(
"Adapter {} disconnected or encountered an error in message loop. Preparing to reconnect.",
self.get_name()
);
let timeout = if retry_count >= ws_config.retry_times {
if ws_config.retry_lazy == 0 {
tracing::error!(
"Adapter {} reached max retry attempts ({}) for disconnection and no lazy retry. Stopping.",
self.get_name(),
ws_config.retry_times
);
self.set_status(LoginStatus::Offline);
return;
}
if retry_count == ws_config.retry_times {
tracing::warn!(
"Adapter {} reached max retry attempts for disconnection. Falling back to lazy retry ({}ms).",
self.get_name(),
ws_config.retry_lazy
);
}
ws_config.retry_lazy
} else {
ws_config.retry_interval
};
retry_count += 1;
self.set_status(LoginStatus::Reconnect);
tracing::info!(
"Adapter {} will retry connection in {}ms (attempt {}).",
self.get_name(),
timeout,
retry_count
);
tokio::time::sleep(tokio::time::Duration::from_millis(timeout)).await;
}
}
async fn stop(&self) -> FrameworkResult<()> {
if let Some(mut socket) = self.socket() {
socket.close(None).await?;
}
self.set_status(LoginStatus::Offline);
tracing::info!("适配器 {} 已停止。", self.get_name());
Ok(())
}
}