dingtalk_stream_sdk_rust/
down.rs

//! Types and methods that handle downstream messages from the DingTalk server
2
3use crate::{up::ClientUpStream, Client};
4use anyhow::{bail, Result};
5use futures::TryStreamExt;
6use log::{debug, error, warn};
7use serde::Deserialize;
8use serde_json::json;
9use std::{
10    io::{Error, ErrorKind},
11    sync::Arc,
12};
13use tokio::io::{copy, AsyncWrite};
14use tokio_util::io::StreamReader;
15
16impl Client {
17    pub(crate) async fn on_down_stream(&self, p: ClientDownStream) -> Result<()> {
18        match p.r#type.as_str() {
19            "SYSTEM" => self.on_system(p).await?,
20            "EVENT" => self.on_event(p.headers.message_id, p.headers.event).await?,
21            "CALLBACK" => {
22                let msg = ClientUpStream::new(
23                    serde_json::to_string(&json!({"response" : {}}))?,
24                    p.headers.message_id.clone(),
25                );
26                self.send(msg).await?;
27                self.tx.broadcast(Arc::new(p)).await?;
28            }
29            _ => error!("unknown message type: {}", p.r#type),
30        }
31
32        Ok(())
33    }
34
35    async fn on_event(&self, message_id: impl Into<String>, p: EventData) -> Result<()> {
36        debug!("event received: {:?}", p);
37        let ack = self.on_event_callback.0.read().unwrap()(p);
38        let msg = ClientUpStream::new(serde_json::to_string(&ack)?, message_id);
39        self.send(msg).await?;
40
41        Ok(())
42    }
43
44    async fn on_system(&self, p: ClientDownStream) -> Result<()> {
45        match p.headers.topic.as_str() {
46            "CONNECTED" => debug!("[SYSTEM]: connected"),
47            "REGISTERED" => debug!("[SYSTEM]: registered"),
48            "disconnect" => debug!("[SYSTEM]: disconnect"),
49            "KEEPALIVE" => debug!("[SYSTEM]: keepalive"),
50            "ping" => {
51                debug!("[SYSTEM]: ping");
52                let msg = ClientUpStream::new(p.data, p.headers.message_id);
53                self.send(msg).await?;
54            }
55            _ => warn!("unknown system message: {}", p.headers.topic),
56        }
57
58        Ok(())
59    }
60
61    /// get download url instead of download it
62    pub async fn download_url(&self, download_code: impl AsRef<str>) -> Result<String> {
63        let client_id = self.config.lock().unwrap().client_id.clone();
64        let response: DownloadUrl = self
65            .post(
66                DOWNLOAD_URL,
67                json!({ "downloadCode": download_code.as_ref(), "robotCode": client_id}),
68            )
69            .await?;
70        Ok(response.download_url)
71    }
72
73    /// download file from download_code
74    pub async fn download(
75        &self,
76        download_code: impl AsRef<str>,
77        mut writer: impl AsyncWrite + Unpin,
78    ) -> Result<()> {
79        let download_url = self.download_url(download_code).await?;
80        let response = self.client.get(download_url).send().await?;
81        if !response.status().is_success() {
82            bail!(
83                "download error: {} - {}",
84                response.status(),
85                response.text().await?
86            );
87        }
88
89        let mut reader = StreamReader::new(
90            response
91                .bytes_stream()
92                .map_err(|e| Error::new(ErrorKind::Other, e)),
93        );
94        copy(&mut reader, &mut writer).await?;
95
96        Ok(())
97    }
98}
99
100#[derive(Deserialize)]
101#[serde(rename_all = "camelCase")]
102struct DownloadUrl {
103    download_url: String,
104}
105const DOWNLOAD_URL: &str = "https://api.dingtalk.com/v1.0/robot/messageFiles/download";
106
107#[derive(Debug, Default, Deserialize)]
108#[serde(rename_all = "camelCase")]
109#[allow(dead_code)]
110pub(crate) struct ClientDownStream {
111    pub spec_version: String,
112    pub r#type: String,
113    pub headers: StreamDownHeaders,
114    pub data: String,
115}
116
117#[derive(Debug, Default, Deserialize)]
118#[serde(rename_all = "camelCase")]
119#[allow(dead_code)]
120pub(crate) struct StreamDownHeaders {
121    #[serde(default)]
122    pub app_id: String,
123    #[serde(default)]
124    pub connection_id: String,
125    pub content_type: String,
126    pub message_id: String,
127    pub time: String,
128    pub topic: String,
129    #[serde(flatten)]
130    pub event: EventData,
131}
132
133/// Event type pushed by DingTalk server
134///
135/// Please refer to the [official document](https://open.dingtalk.com/document/orgapp/org-event-overview) for the definition of each field
136#[derive(Debug, Default, Deserialize)]
137#[serde(rename_all = "camelCase")]
138pub struct EventData {
139    #[serde(default)]
140    pub event_type: String,
141    #[serde(default)]
142    pub event_born_time: String,
143    #[serde(default)]
144    pub event_id: String,
145    #[serde(default)]
146    pub event_corp_id: String,
147    #[serde(default)]
148    pub event_unified_app_id: String,
149}
150
151/// Message type pushed by DingTalk server
152///
153/// Please refer to the [official document](https://open.dingtalk.com/document/orgapp/receive-message) for the definition of each field
154#[derive(Deserialize, Debug)]
155#[serde(rename_all = "camelCase")]
156pub struct RobotRecvMessage {
157    pub msg_id: String,
158    pub msgtype: String,
159    #[serde(alias = "text")]
160    pub content: MsgContent,
161
162    pub conversation_id: String,
163    /// 1 - single chat
164    /// 2 - group chat
165    pub conversation_type: String,
166    #[serde(default)]
167    pub conversation_title: String,
168
169    #[serde(default)]
170    pub at_users: Vec<User>,
171    #[serde(default)]
172    pub is_in_at_list: bool,
173
174    #[serde(default)]
175    pub chatbot_corp_id: String,
176    pub chatbot_user_id: String,
177
178    pub sender_id: String,
179    pub sender_nick: String,
180    #[serde(default)]
181    pub sender_corp_id: String,
182    #[serde(default)]
183    pub sender_staff_id: String,
184
185    pub session_webhook_expired_time: u64,
186    pub session_webhook: String,
187
188    #[serde(default)]
189    pub is_admin: bool,
190    pub create_at: u64,
191}
192
193/// At(@) User type
194///
195/// Please refer to the [official document](https://open.dingtalk.com/document/orgapp/receive-message) for the definition of each field
196#[derive(Deserialize, Debug)]
197#[serde(rename_all = "camelCase")]
198pub struct User {
199    pub dingtalk_id: String,
200    #[serde(default)]
201    pub staff_id: String,
202}
203
204/// Enumeration types for all received messages
205///
206/// Please refer to the [official document](https://open.dingtalk.com/document/orgapp/receive-message) for the definition of each field
207#[derive(Deserialize, Debug)]
208#[serde(rename_all = "camelCase", untagged)]
209pub enum MsgContent {
210    #[serde(rename_all = "camelCase")]
211    Text { content: String },
212    #[serde(rename_all = "camelCase")]
213    File {
214        download_code: String,
215        file_name: String,
216    },
217    #[serde(rename_all = "camelCase")]
218    Picture {
219        download_code: String,
220        #[serde(default)]
221        picture_download_code: String,
222    },
223    #[serde(rename_all = "camelCase")]
224    RichText { rich_text: Vec<RichText> },
225    #[serde(rename_all = "camelCase")]
226    Audio {
227        duration: u32,
228        download_code: String,
229        recognition: String,
230    },
231    #[serde(rename_all = "camelCase")]
232    Video {
233        duration: u32,
234        download_code: String,
235        video_type: String,
236    },
237    #[serde(rename_all = "camelCase")]
238    UnknownMsgType { unknown_msg_type: String },
239}
240
241/// Enumeration types for rich text
242///
243/// Please refer to the [official document](https://open.dingtalk.com/document/orgapp/receive-message) for the definition of each field
244#[derive(Deserialize, Debug)]
245#[serde(rename_all = "camelCase", untagged)]
246pub enum RichText {
247    #[serde(rename_all = "camelCase")]
248    Text { text: String },
249    #[serde(rename_all = "camelCase")]
250    Picture {
251        download_code: String,
252        r#type: String,
253    },
254}