1use crate::api::APISender as APISenderTrait;
2use crate::api::arg_type::MessageType;
3use crate::api::return_type::*;
4pub use crate::event::Event as NormalEvent;
5use crate::event::EventReceiver as EventReceiverTrait;
6use crate::event::EventTrait;
7use crate::event::message::GroupMessageAnonymous;
8use crate::message::receive_segment::ReceiveSegment;
9use crate::message::send_segment::SendSegment;
10use async_trait::async_trait;
11pub use flume::Receiver as FlumeReceiver;
12use flume::SendError;
13pub use flume::Sender as FlumeSender;
14use serde::de::DeserializeOwned;
15use serde::{Deserialize, Serialize};
16use serde_json::{Value as JsonValue, json};
17use std::sync::Arc;
18use std::time::Duration;
19use strum_macros::EnumIs;
20use tokio::sync::broadcast;
21pub use tokio::sync::broadcast::Receiver as BroadcastReceiver;
22pub use tokio::sync::broadcast::Sender as BroadcastSender;
23
24pub type APISender = FlumeSender<APIRequest>;
25pub type APIReceiver = FlumeReceiver<APIRequest>;
26pub type EventSender = BroadcastSender<Arc<Event>>;
27pub type EventReceiver = BroadcastReceiver<Arc<Event>>;
28
29#[derive(Serialize, Clone, Debug)]
30pub struct APIRequest {
31 pub action: String,
32 pub params: JsonValue,
33 pub echo: Option<String>,
34}
35
36#[derive(Deserialize, Clone, Debug)]
37pub struct APIResponse {
38 pub status: String,
39 pub retcode: i32,
40 pub data: JsonValue,
41 pub echo: Option<String>,
42}
43
44impl APIResponse {
45 pub fn verify(&self) -> bool {
46 self.status == "ok"
47 }
48
49 pub fn parse_data<T: DeserializeOwned>(self) -> anyhow::Result<T> {
50 if !self.verify() {
51 return Err(anyhow::anyhow!("request failed with code {}", self.retcode));
52 }
53 Ok(serde_json::from_value(self.data)?)
54 }
55}
56
57#[derive(Deserialize, Clone, Debug, EnumIs)]
58#[serde(untagged)]
59pub enum Event {
60 APIResponse(APIResponse),
61 Event(NormalEvent),
62}
63
64impl EventTrait for Event {}
65impl EventTrait for Arc<Event> {}
66
67#[async_trait]
68pub trait APIResponseListener {
69 async fn listen(&mut self, echo: String, timeout: Option<Duration>) -> Option<Arc<Event>>;
70 async fn listen_without_timeout(&mut self, echo: String) -> Option<Arc<Event>>;
71 async fn listen_with_timeout(&mut self, echo: String, timeout: Duration) -> Option<Arc<Event>>;
72}
73
74#[async_trait]
75impl APIResponseListener for BroadcastReceiver<Arc<Event>> {
76 async fn listen(&mut self, echo: String, timeout: Option<Duration>) -> Option<Arc<Event>> {
77 match timeout {
78 Some(timeout) => self.listen_with_timeout(echo, timeout).await,
79 None => self.listen_without_timeout(echo).await,
80 }
81 }
82 async fn listen_without_timeout(&mut self, echo: String) -> Option<Arc<Event>> {
83 loop {
84 if let Ok(event) = self.recv().await && let Event::APIResponse(response) = &*event && response
87 .echo
88 .as_ref()
89 .map(|target_echo| target_echo == &echo) .unwrap_or(false)
91 {
93 return Some(Arc::clone(&event));
94 }
95 if self.is_closed() {
96 return None;
97 }
98 }
99 }
100
101 async fn listen_with_timeout(&mut self, echo: String, timeout: Duration) -> Option<Arc<Event>> {
102 tokio::time::timeout(timeout, self.listen_without_timeout(echo))
103 .await
104 .ok()?
105 }
106}
107
108#[async_trait]
109pub trait CommunicationService: Sync + Send {
110 fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender);
111 async fn start_service(&self) -> anyhow::Result<()>;
112}
113
114pub trait IntoService {
115 fn into(self) -> impl CommunicationService + 'static;
116}
117
118impl<T: CommunicationService + 'static> IntoService for T {
119 fn into(self) -> impl CommunicationService + 'static {
120 self
121 }
122}
123
124pub struct Client {
138 service: Box<dyn CommunicationService>,
139 api_sender: APISender,
140 api_receiver: APIReceiver,
141 event_sender: EventSender,
142 timeout: Option<Duration>,
143}
144
145impl EventReceiverTrait<Arc<Event>> for Client {
146 fn get_receiver(&self) -> EventReceiver {
147 self.event_sender.subscribe()
148 }
149}
150
151impl Client {
152 pub fn new(
160 service: impl IntoService,
161 timeout: Option<Duration>,
162 api_channel_cap: Option<usize>,
163 event_channel_cap: Option<usize>,
164 ) -> Self {
165 let mut service = Box::new(service.into());
166 let (api_sender, api_receiver) = flume::bounded(api_channel_cap.unwrap_or(16));
167 let (event_sender, _) = broadcast::channel(event_channel_cap.unwrap_or(16));
168 service.inject(api_receiver.clone(), event_sender.clone());
169 Self {
170 service,
171 api_receiver,
172 api_sender,
173 event_sender,
174 timeout,
175 }
176 }
177}
178
179impl Client {
180 pub async fn start_service(&self) -> anyhow::Result<()> {
181 self.service.start_service().await
182 }
183
184 pub fn change_service(&mut self, service: impl IntoService) -> Box<dyn CommunicationService> {
185 let mut service = Box::new(service.into());
186 service.inject(self.api_receiver.clone(), self.event_sender.clone());
187 std::mem::replace(&mut self.service, service)
188 }
189
190 pub fn get_service(&self) -> &dyn CommunicationService {
191 &*self.service
192 }
193
194 pub fn get_service_mut(&mut self) -> &mut dyn CommunicationService {
195 &mut *self.service
196 }
197}
198
199impl Client {
200 pub fn generate_id() -> String {
201 uuid::Uuid::new_v4().to_string()
202 }
203
204 pub async fn get_response(&self, echo: String) -> Option<APIResponse> {
205 let mut receiver = self.get_receiver();
206 if let Event::APIResponse(res) = &*(receiver.listen(echo, self.timeout).await?) {
207 Some(res.clone())
208 } else {
209 None
210 }
211 }
212
213 pub fn parse_response<T: DeserializeOwned>(response: APIResponse) -> anyhow::Result<T> {
214 response.parse_data()
215 }
216
217 pub async fn send_request(
218 &self,
219 action: String,
220 params: JsonValue,
221 echo: String,
222 ) -> Result<(), SendError<APIRequest>> {
223 self
224 .api_sender
225 .send_async(APIRequest {
226 action,
227 params,
228 echo: Some(echo),
229 })
230 .await
231 }
232
233 pub async fn send_and_parse<T: DeserializeOwned>(
245 &self,
246 action: impl ToString,
247 params: JsonValue,
248 ) -> anyhow::Result<T> {
249 let echo = Self::generate_id();
250 self
251 .send_request(action.to_string(), params, echo.clone())
252 .await?;
253 let response = self.get_response(echo).await;
254 if response.is_none() {
255 return Err(anyhow::anyhow!("request time out"));
256 }
257 let response = response.unwrap();
258 Self::parse_response(response)
259 }
260}
261
262#[async_trait]
263impl APISenderTrait for Client {
264 async fn send_private_msg(
265 &self,
266 user_id: i64,
267 message: Vec<SendSegment>,
268 auto_escape: Option<bool>,
269 ) -> anyhow::Result<i32> {
270 let params = json!({
271 "user_id": user_id,
272 "message": message,
273 "auto_escape": auto_escape
274 });
275 let response: SendMsgResponse = self.send_and_parse("send_private_msg", params).await?;
276 Ok(response.message_id)
277 }
278
279 async fn send_group_msg(
280 &self,
281 group_id: i64,
282 message: Vec<SendSegment>,
283 auto_escape: Option<bool>,
284 ) -> anyhow::Result<i32> {
285 let params = json!({
286 "group_id": group_id,
287 "message": message,
288 "auto_escape": auto_escape
289 });
290 let response: SendMsgResponse = self.send_and_parse("send_group_msg", params).await?;
291 Ok(response.message_id)
292 }
293
294 async fn send_msg(
295 &self,
296 message_type: Option<MessageType>,
297 user_id: i64,
298 group_id: i64,
299 message: Vec<SendSegment>,
300 auto_escape: Option<bool>,
301 ) -> anyhow::Result<i32> {
302 let params = json!({
303 "message_type": message_type,
304 "user_id": user_id,
305 "group_id": group_id,
306 "message": message,
307 "auto_escape": auto_escape
308 });
309 let response: SendMsgResponse = self.send_and_parse("send_msg", params).await?;
310 Ok(response.message_id)
311 }
312
313 async fn delete_msg(&self, message_id: i32) -> anyhow::Result<()> {
314 let params = json!({
315 "message_id": message_id
316 });
317 self.send_and_parse("delete_msg", params).await
318 }
319
320 async fn get_msg(&self, message_id: i32) -> anyhow::Result<GetMsgResponse> {
321 let params = json!({
322 "message_id": message_id
323 });
324 self.send_and_parse("get_msg", params).await
325 }
326
327 async fn get_forward_msg(&self, id: String) -> anyhow::Result<Vec<ReceiveSegment>> {
328 let params = json!({
329 "id": id
330 });
331 let response: GetForwardMsgResponse = self.send_and_parse("get_forward_msg", params).await?;
332 Ok(response.message)
333 }
334
335 async fn send_like(&self, user_id: i64, times: Option<i32>) -> anyhow::Result<()> {
336 let params = json!({
337 "user_id": user_id,
338 "times": times
339 });
340 self.send_and_parse("send_like", params).await
341 }
342
343 async fn set_group_kick(
344 &self,
345 group_id: i32,
346 user_id: i32,
347 reject_add_request: Option<bool>,
348 ) -> anyhow::Result<()> {
349 let params = json!({
350 "group_id": group_id,
351 "user_id": user_id,
352 "reject_add_request": reject_add_request
353 });
354 self.send_and_parse("set_group_kick", params).await
355 }
356
357 async fn set_group_ban(
358 &self,
359 group_id: i32,
360 user_id: i32,
361 duration: Option<i32>,
362 ) -> anyhow::Result<()> {
363 let params = json!({
364 "group_id": group_id,
365 "user_id": user_id,
366 "duration": duration
367 });
368 self.send_and_parse("set_group_ban", params).await
369 }
370
371 async fn set_group_anonymous_ban(
372 &self,
373 group_id: i32,
374 anonymous: Option<GroupMessageAnonymous>,
375 flag: Option<String>,
376 duration: Option<i32>,
377 ) -> anyhow::Result<()> {
378 let params = json!({
379 "group_id": group_id,
380 "anonymous": anonymous,
381 "flag": flag,
382 "duration": duration
383 });
384 self.send_and_parse("set_group_anonymous_ban", params).await
385 }
386
387 async fn set_group_whole_ban(&self, group_id: i32, enable: Option<bool>) -> anyhow::Result<()> {
388 let params = json!({
389 "group_id": group_id,
390 "enable": enable
391 });
392 self.send_and_parse("set_group_whole_ban", params).await
393 }
394
395 async fn set_group_admin(
396 &self,
397 group_id: i32,
398 user_id: i32,
399 enable: Option<bool>,
400 ) -> anyhow::Result<()> {
401 let params = json!({
402 "group_id": group_id,
403 "user_id": user_id,
404 "enable": enable
405 });
406 self.send_and_parse("set_group_admin", params).await
407 }
408
409 async fn set_group_anonymous(&self, group_id: i32, enable: Option<bool>) -> anyhow::Result<()> {
410 let params = json!({
411 "group_id": group_id,
412 "enable": enable
413 });
414 self.send_and_parse("set_group_anonymous", params).await
415 }
416
417 async fn set_group_card(
418 &self,
419 group_id: i32,
420 user_id: i32,
421 card: Option<String>,
422 ) -> anyhow::Result<()> {
423 let params = json!({
424 "group_id": group_id,
425 "user_id": user_id,
426 "card": card
427 });
428 self.send_and_parse("set_group_card", params).await
429 }
430
431 async fn set_group_name(&self, group_id: i32, group_name: String) -> anyhow::Result<()> {
432 let params = json!({
433 "group_id": group_id,
434 "group_name": group_name
435 });
436 self.send_and_parse("set_group_name", params).await
437 }
438
439 async fn set_group_leave(&self, group_id: i32, is_dismiss: Option<bool>) -> anyhow::Result<()> {
440 let params = json!({
441 "group_id": group_id,
442 "is_dismiss": is_dismiss
443 });
444 self.send_and_parse("set_group_leave", params).await
445 }
446
447 async fn set_group_special_title(
448 &self,
449 group_id: i32,
450 user_id: i32,
451 special_title: Option<String>,
452 duration: Option<i32>,
453 ) -> anyhow::Result<()> {
454 let params = json!({
455 "group_id": group_id,
456 "user_id": user_id,
457 "special_title": special_title,
458 "duration": duration
459 });
460 self.send_and_parse("set_group_special_title", params).await
461 }
462
463 async fn set_friend_add_request(
464 &self,
465 flag: String,
466 approve: Option<bool>,
467 remark: Option<String>,
468 ) -> anyhow::Result<()> {
469 let params = json!({
470 "flag": flag,
471 "approve": approve,
472 "remark": remark
473 });
474 self.send_and_parse("set_friend_add_request", params).await
475 }
476
477 async fn set_group_add_request(
478 &self,
479 flag: String,
480 sub_type: String,
481 approve: Option<bool>,
482 reason: Option<String>,
483 ) -> anyhow::Result<()> {
484 let params = json!({
485 "flag": flag,
486 "sub_type": sub_type,
487 "approve": approve,
488 "reason": reason
489 });
490 self.send_and_parse("set_group_add_request", params).await
491 }
492
493 async fn get_login_info(&self) -> anyhow::Result<GetLoginInfoResponse> {
494 let params = json!({});
495 self.send_and_parse("get_login_info", params).await
496 }
497
498 async fn get_stranger_info(
499 &self,
500 user_id: i32,
501 no_cache: Option<bool>,
502 ) -> anyhow::Result<GetStrangerInfoResponse> {
503 let params = json!({
504 "user_id": user_id,
505 "no_cache": no_cache
506 });
507 self.send_and_parse("get_stranger_info", params).await
508 }
509
510 async fn get_friend_list(&self) -> anyhow::Result<Vec<GetFriendListResponse>> {
511 let params = json!({});
512 self.send_and_parse("get_friend_list", params).await
513 }
514
515 async fn get_group_info(
516 &self,
517 group_id: i32,
518 no_cache: Option<bool>,
519 ) -> anyhow::Result<GetGroupInfoResponse> {
520 let params = json!({
521 "group_id": group_id,
522 "no_cache": no_cache
523 });
524 self.send_and_parse("get_group_info", params).await
525 }
526
527 async fn get_group_list(&self) -> anyhow::Result<Vec<GetGroupInfoResponse>> {
528 let params = json!({});
529 self.send_and_parse("get_group_list", params).await
530 }
531
532 async fn get_group_member_info(
533 &self,
534 group_id: i32,
535 user_id: i32,
536 no_cache: Option<bool>,
537 ) -> anyhow::Result<GetGroupMemberInfoResponse> {
538 let params = json!({
539 "group_id": group_id,
540 "user_id": user_id,
541 "no_cache": no_cache
542 });
543 self.send_and_parse("get_group_member_info", params).await
544 }
545
546 async fn get_group_member_list(
547 &self,
548 group_id: i32,
549 ) -> anyhow::Result<Vec<GetGroupMemberInfoResponse>> {
550 let params = json!({
551 "group_id": group_id
552 });
553 self.send_and_parse("get_group_member_list", params).await
554 }
555
556 async fn get_group_honor_info(
557 &self,
558 group_id: i64,
559 honor_type: String,
560 ) -> anyhow::Result<GetGroupMemberInfoResponse> {
561 let params = json!({
562 "group_id": group_id,
563 "type": honor_type
564 });
565 self.send_and_parse("get_group_honor_info", params).await
566 }
567
568 async fn get_cookies(&self, domain: Option<String>) -> anyhow::Result<String> {
569 let params = json!({
570 "domain": domain
571 });
572 let response: GetCookiesResponse = self.send_and_parse("get_cookies", params).await?;
573 Ok(response.cookies)
574 }
575
576 async fn get_csrf_token(&self) -> anyhow::Result<i32> {
577 let params = json!({});
578 let response: GetCsrfTokenResponse = self.send_and_parse("get_csrf_token", params).await?;
579 Ok(response.token)
580 }
581
582 async fn get_credentials(
583 &self,
584 domain: Option<String>,
585 ) -> anyhow::Result<GetCredentialsResponse> {
586 let params = json!({
587 "domain": domain
588 });
589 self.send_and_parse("get_credentials", params).await
590 }
591
592 async fn get_record(&self, file: String, out_format: String) -> anyhow::Result<String> {
593 let params = json!({
594 "file": file,
595 "out_format": out_format
596 });
597 let response: GetDataResponse = self.send_and_parse("get_record", params).await?;
598 Ok(response.file)
599 }
600
601 async fn get_image(&self, file: String) -> anyhow::Result<String> {
602 let params = json!({
603 "file": file
604 });
605 let response: GetDataResponse = self.send_and_parse("get_image", params).await?;
606 Ok(response.file)
607 }
608
609 async fn can_send_image(&self) -> anyhow::Result<bool> {
610 let params = json!({});
611 let response: CanSendResponse = self.send_and_parse("can_send_image", params).await?;
612 Ok(response.yes)
613 }
614
615 async fn can_send_record(&self) -> anyhow::Result<bool> {
616 let params = json!({});
617 let response: CanSendResponse = self.send_and_parse("can_send_record", params).await?;
618 Ok(response.yes)
619 }
620
621 async fn get_status(&self) -> anyhow::Result<GetStatusResponse> {
622 let params = json!({});
623 self.send_and_parse("get_status", params).await
624 }
625
626 async fn get_version_info(&self) -> anyhow::Result<GetVersionInfoResponse> {
627 let params = json!({});
628 self.send_and_parse("get_version_info", params).await
629 }
630
631 async fn set_restart(&self, delay: Option<i32>) -> anyhow::Result<()> {
632 let params = json!({
633 "delay": delay
634 });
635 self.send_and_parse("set_restart", params).await
636 }
637
638 async fn clean_cache(&self) -> anyhow::Result<()> {
639 let params = json!({});
640 self.send_and_parse("clean_cache", params).await
641 }
642}