dingtalk_stream_sdk_rust/
down.rs1use crate::{up::ClientUpStream, Client};
4use anyhow::{bail, Result};
5use futures::TryStreamExt;
6use log::{debug, error, warn};
7use serde::Deserialize;
8use serde_json::json;
9use std::{
10 io::{Error, ErrorKind},
11 sync::Arc,
12};
13use tokio::io::{copy, AsyncWrite};
14use tokio_util::io::StreamReader;
15
16impl Client {
17 pub(crate) async fn on_down_stream(&self, p: ClientDownStream) -> Result<()> {
18 match p.r#type.as_str() {
19 "SYSTEM" => self.on_system(p).await?,
20 "EVENT" => self.on_event(p.headers.message_id, p.headers.event).await?,
21 "CALLBACK" => {
22 let msg = ClientUpStream::new(
23 serde_json::to_string(&json!({"response" : {}}))?,
24 p.headers.message_id.clone(),
25 );
26 self.send(msg).await?;
27 self.tx.broadcast(Arc::new(p)).await?;
28 }
29 _ => error!("unknown message type: {}", p.r#type),
30 }
31
32 Ok(())
33 }
34
35 async fn on_event(&self, message_id: impl Into<String>, p: EventData) -> Result<()> {
36 debug!("event received: {:?}", p);
37 let ack = self.on_event_callback.0.read().unwrap()(p);
38 let msg = ClientUpStream::new(serde_json::to_string(&ack)?, message_id);
39 self.send(msg).await?;
40
41 Ok(())
42 }
43
44 async fn on_system(&self, p: ClientDownStream) -> Result<()> {
45 match p.headers.topic.as_str() {
46 "CONNECTED" => debug!("[SYSTEM]: connected"),
47 "REGISTERED" => debug!("[SYSTEM]: registered"),
48 "disconnect" => debug!("[SYSTEM]: disconnect"),
49 "KEEPALIVE" => debug!("[SYSTEM]: keepalive"),
50 "ping" => {
51 debug!("[SYSTEM]: ping");
52 let msg = ClientUpStream::new(p.data, p.headers.message_id);
53 self.send(msg).await?;
54 }
55 _ => warn!("unknown system message: {}", p.headers.topic),
56 }
57
58 Ok(())
59 }
60
61 pub async fn download_url(&self, download_code: impl AsRef<str>) -> Result<String> {
63 let client_id = self.config.lock().unwrap().client_id.clone();
64 let response: DownloadUrl = self
65 .post(
66 DOWNLOAD_URL,
67 json!({ "downloadCode": download_code.as_ref(), "robotCode": client_id}),
68 )
69 .await?;
70 Ok(response.download_url)
71 }
72
73 pub async fn download(
75 &self,
76 download_code: impl AsRef<str>,
77 mut writer: impl AsyncWrite + Unpin,
78 ) -> Result<()> {
79 let download_url = self.download_url(download_code).await?;
80 let response = self.client.get(download_url).send().await?;
81 if !response.status().is_success() {
82 bail!(
83 "download error: {} - {}",
84 response.status(),
85 response.text().await?
86 );
87 }
88
89 let mut reader = StreamReader::new(
90 response
91 .bytes_stream()
92 .map_err(|e| Error::new(ErrorKind::Other, e)),
93 );
94 copy(&mut reader, &mut writer).await?;
95
96 Ok(())
97 }
98}
99
100#[derive(Deserialize)]
101#[serde(rename_all = "camelCase")]
102struct DownloadUrl {
103 download_url: String,
104}
105const DOWNLOAD_URL: &str = "https://api.dingtalk.com/v1.0/robot/messageFiles/download";
106
107#[derive(Debug, Default, Deserialize)]
108#[serde(rename_all = "camelCase")]
109#[allow(dead_code)]
110pub(crate) struct ClientDownStream {
111 pub spec_version: String,
112 pub r#type: String,
113 pub headers: StreamDownHeaders,
114 pub data: String,
115}
116
117#[derive(Debug, Default, Deserialize)]
118#[serde(rename_all = "camelCase")]
119#[allow(dead_code)]
120pub(crate) struct StreamDownHeaders {
121 #[serde(default)]
122 pub app_id: String,
123 #[serde(default)]
124 pub connection_id: String,
125 pub content_type: String,
126 pub message_id: String,
127 pub time: String,
128 pub topic: String,
129 #[serde(flatten)]
130 pub event: EventData,
131}
132
133#[derive(Debug, Default, Deserialize)]
137#[serde(rename_all = "camelCase")]
138pub struct EventData {
139 #[serde(default)]
140 pub event_type: String,
141 #[serde(default)]
142 pub event_born_time: String,
143 #[serde(default)]
144 pub event_id: String,
145 #[serde(default)]
146 pub event_corp_id: String,
147 #[serde(default)]
148 pub event_unified_app_id: String,
149}
150
151#[derive(Deserialize, Debug)]
155#[serde(rename_all = "camelCase")]
156pub struct RobotRecvMessage {
157 pub msg_id: String,
158 pub msgtype: String,
159 #[serde(alias = "text")]
160 pub content: MsgContent,
161
162 pub conversation_id: String,
163 pub conversation_type: String,
166 #[serde(default)]
167 pub conversation_title: String,
168
169 #[serde(default)]
170 pub at_users: Vec<User>,
171 #[serde(default)]
172 pub is_in_at_list: bool,
173
174 #[serde(default)]
175 pub chatbot_corp_id: String,
176 pub chatbot_user_id: String,
177
178 pub sender_id: String,
179 pub sender_nick: String,
180 #[serde(default)]
181 pub sender_corp_id: String,
182 #[serde(default)]
183 pub sender_staff_id: String,
184
185 pub session_webhook_expired_time: u64,
186 pub session_webhook: String,
187
188 #[serde(default)]
189 pub is_admin: bool,
190 pub create_at: u64,
191}
192
193#[derive(Deserialize, Debug)]
197#[serde(rename_all = "camelCase")]
198pub struct User {
199 pub dingtalk_id: String,
200 #[serde(default)]
201 pub staff_id: String,
202}
203
204#[derive(Deserialize, Debug)]
208#[serde(rename_all = "camelCase", untagged)]
209pub enum MsgContent {
210 #[serde(rename_all = "camelCase")]
211 Text { content: String },
212 #[serde(rename_all = "camelCase")]
213 File {
214 download_code: String,
215 file_name: String,
216 },
217 #[serde(rename_all = "camelCase")]
218 Picture {
219 download_code: String,
220 #[serde(default)]
221 picture_download_code: String,
222 },
223 #[serde(rename_all = "camelCase")]
224 RichText { rich_text: Vec<RichText> },
225 #[serde(rename_all = "camelCase")]
226 Audio {
227 duration: u32,
228 download_code: String,
229 recognition: String,
230 },
231 #[serde(rename_all = "camelCase")]
232 Video {
233 duration: u32,
234 download_code: String,
235 video_type: String,
236 },
237 #[serde(rename_all = "camelCase")]
238 UnknownMsgType { unknown_msg_type: String },
239}
240
241#[derive(Deserialize, Debug)]
245#[serde(rename_all = "camelCase", untagged)]
246pub enum RichText {
247 #[serde(rename_all = "camelCase")]
248 Text { text: String },
249 #[serde(rename_all = "camelCase")]
250 Picture {
251 download_code: String,
252 r#type: String,
253 },
254}