Skip to main content

onebot_api/communication/
utils.rs

1use crate::api::APISender as APISenderTrait;
2use crate::api::arg_type::MessageType;
3use crate::api::return_type::*;
4pub use crate::event::Event as NormalEvent;
5use crate::event::EventReceiver as EventReceiverTrait;
6use crate::event::EventTrait;
7use crate::event::message::GroupMessageAnonymous;
8use crate::message::receive_segment::ReceiveSegment;
9use crate::message::send_segment::SendSegment;
10use async_trait::async_trait;
11pub use flume::Receiver as FlumeReceiver;
12use flume::SendError;
13pub use flume::Sender as FlumeSender;
14use serde::de::DeserializeOwned;
15use serde::{Deserialize, Serialize};
16use serde_json::{Value as JsonValue, json};
17use std::sync::Arc;
18use std::time::Duration;
19use strum_macros::EnumIs;
20use tokio::sync::broadcast;
21pub use tokio::sync::broadcast::Receiver as BroadcastReceiver;
22pub use tokio::sync::broadcast::Sender as BroadcastSender;
23
24pub type APISender = FlumeSender<APIRequest>;
25pub type APIReceiver = FlumeReceiver<APIRequest>;
26pub type EventSender = BroadcastSender<Arc<Event>>;
27pub type EventReceiver = BroadcastReceiver<Arc<Event>>;
28
29#[derive(Serialize, Clone, Debug)]
30pub struct APIRequest {
31	pub action: String,
32	pub params: JsonValue,
33	pub echo: Option<String>,
34}
35
36#[derive(Deserialize, Clone, Debug)]
37pub struct APIResponse {
38	pub status: String,
39	pub retcode: i32,
40	pub data: JsonValue,
41	pub echo: Option<String>,
42}
43
44impl APIResponse {
45	pub fn verify(&self) -> bool {
46		self.status == "ok"
47	}
48
49	pub fn parse_data<T: DeserializeOwned>(self) -> anyhow::Result<T> {
50		if !self.verify() {
51			return Err(anyhow::anyhow!("request failed with code {}", self.retcode));
52		}
53		Ok(serde_json::from_value(self.data)?)
54	}
55}
56
57#[derive(Deserialize, Clone, Debug, EnumIs)]
58#[serde(untagged)]
59pub enum Event {
60	APIResponse(APIResponse),
61	Event(NormalEvent),
62}
63
64impl EventTrait for Event {}
65impl EventTrait for Arc<Event> {}
66
67#[async_trait]
68pub trait APIResponseListener {
69	async fn listen(&mut self, echo: String, timeout: Option<Duration>) -> Option<Arc<Event>>;
70	async fn listen_without_timeout(&mut self, echo: String) -> Option<Arc<Event>>;
71	async fn listen_with_timeout(&mut self, echo: String, timeout: Duration) -> Option<Arc<Event>>;
72}
73
74#[async_trait]
75impl APIResponseListener for BroadcastReceiver<Arc<Event>> {
76	async fn listen(&mut self, echo: String, timeout: Option<Duration>) -> Option<Arc<Event>> {
77		match timeout {
78			Some(timeout) => self.listen_with_timeout(echo, timeout).await,
79			None => self.listen_without_timeout(echo).await,
80		}
81	}
82	async fn listen_without_timeout(&mut self, echo: String) -> Option<Arc<Event>> {
83		loop {
84			if let Ok(event) = self.recv().await // 获取Event
85				&& let Event::APIResponse(response) = &*event // 判断是否为APIResponse
86				&& response
87					.echo
88					.as_ref()
89					.map(|target_echo| target_echo == &echo) // 判断echo是否一致
90					.unwrap_or(false)
91			// 若APIResponse不存在echo则默认false
92			{
93				return Some(Arc::clone(&event));
94			}
95			if self.is_closed() {
96				return None;
97			}
98		}
99	}
100
101	async fn listen_with_timeout(&mut self, echo: String, timeout: Duration) -> Option<Arc<Event>> {
102		tokio::time::timeout(timeout, self.listen_without_timeout(echo))
103			.await
104			.ok()?
105	}
106}
107
108#[async_trait]
109pub trait CommunicationService: Sync + Send {
110	fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender);
111	async fn start_service(&self) -> anyhow::Result<()>;
112}
113
114#[async_trait]
115impl CommunicationService for Box<dyn CommunicationService> {
116	fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
117		(**self).inject(api_receiver, event_sender);
118	}
119
120	async fn start_service(&self) -> anyhow::Result<()> {
121		(**self).start_service().await
122	}
123}
124
125pub trait IntoService {
126	fn into(self) -> impl CommunicationService + 'static;
127}
128
129impl<T: CommunicationService + 'static> IntoService for T {
130	fn into(self) -> impl CommunicationService + 'static {
131		self
132	}
133}
134
135/// 对于Onebot V11协议API调用和事件接收的高层抽象  
136/// 需要具体实现 [`CommunicationService`] 的底层服务支持
137///
138/// # Examples
139/// ```rust
140/// use std::time::Duration;
141/// use onebot_api::communication::utils::Client;
142/// use onebot_api::communication::ws::WsService;
143///
144/// let ws_service = WsService::new("wss://example.com", Some("example_token".to_string())).unwrap();
145/// let client = Client::new(ws_service, Duration::from_secs(5), None, None);
146/// client.start_service().await.unwrap();
147/// ```
148pub struct Client {
149	service: Box<dyn CommunicationService>,
150	api_sender: APISender,
151	api_receiver: APIReceiver,
152	event_sender: EventSender,
153	timeout: Option<Duration>,
154}
155
156impl EventReceiverTrait<Arc<Event>> for Client {
157	fn get_receiver(&self) -> EventReceiver {
158		self.event_sender.subscribe()
159	}
160}
161
162impl Client {
163	/// 创建一个 [`Client`] 实例
164	///
165	/// # Params
166	/// - `service` 实现 [`IntoService`] 特征或 [`CommunicationService`] 特征的对象
167	/// - `timeout` API请求超时时间,若为 `None` 则一直等待
168	/// - `api_channel_cap` API请求消息通道的容量,默认为`16`
169	/// - `event_channel_cap` Event消息通道的容量,默认为`16`
170	pub fn new(
171		service: impl IntoService,
172		timeout: Option<Duration>,
173		api_channel_cap: Option<usize>,
174		event_channel_cap: Option<usize>,
175	) -> Self {
176		let mut service = Box::new(service.into());
177		let (api_sender, api_receiver) = flume::bounded(api_channel_cap.unwrap_or(16));
178		let (event_sender, _) = broadcast::channel(event_channel_cap.unwrap_or(16));
179		service.inject(api_receiver.clone(), event_sender.clone());
180		Self {
181			service,
182			api_receiver,
183			api_sender,
184			event_sender,
185			timeout,
186		}
187	}
188}
189
190impl Client {
191	pub async fn start_service(&self) -> anyhow::Result<()> {
192		self.service.start_service().await
193	}
194
195	pub fn change_service(&mut self, service: impl IntoService) -> Box<dyn CommunicationService> {
196		let mut service = Box::new(service.into());
197		service.inject(self.api_receiver.clone(), self.event_sender.clone());
198		std::mem::replace(&mut self.service, service)
199	}
200
201	pub fn get_service(&self) -> &dyn CommunicationService {
202		&*self.service
203	}
204
205	pub fn get_service_mut(&mut self) -> &mut dyn CommunicationService {
206		&mut *self.service
207	}
208}
209
210impl Client {
211	pub fn generate_id() -> String {
212		uuid::Uuid::new_v4().to_string()
213	}
214
215	pub async fn get_response(&self, echo: String) -> Option<APIResponse> {
216		let mut receiver = self.get_receiver();
217		if let Event::APIResponse(res) = &*(receiver.listen(echo, self.timeout).await?) {
218			Some(res.clone())
219		} else {
220			None
221		}
222	}
223
224	pub fn parse_response<T: DeserializeOwned>(response: APIResponse) -> anyhow::Result<T> {
225		response.parse_data()
226	}
227
228	pub async fn send_request(
229		&self,
230		action: String,
231		params: JsonValue,
232		echo: String,
233	) -> Result<(), SendError<APIRequest>> {
234		self
235			.api_sender
236			.send_async(APIRequest {
237				action,
238				params,
239				echo: Some(echo),
240			})
241			.await
242	}
243
244	/// 生成echo并发送API请求  
245	/// 同时等待API响应并自动解析
246	///
247	/// # Examples
248	/// ```rust
249	/// use serde_json::{json, Value};
250	/// use onebot_api::communication::utils::Client;
251	///
252	/// let client: Client = /* ... */;
253	/// let response: Value =  client.send_and_parse("action_name", json!({})).await.unwrap();
254	/// ```
255	pub async fn send_and_parse<T: DeserializeOwned>(
256		&self,
257		action: impl ToString,
258		params: JsonValue,
259	) -> anyhow::Result<T> {
260		let echo = Self::generate_id();
261		self
262			.send_request(action.to_string(), params, echo.clone())
263			.await?;
264		let response = self.get_response(echo).await;
265		if response.is_none() {
266			return Err(anyhow::anyhow!("request time out"));
267		}
268		let response = response.unwrap();
269		Self::parse_response(response)
270	}
271}
272
273#[async_trait]
274impl APISenderTrait for Client {
275	async fn send_private_msg(
276		&self,
277		user_id: i64,
278		message: Vec<SendSegment>,
279		auto_escape: Option<bool>,
280	) -> anyhow::Result<i32> {
281		let params = json!({
282			"user_id": user_id,
283			"message": message,
284			"auto_escape": auto_escape
285		});
286		let response: SendMsgResponse = self.send_and_parse("send_private_msg", params).await?;
287		Ok(response.message_id)
288	}
289
290	async fn send_group_msg(
291		&self,
292		group_id: i64,
293		message: Vec<SendSegment>,
294		auto_escape: Option<bool>,
295	) -> anyhow::Result<i32> {
296		let params = json!({
297			"group_id": group_id,
298			"message": message,
299			"auto_escape": auto_escape
300		});
301		let response: SendMsgResponse = self.send_and_parse("send_group_msg", params).await?;
302		Ok(response.message_id)
303	}
304
305	async fn send_msg(
306		&self,
307		message_type: Option<MessageType>,
308		user_id: i64,
309		group_id: i64,
310		message: Vec<SendSegment>,
311		auto_escape: Option<bool>,
312	) -> anyhow::Result<i32> {
313		let params = json!({
314			"message_type": message_type,
315			"user_id": user_id,
316			"group_id": group_id,
317			"message": message,
318			"auto_escape": auto_escape
319		});
320		let response: SendMsgResponse = self.send_and_parse("send_msg", params).await?;
321		Ok(response.message_id)
322	}
323
324	async fn delete_msg(&self, message_id: i32) -> anyhow::Result<()> {
325		let params = json!({
326			"message_id": message_id
327		});
328		self.send_and_parse("delete_msg", params).await
329	}
330
331	async fn get_msg(&self, message_id: i32) -> anyhow::Result<GetMsgResponse> {
332		let params = json!({
333			"message_id": message_id
334		});
335		self.send_and_parse("get_msg", params).await
336	}
337
338	async fn get_forward_msg(&self, id: String) -> anyhow::Result<Vec<ReceiveSegment>> {
339		let params = json!({
340			"id": id
341		});
342		let response: GetForwardMsgResponse = self.send_and_parse("get_forward_msg", params).await?;
343		Ok(response.message)
344	}
345
346	async fn send_like(&self, user_id: i64, times: Option<i32>) -> anyhow::Result<()> {
347		let params = json!({
348			"user_id": user_id,
349			"times": times
350		});
351		self.send_and_parse("send_like", params).await
352	}
353
354	async fn set_group_kick(
355		&self,
356		group_id: i32,
357		user_id: i32,
358		reject_add_request: Option<bool>,
359	) -> anyhow::Result<()> {
360		let params = json!({
361			"group_id": group_id,
362			"user_id": user_id,
363			"reject_add_request": reject_add_request
364		});
365		self.send_and_parse("set_group_kick", params).await
366	}
367
368	async fn set_group_ban(
369		&self,
370		group_id: i32,
371		user_id: i32,
372		duration: Option<i32>,
373	) -> anyhow::Result<()> {
374		let params = json!({
375			"group_id": group_id,
376			"user_id": user_id,
377			"duration": duration
378		});
379		self.send_and_parse("set_group_ban", params).await
380	}
381
382	async fn set_group_anonymous_ban(
383		&self,
384		group_id: i32,
385		anonymous: Option<GroupMessageAnonymous>,
386		flag: Option<String>,
387		duration: Option<i32>,
388	) -> anyhow::Result<()> {
389		let params = json!({
390			"group_id": group_id,
391			"anonymous": anonymous,
392			"flag": flag,
393			"duration": duration
394		});
395		self.send_and_parse("set_group_anonymous_ban", params).await
396	}
397
398	async fn set_group_whole_ban(&self, group_id: i32, enable: Option<bool>) -> anyhow::Result<()> {
399		let params = json!({
400			"group_id": group_id,
401			"enable": enable
402		});
403		self.send_and_parse("set_group_whole_ban", params).await
404	}
405
406	async fn set_group_admin(
407		&self,
408		group_id: i32,
409		user_id: i32,
410		enable: Option<bool>,
411	) -> anyhow::Result<()> {
412		let params = json!({
413			"group_id": group_id,
414			"user_id": user_id,
415			"enable": enable
416		});
417		self.send_and_parse("set_group_admin", params).await
418	}
419
420	async fn set_group_anonymous(&self, group_id: i32, enable: Option<bool>) -> anyhow::Result<()> {
421		let params = json!({
422			"group_id": group_id,
423			"enable": enable
424		});
425		self.send_and_parse("set_group_anonymous", params).await
426	}
427
428	async fn set_group_card(
429		&self,
430		group_id: i32,
431		user_id: i32,
432		card: Option<String>,
433	) -> anyhow::Result<()> {
434		let params = json!({
435			"group_id": group_id,
436			"user_id": user_id,
437			"card": card
438		});
439		self.send_and_parse("set_group_card", params).await
440	}
441
442	async fn set_group_name(&self, group_id: i32, group_name: String) -> anyhow::Result<()> {
443		let params = json!({
444			"group_id": group_id,
445			"group_name": group_name
446		});
447		self.send_and_parse("set_group_name", params).await
448	}
449
450	async fn set_group_leave(&self, group_id: i32, is_dismiss: Option<bool>) -> anyhow::Result<()> {
451		let params = json!({
452			"group_id": group_id,
453			"is_dismiss": is_dismiss
454		});
455		self.send_and_parse("set_group_leave", params).await
456	}
457
458	async fn set_group_special_title(
459		&self,
460		group_id: i32,
461		user_id: i32,
462		special_title: Option<String>,
463		duration: Option<i32>,
464	) -> anyhow::Result<()> {
465		let params = json!({
466			"group_id": group_id,
467			"user_id": user_id,
468			"special_title": special_title,
469			"duration": duration
470		});
471		self.send_and_parse("set_group_special_title", params).await
472	}
473
474	async fn set_friend_add_request(
475		&self,
476		flag: String,
477		approve: Option<bool>,
478		remark: Option<String>,
479	) -> anyhow::Result<()> {
480		let params = json!({
481			"flag": flag,
482			"approve": approve,
483			"remark": remark
484		});
485		self.send_and_parse("set_friend_add_request", params).await
486	}
487
488	async fn set_group_add_request(
489		&self,
490		flag: String,
491		sub_type: String,
492		approve: Option<bool>,
493		reason: Option<String>,
494	) -> anyhow::Result<()> {
495		let params = json!({
496			"flag": flag,
497			"sub_type": sub_type,
498			"approve": approve,
499			"reason": reason
500		});
501		self.send_and_parse("set_group_add_request", params).await
502	}
503
504	async fn get_login_info(&self) -> anyhow::Result<GetLoginInfoResponse> {
505		let params = json!({});
506		self.send_and_parse("get_login_info", params).await
507	}
508
509	async fn get_stranger_info(
510		&self,
511		user_id: i32,
512		no_cache: Option<bool>,
513	) -> anyhow::Result<GetStrangerInfoResponse> {
514		let params = json!({
515			"user_id": user_id,
516			"no_cache": no_cache
517		});
518		self.send_and_parse("get_stranger_info", params).await
519	}
520
521	async fn get_friend_list(&self) -> anyhow::Result<Vec<GetFriendListResponse>> {
522		let params = json!({});
523		self.send_and_parse("get_friend_list", params).await
524	}
525
526	async fn get_group_info(
527		&self,
528		group_id: i32,
529		no_cache: Option<bool>,
530	) -> anyhow::Result<GetGroupInfoResponse> {
531		let params = json!({
532			"group_id": group_id,
533			"no_cache": no_cache
534		});
535		self.send_and_parse("get_group_info", params).await
536	}
537
538	async fn get_group_list(&self) -> anyhow::Result<Vec<GetGroupInfoResponse>> {
539		let params = json!({});
540		self.send_and_parse("get_group_list", params).await
541	}
542
543	async fn get_group_member_info(
544		&self,
545		group_id: i32,
546		user_id: i32,
547		no_cache: Option<bool>,
548	) -> anyhow::Result<GetGroupMemberInfoResponse> {
549		let params = json!({
550			"group_id": group_id,
551			"user_id": user_id,
552			"no_cache": no_cache
553		});
554		self.send_and_parse("get_group_member_info", params).await
555	}
556
557	async fn get_group_member_list(
558		&self,
559		group_id: i32,
560	) -> anyhow::Result<Vec<GetGroupMemberInfoResponse>> {
561		let params = json!({
562			"group_id": group_id
563		});
564		self.send_and_parse("get_group_member_list", params).await
565	}
566
567	async fn get_group_honor_info(
568		&self,
569		group_id: i64,
570		honor_type: String,
571	) -> anyhow::Result<GetGroupMemberInfoResponse> {
572		let params = json!({
573			"group_id": group_id,
574			"type": honor_type
575		});
576		self.send_and_parse("get_group_honor_info", params).await
577	}
578
579	async fn get_cookies(&self, domain: Option<String>) -> anyhow::Result<String> {
580		let params = json!({
581			"domain": domain
582		});
583		let response: GetCookiesResponse = self.send_and_parse("get_cookies", params).await?;
584		Ok(response.cookies)
585	}
586
587	async fn get_csrf_token(&self) -> anyhow::Result<i32> {
588		let params = json!({});
589		let response: GetCsrfTokenResponse = self.send_and_parse("get_csrf_token", params).await?;
590		Ok(response.token)
591	}
592
593	async fn get_credentials(
594		&self,
595		domain: Option<String>,
596	) -> anyhow::Result<GetCredentialsResponse> {
597		let params = json!({
598			"domain": domain
599		});
600		self.send_and_parse("get_credentials", params).await
601	}
602
603	async fn get_record(&self, file: String, out_format: String) -> anyhow::Result<String> {
604		let params = json!({
605			"file": file,
606			"out_format": out_format
607		});
608		let response: GetDataResponse = self.send_and_parse("get_record", params).await?;
609		Ok(response.file)
610	}
611
612	async fn get_image(&self, file: String) -> anyhow::Result<String> {
613		let params = json!({
614			"file": file
615		});
616		let response: GetDataResponse = self.send_and_parse("get_image", params).await?;
617		Ok(response.file)
618	}
619
620	async fn can_send_image(&self) -> anyhow::Result<bool> {
621		let params = json!({});
622		let response: CanSendResponse = self.send_and_parse("can_send_image", params).await?;
623		Ok(response.yes)
624	}
625
626	async fn can_send_record(&self) -> anyhow::Result<bool> {
627		let params = json!({});
628		let response: CanSendResponse = self.send_and_parse("can_send_record", params).await?;
629		Ok(response.yes)
630	}
631
632	async fn get_status(&self) -> anyhow::Result<GetStatusResponse> {
633		let params = json!({});
634		self.send_and_parse("get_status", params).await
635	}
636
637	async fn get_version_info(&self) -> anyhow::Result<GetVersionInfoResponse> {
638		let params = json!({});
639		self.send_and_parse("get_version_info", params).await
640	}
641
642	async fn set_restart(&self, delay: Option<i32>) -> anyhow::Result<()> {
643		let params = json!({
644			"delay": delay
645		});
646		self.send_and_parse("set_restart", params).await
647	}
648
649	async fn clean_cache(&self) -> anyhow::Result<()> {
650		let params = json!({});
651		self.send_and_parse("clean_cache", params).await
652	}
653}