use std::sync::Arc;
use std::time::Duration;
use serde_json::Value;
use tokio::sync::broadcast;
use mzrs_core::{ClientConfigBuilder, ClientRuntime, Event, Session};
use mzrs_proto::{api, realtime as rt};
use crate::error::SdkError;
use crate::handles::{ChannelHandle, ClanHandle};
use crate::types::ChannelType;
use crate::upload::AttachmentSource;
const DEFAULT_REQUEST_TIMEOUT: Duration = Duration::from_secs(10);
#[derive(Clone, Debug)]
pub struct JoinChatRequest {
pub clan_id: String,
pub channel_id: String,
pub channel_type: i32,
pub is_public: bool,
}
#[derive(Clone, Debug)]
pub struct LeaveChatRequest {
pub clan_id: String,
pub channel_id: String,
pub channel_type: i32,
pub is_public: bool,
}
#[derive(Clone, Debug)]
pub struct SendMessageRequest {
pub clan_id: String,
pub channel_id: String,
pub mode: i32,
pub is_public: bool,
pub content: Value,
pub mentions: Vec<api::MessageMention>,
pub attachments: Vec<api::MessageAttachment>,
pub references: Vec<api::MessageRef>,
pub anonymous_message: bool,
pub mention_everyone: bool,
pub avatar: Option<String>,
pub code: i32,
pub topic_id: Option<String>,
pub id: Option<String>,
}
#[derive(Clone, Debug)]
pub struct UpdateMessageRequest {
pub clan_id: String,
pub channel_id: String,
pub message_id: String,
pub mode: i32,
pub is_public: bool,
pub content: Value,
pub mentions: Vec<api::MessageMention>,
pub attachments: Vec<api::MessageAttachment>,
pub hide_editted: bool,
pub topic_id: Option<String>,
pub is_update_msg_topic: bool,
}
#[derive(Clone, Debug)]
pub struct DeleteMessageRequest {
pub clan_id: String,
pub channel_id: String,
pub message_id: String,
pub mode: i32,
pub is_public: bool,
pub has_attachment: bool,
pub topic_id: Option<String>,
pub mentions: Vec<u8>,
pub references: Vec<u8>,
}
#[derive(Clone, Debug)]
pub struct ReactMessageRequest {
pub id: Option<String>,
pub clan_id: String,
pub channel_id: String,
pub mode: i32,
pub is_public: bool,
pub message_id: String,
pub emoji_id: String,
pub emoji: String,
pub count: i32,
pub message_sender_id: String,
pub sender_id: Option<String>,
pub sender_name: Option<String>,
pub sender_avatar: Option<String>,
pub action_delete: bool,
pub topic_id: Option<String>,
pub emoji_recent_id: Option<String>,
}
#[derive(Clone)]
pub struct MzrsClientBuilder {
config_builder: ClientConfigBuilder,
request_timeout: Duration,
}
impl Default for MzrsClientBuilder {
fn default() -> Self {
Self {
config_builder: ClientConfigBuilder::default(),
request_timeout: DEFAULT_REQUEST_TIMEOUT,
}
}
}
impl MzrsClientBuilder {
pub fn new() -> Self {
Self::default()
}
pub fn bot_id(mut self, value: impl Into<String>) -> Self {
self.config_builder = self.config_builder.bot_id(value);
self
}
pub fn token(mut self, value: impl Into<String>) -> Self {
self.config_builder = self.config_builder.token(value);
self
}
pub fn host(mut self, value: impl Into<String>) -> Self {
self.config_builder = self.config_builder.host(value);
self
}
pub fn port(mut self, value: u16) -> Self {
self.config_builder = self.config_builder.port(value);
self
}
pub fn use_ssl(mut self, value: bool) -> Self {
self.config_builder = self.config_builder.use_ssl(value);
self
}
pub fn ws_path(mut self, value: impl Into<String>) -> Self {
self.config_builder = self.config_builder.ws_path(value);
self
}
pub fn auto_reconnect(mut self, value: bool) -> Self {
self.config_builder = self.config_builder.auto_reconnect(value);
self
}
pub fn max_reconnect_attempts(mut self, value: Option<u32>) -> Self {
self.config_builder = self.config_builder.max_reconnect_attempts(value);
self
}
pub fn reconnect_initial_backoff(mut self, value: Duration) -> Self {
self.config_builder = self.config_builder.reconnect_initial_backoff(value);
self
}
pub fn reconnect_max_backoff(mut self, value: Duration) -> Self {
self.config_builder = self.config_builder.reconnect_max_backoff(value);
self
}
pub fn heartbeat_interval(mut self, value: Duration) -> Self {
self.config_builder = self.config_builder.heartbeat_interval(value);
self
}
pub fn heartbeat_timeout(mut self, value: Duration) -> Self {
self.config_builder = self.config_builder.heartbeat_timeout(value);
self
}
pub fn timeout(mut self, value: Duration) -> Self {
self.config_builder = self.config_builder.timeout(value);
self
}
pub fn request_timeout(mut self, value: Duration) -> Self {
self.request_timeout = value;
self
}
pub fn build(self) -> Result<MzrsClient, SdkError> {
let config = self.config_builder.build()?;
let runtime = ClientRuntime::new(config)?;
Ok(MzrsClient {
runtime: Arc::new(runtime),
request_timeout: self.request_timeout,
})
}
}
#[derive(Clone)]
pub struct MzrsClient {
runtime: Arc<ClientRuntime>,
request_timeout: Duration,
}
impl MzrsClient {
pub fn builder() -> MzrsClientBuilder {
MzrsClientBuilder::default()
}
pub fn runtime(&self) -> &ClientRuntime {
self.runtime.as_ref()
}
pub fn subscribe(&self) -> broadcast::Receiver<Event> {
self.runtime.subscribe()
}
#[tracing::instrument(skip(self))]
pub async fn login(&self) -> Result<Session, SdkError> {
self.runtime.login().await.map_err(SdkError::from)
}
pub async fn session(&self) -> Option<Session> {
self.runtime.session().await
}
pub async fn set_session(&self, session: Session) {
self.runtime.set_session(session).await;
}
#[tracing::instrument(skip(self))]
pub async fn connect(&self) -> Result<(), SdkError> {
self.runtime
.connect_realtime()
.await
.map_err(SdkError::from)
}
#[tracing::instrument(skip(self))]
pub async fn connect_with_retry(&self) -> Result<(), SdkError> {
self.runtime
.connect_realtime_with_retry()
.await
.map_err(SdkError::from)
}
pub fn clan(&self, clan_id: impl Into<String>) -> ClanHandle {
ClanHandle::new(self.clone(), clan_id.into())
}
pub fn channel(
&self,
clan_id: impl Into<String>,
channel_id: impl Into<String>,
) -> ChannelHandle {
ChannelHandle::new(self.clone(), clan_id.into(), channel_id.into())
}
#[tracing::instrument(skip(self, envelope))]
pub async fn send_envelope(&self, envelope: &rt::Envelope) -> Result<(), SdkError> {
self.runtime
.send_envelope(envelope)
.await
.map_err(SdkError::from)
}
#[tracing::instrument(skip(self, envelope))]
pub async fn request_envelope(&self, envelope: rt::Envelope) -> Result<rt::Envelope, SdkError> {
self.runtime
.request(envelope, self.request_timeout)
.await
.map_err(SdkError::from)
}
#[tracing::instrument(skip(self))]
pub async fn list_clans(&self) -> Result<Vec<api::ClanDesc>, SdkError> {
self.runtime.list_clans().await.map_err(SdkError::from)
}
#[tracing::instrument(skip(self, request))]
pub async fn create_channel(
&self,
request: api::CreateChannelDescRequest,
) -> Result<api::ChannelDescription, SdkError> {
self.runtime
.create_channel(request)
.await
.map_err(SdkError::from)
}
#[tracing::instrument(skip(self), fields(peer_id = peer_id))]
pub async fn create_dm(&self, peer_id: &str) -> Result<api::ChannelDescription, SdkError> {
let peer_id = parse_id(peer_id, "peer_id")?;
self.create_channel(api::CreateChannelDescRequest {
r#type: ChannelType::Dm as i32,
channel_private: 1,
user_ids: vec![peer_id],
..Default::default()
})
.await
}
#[tracing::instrument(skip(self, request))]
pub async fn update_channel_desc(
&self,
request: api::UpdateChannelDescRequest,
) -> Result<(), SdkError> {
self.runtime
.update_channel_desc(request)
.await
.map_err(SdkError::from)
}
#[tracing::instrument(skip(self, channel_avatar))]
pub async fn update_channel_avatar(
&self,
clan_id: &str,
channel_id: &str,
channel_avatar: Option<String>,
) -> Result<(), SdkError> {
self.update_channel_desc(api::UpdateChannelDescRequest {
clan_id: parse_id(clan_id, "clan_id")?,
channel_id: parse_id(channel_id, "channel_id")?,
channel_avatar,
..Default::default()
})
.await
}
#[tracing::instrument(skip(self, request))]
pub async fn update_user_profile_by_clan(
&self,
request: api::UpdateClanProfileRequest,
) -> Result<(), SdkError> {
self.runtime
.update_user_profile_by_clan(request)
.await
.map_err(SdkError::from)
}
#[tracing::instrument(skip(self, nick_name, avatar))]
pub async fn update_clan_profile(
&self,
clan_id: &str,
nick_name: Option<String>,
avatar: Option<String>,
) -> Result<(), SdkError> {
self.update_user_profile_by_clan(api::UpdateClanProfileRequest {
clan_id: parse_id(clan_id, "clan_id")?,
nick_name,
avatar,
})
.await
}
#[tracing::instrument(skip(self, avatar))]
pub async fn update_clan_profile_avatar(
&self,
clan_id: &str,
avatar: Option<String>,
) -> Result<(), SdkError> {
self.update_clan_profile(clan_id, None, avatar).await
}
#[tracing::instrument(skip(self))]
pub async fn join_clan_chat(&self, clan_id: &str) -> Result<rt::ClanJoin, SdkError> {
let response = self
.request_envelope(rt::Envelope {
clan_join: Some(rt::ClanJoin {
clan_id: parse_id(clan_id, "clan_id")?,
}),
..Default::default()
})
.await?;
map_realtime_error(&response)?;
response.clan_join.ok_or_else(|| {
SdkError::Core(mzrs_core::CoreError::Decode(
"missing clan_join in response".to_string(),
))
})
}
#[tracing::instrument(skip(self, request))]
pub async fn join_chat(&self, request: JoinChatRequest) -> Result<rt::Channel, SdkError> {
let response = self
.request_envelope(rt::Envelope {
channel_join: Some(rt::ChannelJoin {
clan_id: parse_id(&request.clan_id, "clan_id")?,
channel_id: parse_id(&request.channel_id, "channel_id")?,
channel_type: request.channel_type,
is_public: request.is_public,
}),
..Default::default()
})
.await?;
map_realtime_error(&response)?;
response.channel.ok_or_else(|| {
SdkError::Core(mzrs_core::CoreError::Decode(
"missing channel in response".to_string(),
))
})
}
#[tracing::instrument(skip(self, request))]
pub async fn leave_chat(&self, request: LeaveChatRequest) -> Result<(), SdkError> {
let response = self
.request_envelope(rt::Envelope {
channel_leave: Some(rt::ChannelLeave {
clan_id: parse_id(&request.clan_id, "clan_id")?,
channel_id: parse_id(&request.channel_id, "channel_id")?,
channel_type: request.channel_type,
is_public: request.is_public,
}),
..Default::default()
})
.await?;
map_realtime_error(&response)?;
Ok(())
}
#[tracing::instrument(skip(self, request))]
pub async fn send_message(
&self,
request: SendMessageRequest,
) -> Result<rt::ChannelMessageAck, SdkError> {
let content = serde_json::to_string(&request.content)
.map_err(|err| SdkError::Core(mzrs_core::CoreError::Encode(err.to_string())))?;
let response = self
.request_envelope(rt::Envelope {
channel_message_send: Some(rt::ChannelMessageSend {
clan_id: parse_id(&request.clan_id, "clan_id")?,
channel_id: parse_id(&request.channel_id, "channel_id")?,
content,
mentions: request.mentions,
attachments: request.attachments,
references: request.references,
mode: request.mode,
anonymous_message: request.anonymous_message,
mention_everyone: request.mention_everyone,
avatar: request.avatar.unwrap_or_default(),
is_public: request.is_public,
code: request.code,
topic_id: parse_optional_id(request.topic_id.as_deref(), "topic_id")?,
id: parse_optional_id(request.id.as_deref(), "id")?,
}),
..Default::default()
})
.await?;
map_realtime_error(&response)?;
response.channel_message_ack.ok_or_else(|| {
SdkError::Core(mzrs_core::CoreError::Decode(
"missing channel_message_ack in response".to_string(),
))
})
}
#[tracing::instrument(skip(self, request))]
pub async fn update_message(
&self,
request: UpdateMessageRequest,
) -> Result<rt::ChannelMessageAck, SdkError> {
let content = serde_json::to_string(&request.content)
.map_err(|err| SdkError::Core(mzrs_core::CoreError::Encode(err.to_string())))?;
let has_topic_update = request.is_update_msg_topic || request.topic_id.is_some();
let response = self
.request_envelope(rt::Envelope {
channel_message_update: Some(rt::ChannelMessageUpdate {
clan_id: parse_id(&request.clan_id, "clan_id")?,
channel_id: parse_id(&request.channel_id, "channel_id")?,
message_id: parse_id(&request.message_id, "message_id")?,
content,
mentions: request.mentions,
attachments: request.attachments,
mode: request.mode,
is_public: request.is_public,
hide_editted: request.hide_editted,
topic_id: parse_optional_id(request.topic_id.as_deref(), "topic_id")?,
is_update_msg_topic: has_topic_update,
}),
..Default::default()
})
.await?;
map_realtime_error(&response)?;
response.channel_message_ack.ok_or_else(|| {
SdkError::Core(mzrs_core::CoreError::Decode(
"missing channel_message_ack in response".to_string(),
))
})
}
#[tracing::instrument(skip(self, request))]
pub async fn delete_message(
&self,
request: DeleteMessageRequest,
) -> Result<rt::ChannelMessageAck, SdkError> {
let response = self
.request_envelope(rt::Envelope {
channel_message_remove: Some(rt::ChannelMessageRemove {
clan_id: parse_id(&request.clan_id, "clan_id")?,
channel_id: parse_id(&request.channel_id, "channel_id")?,
message_id: parse_id(&request.message_id, "message_id")?,
mode: request.mode,
is_public: request.is_public,
has_attachment: request.has_attachment,
topic_id: parse_optional_id(request.topic_id.as_deref(), "topic_id")?,
mentions: request.mentions,
references: request.references,
}),
..Default::default()
})
.await?;
map_realtime_error(&response)?;
response.channel_message_ack.ok_or_else(|| {
SdkError::Core(mzrs_core::CoreError::Decode(
"missing channel_message_ack in response".to_string(),
))
})
}
#[tracing::instrument(skip(self, request))]
pub async fn react_message(
&self,
request: ReactMessageRequest,
) -> Result<api::MessageReaction, SdkError> {
let response = self
.request_envelope(rt::Envelope {
message_reaction_event: Some(api::MessageReaction {
id: parse_optional_id(request.id.as_deref(), "id")?,
emoji_id: parse_id(&request.emoji_id, "emoji_id")?,
emoji: request.emoji,
sender_id: parse_optional_id(request.sender_id.as_deref(), "sender_id")?,
sender_name: request.sender_name.unwrap_or_default(),
sender_avatar: request.sender_avatar.unwrap_or_default(),
action: request.action_delete,
count: request.count,
channel_id: parse_id(&request.channel_id, "channel_id")?,
message_id: parse_id(&request.message_id, "message_id")?,
clan_id: parse_id(&request.clan_id, "clan_id")?,
mode: request.mode,
message_sender_id: parse_id(&request.message_sender_id, "message_sender_id")?,
is_public: request.is_public,
topic_id: parse_optional_id(request.topic_id.as_deref(), "topic_id")?,
emoji_recent_id: parse_optional_id(
request.emoji_recent_id.as_deref(),
"emoji_recent_id",
)?,
}),
..Default::default()
})
.await?;
map_realtime_error(&response)?;
response.message_reaction_event.ok_or_else(|| {
SdkError::Core(mzrs_core::CoreError::Decode(
"missing message_reaction_event in response".to_string(),
))
})
}
#[deprecated(
note = "Mezon bot attachment upload is not supported; attach externally hosted URLs instead"
)]
#[tracing::instrument(skip(self, _source, _filename, _filetype))]
pub async fn upload_attachment(
&self,
_source: impl Into<AttachmentSource>,
_filename: impl Into<String>,
_filetype: impl Into<String>,
) -> Result<api::MessageAttachment, SdkError> {
Err(SdkError::Core(mzrs_core::CoreError::Unsupported(
"Mezon does not support attachment upload for bots".to_string(),
)))
}
#[tracing::instrument(skip(self))]
pub async fn close(&self) -> Result<(), SdkError> {
self.runtime.close().await.map_err(SdkError::from)
}
}
fn map_realtime_error(envelope: &rt::Envelope) -> Result<(), SdkError> {
if let Some(ref err) = envelope.error {
return Err(SdkError::Core(mzrs_core::CoreError::Decode(format!(
"realtime error {}: {}",
err.code, err.message
))));
}
Ok(())
}
fn parse_id(value: &str, field: &str) -> Result<i64, SdkError> {
value.parse::<i64>().map_err(|_| {
SdkError::Validation(format!(
"invalid {field}, expected int64 string but got '{value}'"
))
})
}
fn parse_optional_id(value: Option<&str>, field: &str) -> Result<i64, SdkError> {
match value {
Some(raw) if !raw.trim().is_empty() => parse_id(raw, field),
_ => Ok(0),
}
}