1use crate::api::APISender as APISenderTrait;
2use crate::api::arg_type::MessageType;
3use crate::api::return_type::*;
4pub use crate::event::Event as NormalEvent;
5use crate::event::EventReceiver as EventReceiverTrait;
6use crate::event::EventTrait;
7use crate::event::message::GroupMessageAnonymous;
8use crate::message::receive_segment::ReceiveSegment;
9use crate::message::send_segment::SendSegment;
10use async_trait::async_trait;
11pub use flume::Receiver as FlumeReceiver;
12use flume::SendError;
13pub use flume::Sender as FlumeSender;
14use serde::de::DeserializeOwned;
15use serde::{Deserialize, Serialize};
16use serde_json::{Value as JsonValue, json};
17use std::sync::Arc;
18use std::time::Duration;
19use strum_macros::EnumIs;
20use tokio::sync::broadcast;
21pub use tokio::sync::broadcast::Receiver as BroadcastReceiver;
22pub use tokio::sync::broadcast::Sender as BroadcastSender;
23
24pub type APISender = FlumeSender<APIRequest>;
25pub type APIReceiver = FlumeReceiver<APIRequest>;
26pub type EventSender = BroadcastSender<Arc<Event>>;
27pub type EventReceiver = BroadcastReceiver<Arc<Event>>;
28
/// A single API call queued for the communication service.
///
/// The type derives `Serialize`, so the field names below are the keys that
/// appear in the serialized form.
#[derive(Serialize, Clone, Debug)]
pub struct APIRequest {
    /// Name of the action to invoke (for example `"send_private_msg"`).
    pub action: String,
    /// Arguments of the action as an arbitrary JSON value.
    pub params: JsonValue,
    /// Optional correlation id; responses are matched to requests by comparing
    /// their `echo` against this value.
    pub echo: Option<String>,
}
35
/// Reply to an [`APIRequest`], obtained by deserialization.
#[derive(Deserialize, Clone, Debug)]
pub struct APIResponse {
    /// Status string; only the exact value `"ok"` is treated as success by `verify`.
    pub status: String,
    /// Numeric return code; included in the error produced by `parse_data` on failure.
    pub retcode: i32,
    /// Raw response payload; `parse_data` deserializes it into a typed value.
    pub data: JsonValue,
    /// Correlation id used to match this response to the request that caused it.
    pub echo: Option<String>,
}
43
44impl APIResponse {
45 pub fn verify(&self) -> bool {
46 self.status == "ok"
47 }
48
49 pub fn parse_data<T: DeserializeOwned>(self) -> anyhow::Result<T> {
50 if !self.verify() {
51 return Err(anyhow::anyhow!("request failed with code {}", self.retcode));
52 }
53 Ok(serde_json::from_value(self.data)?)
54 }
55}
56
/// A message carried on the event channel: either the reply to an API call or
/// a regular event.
///
/// The enum is `#[serde(untagged)]`, so serde tries the variants in declaration
/// order when deserializing: `APIResponse` is attempted before `Event`.
/// Reordering the variants would therefore change deserialization results.
#[derive(Deserialize, Clone, Debug, EnumIs)]
#[serde(untagged)]
pub enum Event {
    /// Reply to a previously sent [`APIRequest`].
    APIResponse(APIResponse),
    /// Any other event, as described by `crate::event::Event`.
    Event(NormalEvent),
}

// Marker implementations: both the owned event and its shared `Arc` form
// satisfy `EventTrait`.
impl EventTrait for Event {}
impl EventTrait for Arc<Event> {}
66
/// Waits on a stream of events for the API response carrying a given `echo`.
///
/// The descriptions below follow the implementation for the broadcast receiver
/// in this file; other implementors are not visible here.
#[async_trait]
pub trait APIResponseListener {
    /// Waits for the response whose echo equals `echo`, with a time limit when
    /// `timeout` is `Some` and without one when it is `None`.
    async fn listen(&mut self, echo: String, timeout: Option<Duration>) -> Option<Arc<Event>>;
    /// Waits, without a time limit, for the response whose echo equals `echo`.
    async fn listen_without_timeout(&mut self, echo: String) -> Option<Arc<Event>>;
    /// Like `listen_without_timeout`, but yields `None` once `timeout` elapses.
    async fn listen_with_timeout(&mut self, echo: String, timeout: Duration) -> Option<Arc<Event>>;
}
73
74#[async_trait]
75impl APIResponseListener for BroadcastReceiver<Arc<Event>> {
76 async fn listen(&mut self, echo: String, timeout: Option<Duration>) -> Option<Arc<Event>> {
77 match timeout {
78 Some(timeout) => self.listen_with_timeout(echo, timeout).await,
79 None => self.listen_without_timeout(echo).await,
80 }
81 }
82 async fn listen_without_timeout(&mut self, echo: String) -> Option<Arc<Event>> {
83 loop {
84 if let Ok(event) = self.recv().await && let Event::APIResponse(response) = &*event && response
87 .echo
88 .as_ref()
89 .map(|target_echo| target_echo == &echo) .unwrap_or(false)
91 {
93 return Some(Arc::clone(&event));
94 }
95 if self.is_closed() {
96 return None;
97 }
98 }
99 }
100
101 async fn listen_with_timeout(&mut self, echo: String, timeout: Duration) -> Option<Arc<Event>> {
102 tokio::time::timeout(timeout, self.listen_without_timeout(echo))
103 .await
104 .ok()?
105 }
106}
107
/// Backend that a `Client` uses to exchange API requests and events.
///
/// Implementors must be `Sync + Send`.
#[async_trait]
pub trait CommunicationService: Sync + Send {
    /// Hands the service the receiving end of the API request channel and the
    /// sending end of the event broadcast channel.
    fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender);
    /// Starts the service.
    ///
    /// NOTE(review): whether this returns right after start-up or only when the
    /// service stops depends on the implementor and is not visible here.
    async fn start_service(&self) -> anyhow::Result<()>;
}
113
114#[async_trait]
115impl CommunicationService for Box<dyn CommunicationService> {
116 fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
117 (**self).inject(api_receiver, event_sender);
118 }
119
120 async fn start_service(&self) -> anyhow::Result<()> {
121 (**self).start_service().await
122 }
123}
124
/// Conversion into a `'static` [`CommunicationService`].
///
/// `Client::new` and `Client::change_service` accept `impl IntoService`.
pub trait IntoService {
    /// Consumes `self` and returns the resulting service.
    fn into(self) -> impl CommunicationService + 'static;
}
128
129impl<T: CommunicationService + 'static> IntoService for T {
130 fn into(self) -> impl CommunicationService + 'static {
131 self
132 }
133}
134
/// Client that sends API requests to a communication service and receives its
/// events.
pub struct Client {
    /// The service currently attached to this client.
    service: Box<dyn CommunicationService>,
    /// Sending end of the API request channel; `send_request` writes to it.
    api_sender: APISender,
    /// Receiving end of the API request channel; kept so that a clone can be
    /// injected into a replacement service.
    api_receiver: APIReceiver,
    /// Sending end of the event broadcast channel; new receivers are created
    /// from it with `subscribe`.
    event_sender: EventSender,
    /// Time limit applied while waiting for an API response; `None` waits
    /// without a limit.
    timeout: Option<Duration>,
}
155
156impl EventReceiverTrait<Arc<Event>> for Client {
157 fn get_receiver(&self) -> EventReceiver {
158 self.event_sender.subscribe()
159 }
160}
161
162impl Client {
163 pub fn new(
171 service: impl IntoService,
172 timeout: Option<Duration>,
173 api_channel_cap: Option<usize>,
174 event_channel_cap: Option<usize>,
175 ) -> Self {
176 let mut service = Box::new(service.into());
177 let (api_sender, api_receiver) = flume::bounded(api_channel_cap.unwrap_or(16));
178 let (event_sender, _) = broadcast::channel(event_channel_cap.unwrap_or(16));
179 service.inject(api_receiver.clone(), event_sender.clone());
180 Self {
181 service,
182 api_receiver,
183 api_sender,
184 event_sender,
185 timeout,
186 }
187 }
188}
189
190impl Client {
191 pub async fn start_service(&self) -> anyhow::Result<()> {
192 self.service.start_service().await
193 }
194
195 pub fn change_service(&mut self, service: impl IntoService) -> Box<dyn CommunicationService> {
196 let mut service = Box::new(service.into());
197 service.inject(self.api_receiver.clone(), self.event_sender.clone());
198 std::mem::replace(&mut self.service, service)
199 }
200
201 pub fn get_service(&self) -> &dyn CommunicationService {
202 &*self.service
203 }
204
205 pub fn get_service_mut(&mut self) -> &mut dyn CommunicationService {
206 &mut *self.service
207 }
208}
209
210impl Client {
211 pub fn generate_id() -> String {
212 uuid::Uuid::new_v4().to_string()
213 }
214
215 pub async fn get_response(&self, echo: String) -> Option<APIResponse> {
216 let mut receiver = self.get_receiver();
217 if let Event::APIResponse(res) = &*(receiver.listen(echo, self.timeout).await?) {
218 Some(res.clone())
219 } else {
220 None
221 }
222 }
223
224 pub fn parse_response<T: DeserializeOwned>(response: APIResponse) -> anyhow::Result<T> {
225 response.parse_data()
226 }
227
228 pub async fn send_request(
229 &self,
230 action: String,
231 params: JsonValue,
232 echo: String,
233 ) -> Result<(), SendError<APIRequest>> {
234 self
235 .api_sender
236 .send_async(APIRequest {
237 action,
238 params,
239 echo: Some(echo),
240 })
241 .await
242 }
243
244 pub async fn send_and_parse<T: DeserializeOwned>(
256 &self,
257 action: impl ToString,
258 params: JsonValue,
259 ) -> anyhow::Result<T> {
260 let echo = Self::generate_id();
261 self
262 .send_request(action.to_string(), params, echo.clone())
263 .await?;
264 let response = self.get_response(echo).await;
265 if response.is_none() {
266 return Err(anyhow::anyhow!("request time out"));
267 }
268 let response = response.unwrap();
269 Self::parse_response(response)
270 }
271}
272
273#[async_trait]
274impl APISenderTrait for Client {
275 async fn send_private_msg(
276 &self,
277 user_id: i64,
278 message: Vec<SendSegment>,
279 auto_escape: Option<bool>,
280 ) -> anyhow::Result<i32> {
281 let params = json!({
282 "user_id": user_id,
283 "message": message,
284 "auto_escape": auto_escape
285 });
286 let response: SendMsgResponse = self.send_and_parse("send_private_msg", params).await?;
287 Ok(response.message_id)
288 }
289
290 async fn send_group_msg(
291 &self,
292 group_id: i64,
293 message: Vec<SendSegment>,
294 auto_escape: Option<bool>,
295 ) -> anyhow::Result<i32> {
296 let params = json!({
297 "group_id": group_id,
298 "message": message,
299 "auto_escape": auto_escape
300 });
301 let response: SendMsgResponse = self.send_and_parse("send_group_msg", params).await?;
302 Ok(response.message_id)
303 }
304
305 async fn send_msg(
306 &self,
307 message_type: Option<MessageType>,
308 user_id: i64,
309 group_id: i64,
310 message: Vec<SendSegment>,
311 auto_escape: Option<bool>,
312 ) -> anyhow::Result<i32> {
313 let params = json!({
314 "message_type": message_type,
315 "user_id": user_id,
316 "group_id": group_id,
317 "message": message,
318 "auto_escape": auto_escape
319 });
320 let response: SendMsgResponse = self.send_and_parse("send_msg", params).await?;
321 Ok(response.message_id)
322 }
323
324 async fn delete_msg(&self, message_id: i32) -> anyhow::Result<()> {
325 let params = json!({
326 "message_id": message_id
327 });
328 self.send_and_parse("delete_msg", params).await
329 }
330
331 async fn get_msg(&self, message_id: i32) -> anyhow::Result<GetMsgResponse> {
332 let params = json!({
333 "message_id": message_id
334 });
335 self.send_and_parse("get_msg", params).await
336 }
337
338 async fn get_forward_msg(&self, id: String) -> anyhow::Result<Vec<ReceiveSegment>> {
339 let params = json!({
340 "id": id
341 });
342 let response: GetForwardMsgResponse = self.send_and_parse("get_forward_msg", params).await?;
343 Ok(response.message)
344 }
345
346 async fn send_like(&self, user_id: i64, times: Option<i32>) -> anyhow::Result<()> {
347 let params = json!({
348 "user_id": user_id,
349 "times": times
350 });
351 self.send_and_parse("send_like", params).await
352 }
353
354 async fn set_group_kick(
355 &self,
356 group_id: i32,
357 user_id: i32,
358 reject_add_request: Option<bool>,
359 ) -> anyhow::Result<()> {
360 let params = json!({
361 "group_id": group_id,
362 "user_id": user_id,
363 "reject_add_request": reject_add_request
364 });
365 self.send_and_parse("set_group_kick", params).await
366 }
367
368 async fn set_group_ban(
369 &self,
370 group_id: i32,
371 user_id: i32,
372 duration: Option<i32>,
373 ) -> anyhow::Result<()> {
374 let params = json!({
375 "group_id": group_id,
376 "user_id": user_id,
377 "duration": duration
378 });
379 self.send_and_parse("set_group_ban", params).await
380 }
381
382 async fn set_group_anonymous_ban(
383 &self,
384 group_id: i32,
385 anonymous: Option<GroupMessageAnonymous>,
386 flag: Option<String>,
387 duration: Option<i32>,
388 ) -> anyhow::Result<()> {
389 let params = json!({
390 "group_id": group_id,
391 "anonymous": anonymous,
392 "flag": flag,
393 "duration": duration
394 });
395 self.send_and_parse("set_group_anonymous_ban", params).await
396 }
397
398 async fn set_group_whole_ban(&self, group_id: i32, enable: Option<bool>) -> anyhow::Result<()> {
399 let params = json!({
400 "group_id": group_id,
401 "enable": enable
402 });
403 self.send_and_parse("set_group_whole_ban", params).await
404 }
405
406 async fn set_group_admin(
407 &self,
408 group_id: i32,
409 user_id: i32,
410 enable: Option<bool>,
411 ) -> anyhow::Result<()> {
412 let params = json!({
413 "group_id": group_id,
414 "user_id": user_id,
415 "enable": enable
416 });
417 self.send_and_parse("set_group_admin", params).await
418 }
419
420 async fn set_group_anonymous(&self, group_id: i32, enable: Option<bool>) -> anyhow::Result<()> {
421 let params = json!({
422 "group_id": group_id,
423 "enable": enable
424 });
425 self.send_and_parse("set_group_anonymous", params).await
426 }
427
428 async fn set_group_card(
429 &self,
430 group_id: i32,
431 user_id: i32,
432 card: Option<String>,
433 ) -> anyhow::Result<()> {
434 let params = json!({
435 "group_id": group_id,
436 "user_id": user_id,
437 "card": card
438 });
439 self.send_and_parse("set_group_card", params).await
440 }
441
442 async fn set_group_name(&self, group_id: i32, group_name: String) -> anyhow::Result<()> {
443 let params = json!({
444 "group_id": group_id,
445 "group_name": group_name
446 });
447 self.send_and_parse("set_group_name", params).await
448 }
449
450 async fn set_group_leave(&self, group_id: i32, is_dismiss: Option<bool>) -> anyhow::Result<()> {
451 let params = json!({
452 "group_id": group_id,
453 "is_dismiss": is_dismiss
454 });
455 self.send_and_parse("set_group_leave", params).await
456 }
457
458 async fn set_group_special_title(
459 &self,
460 group_id: i32,
461 user_id: i32,
462 special_title: Option<String>,
463 duration: Option<i32>,
464 ) -> anyhow::Result<()> {
465 let params = json!({
466 "group_id": group_id,
467 "user_id": user_id,
468 "special_title": special_title,
469 "duration": duration
470 });
471 self.send_and_parse("set_group_special_title", params).await
472 }
473
474 async fn set_friend_add_request(
475 &self,
476 flag: String,
477 approve: Option<bool>,
478 remark: Option<String>,
479 ) -> anyhow::Result<()> {
480 let params = json!({
481 "flag": flag,
482 "approve": approve,
483 "remark": remark
484 });
485 self.send_and_parse("set_friend_add_request", params).await
486 }
487
488 async fn set_group_add_request(
489 &self,
490 flag: String,
491 sub_type: String,
492 approve: Option<bool>,
493 reason: Option<String>,
494 ) -> anyhow::Result<()> {
495 let params = json!({
496 "flag": flag,
497 "sub_type": sub_type,
498 "approve": approve,
499 "reason": reason
500 });
501 self.send_and_parse("set_group_add_request", params).await
502 }
503
504 async fn get_login_info(&self) -> anyhow::Result<GetLoginInfoResponse> {
505 let params = json!({});
506 self.send_and_parse("get_login_info", params).await
507 }
508
509 async fn get_stranger_info(
510 &self,
511 user_id: i32,
512 no_cache: Option<bool>,
513 ) -> anyhow::Result<GetStrangerInfoResponse> {
514 let params = json!({
515 "user_id": user_id,
516 "no_cache": no_cache
517 });
518 self.send_and_parse("get_stranger_info", params).await
519 }
520
521 async fn get_friend_list(&self) -> anyhow::Result<Vec<GetFriendListResponse>> {
522 let params = json!({});
523 self.send_and_parse("get_friend_list", params).await
524 }
525
526 async fn get_group_info(
527 &self,
528 group_id: i32,
529 no_cache: Option<bool>,
530 ) -> anyhow::Result<GetGroupInfoResponse> {
531 let params = json!({
532 "group_id": group_id,
533 "no_cache": no_cache
534 });
535 self.send_and_parse("get_group_info", params).await
536 }
537
538 async fn get_group_list(&self) -> anyhow::Result<Vec<GetGroupInfoResponse>> {
539 let params = json!({});
540 self.send_and_parse("get_group_list", params).await
541 }
542
543 async fn get_group_member_info(
544 &self,
545 group_id: i32,
546 user_id: i32,
547 no_cache: Option<bool>,
548 ) -> anyhow::Result<GetGroupMemberInfoResponse> {
549 let params = json!({
550 "group_id": group_id,
551 "user_id": user_id,
552 "no_cache": no_cache
553 });
554 self.send_and_parse("get_group_member_info", params).await
555 }
556
557 async fn get_group_member_list(
558 &self,
559 group_id: i32,
560 ) -> anyhow::Result<Vec<GetGroupMemberInfoResponse>> {
561 let params = json!({
562 "group_id": group_id
563 });
564 self.send_and_parse("get_group_member_list", params).await
565 }
566
567 async fn get_group_honor_info(
568 &self,
569 group_id: i64,
570 honor_type: String,
571 ) -> anyhow::Result<GetGroupMemberInfoResponse> {
572 let params = json!({
573 "group_id": group_id,
574 "type": honor_type
575 });
576 self.send_and_parse("get_group_honor_info", params).await
577 }
578
579 async fn get_cookies(&self, domain: Option<String>) -> anyhow::Result<String> {
580 let params = json!({
581 "domain": domain
582 });
583 let response: GetCookiesResponse = self.send_and_parse("get_cookies", params).await?;
584 Ok(response.cookies)
585 }
586
587 async fn get_csrf_token(&self) -> anyhow::Result<i32> {
588 let params = json!({});
589 let response: GetCsrfTokenResponse = self.send_and_parse("get_csrf_token", params).await?;
590 Ok(response.token)
591 }
592
593 async fn get_credentials(
594 &self,
595 domain: Option<String>,
596 ) -> anyhow::Result<GetCredentialsResponse> {
597 let params = json!({
598 "domain": domain
599 });
600 self.send_and_parse("get_credentials", params).await
601 }
602
603 async fn get_record(&self, file: String, out_format: String) -> anyhow::Result<String> {
604 let params = json!({
605 "file": file,
606 "out_format": out_format
607 });
608 let response: GetDataResponse = self.send_and_parse("get_record", params).await?;
609 Ok(response.file)
610 }
611
612 async fn get_image(&self, file: String) -> anyhow::Result<String> {
613 let params = json!({
614 "file": file
615 });
616 let response: GetDataResponse = self.send_and_parse("get_image", params).await?;
617 Ok(response.file)
618 }
619
620 async fn can_send_image(&self) -> anyhow::Result<bool> {
621 let params = json!({});
622 let response: CanSendResponse = self.send_and_parse("can_send_image", params).await?;
623 Ok(response.yes)
624 }
625
626 async fn can_send_record(&self) -> anyhow::Result<bool> {
627 let params = json!({});
628 let response: CanSendResponse = self.send_and_parse("can_send_record", params).await?;
629 Ok(response.yes)
630 }
631
632 async fn get_status(&self) -> anyhow::Result<GetStatusResponse> {
633 let params = json!({});
634 self.send_and_parse("get_status", params).await
635 }
636
637 async fn get_version_info(&self) -> anyhow::Result<GetVersionInfoResponse> {
638 let params = json!({});
639 self.send_and_parse("get_version_info", params).await
640 }
641
642 async fn set_restart(&self, delay: Option<i32>) -> anyhow::Result<()> {
643 let params = json!({
644 "delay": delay
645 });
646 self.send_and_parse("set_restart", params).await
647 }
648
649 async fn clean_cache(&self) -> anyhow::Result<()> {
650 let params = json!({});
651 self.send_and_parse("clean_cache", params).await
652 }
653}