use crate::client::DingTalkStream;
use crate::frames::down_message::{callback_message::SessionWebhook, MessageType};
use crate::frames::{
down_message::{
event_message::EventMessage, system_message::SystemMessage, DownStreamMessage, MessageTopic,
},
AckMessage,
};
use std::sync::Arc;
use crate::frames::down_message::callback_message::{CallbackMessage, MessageData};
use crate::frames::up_message::callback_message::WebhookMessage;
use anyhow::anyhow;
use std::sync::atomic::Ordering;
use tokio::sync::mpsc;
use tokio::sync::mpsc::Receiver;
use log::{debug, error, info, warn};
impl DingTalkStream {
    /// Decodes one raw text frame and routes it to the handler for its message type.
    ///
    /// `text` is deserialized as a [`DownStreamMessage`]. A frame that fails to
    /// deserialize is logged at error level and otherwise ignored, so the call
    /// still returns `Ok(())`. When deserialization succeeds, the result of the
    /// selected handler (system, event or callback) is returned to the caller.
    ///
    /// `tx` is passed on unchanged to the selected handler.
    pub(super) async fn handle_message(
        self: Arc<Self>,
        text: &str,
        tx: mpsc::Sender<String>,
    ) -> crate::Result<()> {
        debug!("Received message: {:?}", text);

        // Bail out early on malformed frames; they are not treated as fatal.
        let message = match serde_json::from_str::<DownStreamMessage>(text) {
            Ok(message) => message,
            Err(err) => {
                error!("Failed to parse message, err: {}", err);
                return Ok(());
            }
        };

        // Only the discriminant is inspected here, so `message` can still be
        // moved into whichever handler is chosen.
        match message.r#type {
            MessageType::System => self.handle_system(message, tx).await,
            MessageType::Event => self.handle_event(message, tx).await,
            MessageType::Callback => self.handle_callback(message, tx).await,
        }
    }
}
impl DingTalkStream {
    /// Handles a system-type frame: updates the connection flags, answers pings
    /// and then forwards the frame to the optional system handler.
    ///
    /// Per topic:
    /// * `Connected` — logs only.
    /// * `Registered` — sets the `registered` flag.
    /// * `Disconnect` — clears both the `connected` and `registered` flags.
    /// * `Ping` — queues a reply on `tx` with code 200 that echoes the received
    ///   headers and data.
    /// * `KeepAlive`, callback topics, or no topic — nothing to do.
    ///
    /// Always returns `Ok(())`. A frame that cannot be converted into a
    /// [`SystemMessage`] is logged and skipped; the system handler's own result
    /// is deliberately ignored.
    async fn handle_system(
        self: Arc<Self>,
        msg: DownStreamMessage,
        tx: mpsc::Sender<String>,
    ) -> crate::Result<()> {
        match &msg.headers.topic {
            Some(MessageTopic::Connected) => {
                info!("Connection established");
            }
            Some(MessageTopic::Registered) => {
                self.registered.store(true, Ordering::SeqCst);
                info!("Registered successfully");
            }
            Some(MessageTopic::Disconnect) => {
                self.connected.store(false, Ordering::SeqCst);
                self.registered.store(false, Ordering::SeqCst);
                info!("Disconnected by server");
            }
            Some(MessageTopic::Ping) => {
                let response = serde_json::json!({
                    "code": 200,
                    "headers": msg.headers,
                    "message": "OK",
                    "data": msg.data,
                });
                // Best effort: a closed outbound channel is reported, not fatal.
                if let Err(err) = tx.send(response.to_string()).await {
                    warn!("Failed to queue ping response: {}", err);
                }
            }
            Some(MessageTopic::KeepAlive) | Some(MessageTopic::Callback(_)) | None => {}
        }

        let Some(handler) = &self.system_handler else {
            return Ok(());
        };
        let Ok(sys_msg) = SystemMessage::try_from(msg) else {
            warn!("Failed to parse system message, skipping processing");
            return Ok(());
        };
        let _ = handler.process(Arc::clone(&self), &sys_msg).await;
        Ok(())
    }

    /// Handles an event-type frame by running the optional event handler and
    /// queueing an acknowledgement on `tx`.
    ///
    /// Nothing is sent when no event handler is configured or when the frame
    /// cannot be converted into an [`EventMessage`]. Otherwise the ack carries
    /// the handler's result (or its error message) and the frame's message id
    /// (empty/default when the frame has none).
    ///
    /// # Errors
    ///
    /// Returns an error only if the ack cannot be serialized to JSON. A failure
    /// to queue the ack is logged and not propagated.
    async fn handle_event(
        self: Arc<Self>,
        msg: DownStreamMessage,
        tx: mpsc::Sender<String>,
    ) -> crate::Result<()> {
        let Some(handler) = &self.event_handler else {
            return Ok(());
        };
        let Ok(event_msg) = EventMessage::try_from(msg) else {
            warn!("Failed to parse event message, skipping processing");
            return Ok(());
        };

        let (code, response_msg) = match handler.process(Arc::clone(&self), &event_msg).await {
            Ok(result) => (200, result.to_string()),
            Err(err) => (err.code as i32, err.msg),
        };

        // NOTE(review): the ack is built with `AckMessage::ok` even when the
        // handler failed; `code` is only logged below. Confirm this is intended.
        let ack = AckMessage::ok(&response_msg)
            .with_message_id(event_msg.headers.message_id.clone().unwrap_or_default())
            .with_content_type("application/json");
        if let Err(err) = tx.send(serde_json::to_string(&ack)?).await {
            warn!("Failed to queue event ack: {}", err);
        }
        debug!("Event processed with code: {}", code);
        Ok(())
    }

    /// Handles a callback-type frame by running the handler registered for its
    /// topic and queueing an acknowledgement on `tx`.
    ///
    /// The frame is skipped (with a warning, and without an ack) when it has no
    /// topic, cannot be converted into a [`CallbackMessage`], or no handler is
    /// registered for the topic.
    ///
    /// When the payload carries a session webhook, a background task is spawned
    /// to forward webhook messages, and the sending half of a 1024-slot channel
    /// is given to the handler; otherwise the handler receives `None`. The task
    /// is only spawned once a handler is known to exist, so no task or channel
    /// is created for frames nobody will process.
    ///
    /// # Errors
    ///
    /// Returns an error only if the ack cannot be serialized to JSON. A failure
    /// to queue the ack is logged and not propagated.
    async fn handle_callback(
        self: Arc<Self>,
        msg: DownStreamMessage,
        tx: mpsc::Sender<String>,
    ) -> crate::Result<()> {
        let Some(topic) = msg.headers.topic.clone() else {
            warn!("Callback message without topic, skipping processing");
            return Ok(());
        };
        let Ok(cb_msg) = CallbackMessage::try_from(msg) else {
            warn!("Failed to parse callback message, skipping processing");
            return Ok(());
        };
        let Some(handler) = self.callback_handlers.get(&topic) else {
            warn!("No handler registered for topic: {}", topic);
            return Ok(());
        };

        let sender = if let Some(MessageData {
            session_webhook: Some(session_webhook),
            ..
        }) = &cb_msg.data
        {
            let (sender, receiver) = mpsc::channel(1024);
            let http_client = self.http_client.clone();
            let session_webhook = session_webhook.clone();
            // Detached on purpose: the task ends when every sender is dropped
            // or the webhook's timeout elapses.
            tokio::spawn(async move {
                Self::handle_webhook_message(http_client, session_webhook, receiver).await
            });
            Some(sender)
        } else {
            None
        };

        let (code, response_msg) =
            match handler.process(Arc::clone(&self), &cb_msg, sender).await {
                Ok(result) => (200, result.to_string()),
                Err(err) => (err.code as i32, err.msg),
            };

        // NOTE(review): the ack is built with `AckMessage::ok` even when the
        // handler failed; `code` is only logged below. Confirm this is intended.
        let ack = AckMessage::ok(&response_msg)
            .with_message_id(cb_msg.headers.message_id.clone().unwrap_or_default())
            .with_content_type("application/json")
            .with_data(serde_json::json!({ "response": response_msg }));
        if let Err(err) = tx.send(serde_json::to_string(&ack)?).await {
            warn!("Failed to queue callback ack: {}", err);
        }
        debug!("Callback processed with code: {}", code);
        Ok(())
    }

    /// Forwards every message received on `receiver` to the session webhook.
    ///
    /// Each message is POSTed as JSON to the webhook URL. If the message carries
    /// a `send_result_cb`, it is invoked with `Ok((status, body))` when the
    /// request and body read succeed, or with the error otherwise.
    ///
    /// The whole forwarding loop is bounded by `session_webhook.timeout()`: it
    /// ends when all senders are dropped or when that duration elapses,
    /// whichever comes first. If the webhook yields no URL or no timeout, a
    /// warning is logged and the function returns immediately, dropping
    /// `receiver`.
    async fn handle_webhook_message(
        http_client: reqwest::Client,
        session_webhook: SessionWebhook,
        mut receiver: Receiver<WebhookMessage>,
    ) {
        // Presumably a missing timeout means the session webhook has already
        // expired; verify against `SessionWebhook::timeout`.
        let (Ok(webhook_url), Some(timeout)) =
            (session_webhook.webhook_url(), session_webhook.timeout())
        else {
            warn!("Session webhook has no usable URL or timeout, webhook messages will not be forwarded");
            return;
        };

        let forward_all = async {
            while let Some(message) = receiver.recv().await {
                let response = http_client
                    .post(webhook_url.clone())
                    .header("Content-Type", "application/json")
                    .header("Accept", "*/*")
                    .json(&message)
                    .send()
                    .await;

                // Without a callback nobody is interested in the outcome.
                let Some(cb) = &message.send_result_cb else {
                    continue;
                };
                match response {
                    Ok(response) => {
                        let code = response.status();
                        match response.text().await {
                            Ok(text) => cb(Ok((code.as_u16(), text))),
                            Err(err) => cb(Err(anyhow!("{err}"))),
                        }
                    }
                    Err(err) => cb(Err(anyhow!("{err}"))),
                }
            }
        };

        if tokio::time::timeout(timeout, forward_all).await.is_err() {
            info!(
                "webhook_url: {} elapsed after {}",
                webhook_url,
                timeout.as_millis()
            );
        }
    }
}