Skip to main content

onebot_api/communication/
utils.rs

1use crate::api::APISender as APISenderTrait;
2use crate::api::arg_type::MessageType;
3use crate::api::return_type::*;
4use crate::error::{APIRequestError, APIResult, ServiceStartResult};
5pub use crate::event::Event as NormalEvent;
6use crate::event::EventReceiver as EventReceiverTrait;
7use crate::event::EventTrait;
8use crate::event::message::GroupMessageAnonymous;
9use crate::message::receive_segment::ReceiveSegment;
10use crate::message::send_segment::SendSegment;
11use async_trait::async_trait;
12pub use flume::Receiver as FlumeReceiver;
13use flume::SendError;
14pub use flume::Sender as FlumeSender;
15use serde::de::DeserializeOwned;
16use serde::{Deserialize, Serialize};
17use serde_json::{Value as JsonValue, json};
18use std::sync::Arc;
19use std::time::Duration;
20use strum_macros::EnumIs;
21use tokio::sync::broadcast;
22pub use tokio::sync::broadcast::Receiver as BroadcastReceiver;
23pub use tokio::sync::broadcast::Sender as BroadcastSender;
24
25pub type APISender = FlumeSender<APIRequest>;
26pub type APIReceiver = FlumeReceiver<APIRequest>;
27pub type EventSender = BroadcastSender<Arc<Event>>;
28pub type EventReceiver = BroadcastReceiver<Arc<Event>>;
29
30#[derive(Serialize, Clone, Debug)]
31pub struct APIRequest {
32	pub action: String,
33	pub params: JsonValue,
34	pub echo: Option<String>,
35}
36
37#[derive(Deserialize, Clone, Debug)]
38pub struct APIResponse {
39	pub status: String,
40	pub retcode: i32,
41	pub data: JsonValue,
42	pub echo: Option<String>,
43}
44
45impl APIResponse {
46	pub fn verify(&self) -> bool {
47		self.status == "ok"
48	}
49
50	pub fn parse_data<T: DeserializeOwned>(self) -> APIResult<T> {
51		if !self.verify() {
52			return Err(APIRequestError::HttpError { code: self.retcode });
53		}
54		Ok(serde_json::from_value(self.data)?)
55	}
56}
57
58#[derive(Deserialize, Clone, Debug, EnumIs)]
59#[serde(untagged)]
60pub enum Event {
61	APIResponse(APIResponse),
62	Event(NormalEvent),
63}
64
65impl EventTrait for Event {}
66impl EventTrait for Arc<Event> {}
67
68#[async_trait]
69pub trait APIResponseListener {
70	async fn listen(&mut self, echo: String, timeout: Option<Duration>) -> Option<Arc<Event>>;
71	async fn listen_without_timeout(&mut self, echo: String) -> Option<Arc<Event>>;
72	async fn listen_with_timeout(&mut self, echo: String, timeout: Duration) -> Option<Arc<Event>>;
73}
74
75#[async_trait]
76impl APIResponseListener for BroadcastReceiver<Arc<Event>> {
77	async fn listen(&mut self, echo: String, timeout: Option<Duration>) -> Option<Arc<Event>> {
78		match timeout {
79			Some(timeout) => self.listen_with_timeout(echo, timeout).await,
80			None => self.listen_without_timeout(echo).await,
81		}
82	}
83	async fn listen_without_timeout(&mut self, echo: String) -> Option<Arc<Event>> {
84		loop {
85			if let Ok(event) = self.recv().await // 获取Event
86				&& let Event::APIResponse(response) = &*event // 判断是否为APIResponse
87				&& response
88					.echo
89					.as_ref()
90					.map(|target_echo| target_echo == &echo) // 判断echo是否一致
91					.unwrap_or(false)
92			// 若APIResponse不存在echo则默认false
93			{
94				return Some(Arc::clone(&event));
95			}
96			if self.is_closed() {
97				return None;
98			}
99		}
100	}
101
102	async fn listen_with_timeout(&mut self, echo: String, timeout: Duration) -> Option<Arc<Event>> {
103		tokio::time::timeout(timeout, self.listen_without_timeout(echo))
104			.await
105			.ok()?
106	}
107}
108
109#[async_trait]
110pub trait CommunicationService: Sync + Send {
111	fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender);
112	async fn start_service(&self) -> ServiceStartResult<()>;
113}
114
115#[async_trait]
116impl CommunicationService for Box<dyn CommunicationService> {
117	fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
118		(**self).inject(api_receiver, event_sender);
119	}
120
121	async fn start_service(&self) -> ServiceStartResult<()> {
122		(**self).start_service().await
123	}
124}
125
126pub trait IntoService {
127	fn into(self) -> impl CommunicationService + 'static;
128}
129
130impl<T: CommunicationService + 'static> IntoService for T {
131	fn into(self) -> impl CommunicationService + 'static {
132		self
133	}
134}
135
136/// 对于Onebot V11协议API调用和事件接收的高层抽象  
137/// 需要具体实现 [`CommunicationService`] 的底层服务支持
138///
139/// # Examples
140/// ```rust
141/// use std::time::Duration;
142/// use onebot_api::communication::utils::Client;
143/// use onebot_api::communication::ws::WsService;
144///
145/// let ws_service = WsService::new("wss://example.com", Some("example_token".to_string())).unwrap();
146/// let client = Client::new(ws_service, Duration::from_secs(5), None, None);
147/// client.start_service().await.unwrap();
148/// ```
149pub struct Client {
150	service: Box<dyn CommunicationService>,
151	api_sender: APISender,
152	api_receiver: APIReceiver,
153	event_sender: EventSender,
154	timeout: Option<Duration>,
155}
156
157impl EventReceiverTrait<Arc<Event>> for Client {
158	fn get_receiver(&self) -> EventReceiver {
159		self.event_sender.subscribe()
160	}
161}
162
163impl Client {
164	/// 创建一个 [`Client`] 实例
165	///
166	/// # Params
167	/// - `service` 实现 [`IntoService`] 特征或 [`CommunicationService`] 特征的对象
168	/// - `timeout` API请求超时时间,若为 `None` 则一直等待
169	/// - `api_channel_cap` API请求消息通道的容量,默认为`16`
170	/// - `event_channel_cap` Event消息通道的容量,默认为`16`
171	pub fn new(
172		service: impl IntoService,
173		timeout: Option<Duration>,
174		api_channel_cap: Option<usize>,
175		event_channel_cap: Option<usize>,
176	) -> Self {
177		let mut service = Box::new(service.into());
178		let (api_sender, api_receiver) = flume::bounded(api_channel_cap.unwrap_or(16));
179		let (event_sender, _) = broadcast::channel(event_channel_cap.unwrap_or(16));
180		service.inject(api_receiver.clone(), event_sender.clone());
181		Self {
182			service,
183			api_receiver,
184			api_sender,
185			event_sender,
186			timeout,
187		}
188	}
189}
190
191impl Client {
192	/// 启动服务
193	/// 在 `Client` 实例构造完成或调用 `change_service` 后都需要调用该方法启动服务
194	pub async fn start_service(&self) -> ServiceStartResult<()> {
195		self.service.start_service().await
196	}
197
198	/// 更换服务
199	/// 即使在原服务启动后也可以更换服务
200	/// 但需保证原服务被drop,即调用原服务的析构函数
201	///
202	/// # Examples
203	/// ```rust
204	/// use std::time::Duration;
205	/// use onebot_api::communication::utils::Client;
206	/// use onebot_api::communication::ws::WsService;
207	/// use onebot_api::communication::ws_reverse::WsReverseService;
208	///
209	/// let ws_service = WsService::new("wss://example.com", Some("example_token".to_string())).unwrap();
210	/// let mut client = Client::new(ws_service, Duration::from_secs(5), None, None);
211	/// client.start_service().await.unwrap();
212	///
213	/// let ws_reverse_service = WsReverseService::new("0.0.0.0:8080", Some("example_token".to_string()));
214	/// client.change_service(ws_reverse_service);
215	/// client.start_service().await.unwrap();
216	/// ```
217	pub fn change_service(&mut self, service: impl IntoService) -> Box<dyn CommunicationService> {
218		let mut service = Box::new(service.into());
219		service.inject(self.api_receiver.clone(), self.event_sender.clone());
220		std::mem::replace(&mut self.service, service)
221	}
222
223	/// 获取当前服务的引用
224	pub fn get_service(&self) -> &dyn CommunicationService {
225		&*self.service
226	}
227
228	/// 获取当前服务的可变引用
229	pub fn get_service_mut(&mut self) -> &mut dyn CommunicationService {
230		&mut *self.service
231	}
232}
233
234impl Client {
235	/// 随机生成uuid格式的id
236	/// 用于echo的生成
237	pub fn generate_id() -> String {
238		uuid::Uuid::new_v4().to_string()
239	}
240
241	/// 自动获取 `receiver` 并监听带有指定 `echo` 的API调用结果
242	///
243	/// # Returns
244	/// - `Some(api_response)` 成功获取API调用结果
245	/// - `None` 监听过程中事件通道被关闭或监听超时
246	pub async fn get_response(&self, echo: String) -> Option<APIResponse> {
247		let mut receiver = self.get_receiver();
248		if let Event::APIResponse(res) = &*(receiver.listen(echo, self.timeout).await?) {
249			Some(res.clone())
250		} else {
251			None
252		}
253	}
254
255	pub fn parse_response<T: DeserializeOwned>(response: APIResponse) -> APIResult<T> {
256		response.parse_data()
257	}
258
259	pub async fn send_request(
260		&self,
261		action: String,
262		params: JsonValue,
263		echo: String,
264	) -> Result<(), SendError<APIRequest>> {
265		self
266			.api_sender
267			.send_async(APIRequest {
268				action,
269				params,
270				echo: Some(echo),
271			})
272			.await
273	}
274
275	/// 生成echo并发送API请求  
276	/// 同时等待API响应并自动解析
277	///
278	/// # Params
279	/// - `action` 要调用的 `action` 的名称
280	/// - `params` 调用 `action` 所需要的参数
281	///
282	/// # Examples
283	/// ```rust
284	/// use std::time::Duration;
285	/// use serde_json::{json, Value};
286	/// use onebot_api::communication::utils::Client;
287	/// use onebot_api::communication::ws::WsService;
288	///
289	/// let client: Client = Client::new(WsService::new("ws://localhost:8080", None).unwrap(), Some(Duration::from_secs(5)), None, None);
290	/// let response: Value =  client.send_and_parse("send_like", json!({})).await.unwrap();
291	/// ```
292	pub async fn send_and_parse<T: DeserializeOwned>(
293		&self,
294		action: impl ToString,
295		params: JsonValue,
296	) -> APIResult<T> {
297		let echo = Self::generate_id();
298		self
299			.send_request(action.to_string(), params, echo.clone())
300			.await?;
301		let response = self.get_response(echo).await;
302		if response.is_none() {
303			return Err(APIRequestError::Timeout);
304		}
305		let response = response.unwrap();
306		Self::parse_response(response)
307	}
308}
309
310#[async_trait]
311impl APISenderTrait for Client {
312	async fn send_private_msg(
313		&self,
314		user_id: i64,
315		message: Vec<SendSegment>,
316		auto_escape: Option<bool>,
317	) -> APIResult<i32> {
318		let params = json!({
319			"user_id": user_id,
320			"message": message,
321			"auto_escape": auto_escape
322		});
323		let response: SendMsgResponse = self.send_and_parse("send_private_msg", params).await?;
324		Ok(response.message_id)
325	}
326
327	async fn send_group_msg(
328		&self,
329		group_id: i64,
330		message: Vec<SendSegment>,
331		auto_escape: Option<bool>,
332	) -> APIResult<i32> {
333		let params = json!({
334			"group_id": group_id,
335			"message": message,
336			"auto_escape": auto_escape
337		});
338		let response: SendMsgResponse = self.send_and_parse("send_group_msg", params).await?;
339		Ok(response.message_id)
340	}
341
342	async fn send_msg(
343		&self,
344		message_type: Option<MessageType>,
345		user_id: i64,
346		group_id: i64,
347		message: Vec<SendSegment>,
348		auto_escape: Option<bool>,
349	) -> APIResult<i32> {
350		let params = json!({
351			"message_type": message_type,
352			"user_id": user_id,
353			"group_id": group_id,
354			"message": message,
355			"auto_escape": auto_escape
356		});
357		let response: SendMsgResponse = self.send_and_parse("send_msg", params).await?;
358		Ok(response.message_id)
359	}
360
361	async fn delete_msg(&self, message_id: i32) -> APIResult<()> {
362		let params = json!({
363			"message_id": message_id
364		});
365		self.send_and_parse("delete_msg", params).await
366	}
367
368	async fn get_msg(&self, message_id: i32) -> APIResult<GetMsgResponse> {
369		let params = json!({
370			"message_id": message_id
371		});
372		self.send_and_parse("get_msg", params).await
373	}
374
375	async fn get_forward_msg(&self, id: String) -> APIResult<Vec<ReceiveSegment>> {
376		let params = json!({
377			"id": id
378		});
379		let response: GetForwardMsgResponse = self.send_and_parse("get_forward_msg", params).await?;
380		Ok(response.message)
381	}
382
383	async fn send_like(&self, user_id: i64, times: Option<i32>) -> APIResult<()> {
384		let params = json!({
385			"user_id": user_id,
386			"times": times
387		});
388		self.send_and_parse("send_like", params).await
389	}
390
391	async fn set_group_kick(
392		&self,
393		group_id: i32,
394		user_id: i32,
395		reject_add_request: Option<bool>,
396	) -> APIResult<()> {
397		let params = json!({
398			"group_id": group_id,
399			"user_id": user_id,
400			"reject_add_request": reject_add_request
401		});
402		self.send_and_parse("set_group_kick", params).await
403	}
404
405	async fn set_group_ban(
406		&self,
407		group_id: i32,
408		user_id: i32,
409		duration: Option<i32>,
410	) -> APIResult<()> {
411		let params = json!({
412			"group_id": group_id,
413			"user_id": user_id,
414			"duration": duration
415		});
416		self.send_and_parse("set_group_ban", params).await
417	}
418
419	async fn set_group_anonymous_ban(
420		&self,
421		group_id: i32,
422		anonymous: Option<GroupMessageAnonymous>,
423		flag: Option<String>,
424		duration: Option<i32>,
425	) -> APIResult<()> {
426		let params = json!({
427			"group_id": group_id,
428			"anonymous": anonymous,
429			"flag": flag,
430			"duration": duration
431		});
432		self.send_and_parse("set_group_anonymous_ban", params).await
433	}
434
435	async fn set_group_whole_ban(&self, group_id: i32, enable: Option<bool>) -> APIResult<()> {
436		let params = json!({
437			"group_id": group_id,
438			"enable": enable
439		});
440		self.send_and_parse("set_group_whole_ban", params).await
441	}
442
443	async fn set_group_admin(
444		&self,
445		group_id: i32,
446		user_id: i32,
447		enable: Option<bool>,
448	) -> APIResult<()> {
449		let params = json!({
450			"group_id": group_id,
451			"user_id": user_id,
452			"enable": enable
453		});
454		self.send_and_parse("set_group_admin", params).await
455	}
456
457	async fn set_group_anonymous(&self, group_id: i32, enable: Option<bool>) -> APIResult<()> {
458		let params = json!({
459			"group_id": group_id,
460			"enable": enable
461		});
462		self.send_and_parse("set_group_anonymous", params).await
463	}
464
465	async fn set_group_card(
466		&self,
467		group_id: i32,
468		user_id: i32,
469		card: Option<String>,
470	) -> APIResult<()> {
471		let params = json!({
472			"group_id": group_id,
473			"user_id": user_id,
474			"card": card
475		});
476		self.send_and_parse("set_group_card", params).await
477	}
478
479	async fn set_group_name(&self, group_id: i32, group_name: String) -> APIResult<()> {
480		let params = json!({
481			"group_id": group_id,
482			"group_name": group_name
483		});
484		self.send_and_parse("set_group_name", params).await
485	}
486
487	async fn set_group_leave(&self, group_id: i32, is_dismiss: Option<bool>) -> APIResult<()> {
488		let params = json!({
489			"group_id": group_id,
490			"is_dismiss": is_dismiss
491		});
492		self.send_and_parse("set_group_leave", params).await
493	}
494
495	async fn set_group_special_title(
496		&self,
497		group_id: i32,
498		user_id: i32,
499		special_title: Option<String>,
500		duration: Option<i32>,
501	) -> APIResult<()> {
502		let params = json!({
503			"group_id": group_id,
504			"user_id": user_id,
505			"special_title": special_title,
506			"duration": duration
507		});
508		self.send_and_parse("set_group_special_title", params).await
509	}
510
511	async fn set_friend_add_request(
512		&self,
513		flag: String,
514		approve: Option<bool>,
515		remark: Option<String>,
516	) -> APIResult<()> {
517		let params = json!({
518			"flag": flag,
519			"approve": approve,
520			"remark": remark
521		});
522		self.send_and_parse("set_friend_add_request", params).await
523	}
524
525	async fn set_group_add_request(
526		&self,
527		flag: String,
528		sub_type: String,
529		approve: Option<bool>,
530		reason: Option<String>,
531	) -> APIResult<()> {
532		let params = json!({
533			"flag": flag,
534			"sub_type": sub_type,
535			"approve": approve,
536			"reason": reason
537		});
538		self.send_and_parse("set_group_add_request", params).await
539	}
540
541	async fn get_login_info(&self) -> APIResult<GetLoginInfoResponse> {
542		let params = json!({});
543		self.send_and_parse("get_login_info", params).await
544	}
545
546	async fn get_stranger_info(
547		&self,
548		user_id: i32,
549		no_cache: Option<bool>,
550	) -> APIResult<GetStrangerInfoResponse> {
551		let params = json!({
552			"user_id": user_id,
553			"no_cache": no_cache
554		});
555		self.send_and_parse("get_stranger_info", params).await
556	}
557
558	async fn get_friend_list(&self) -> APIResult<Vec<GetFriendListResponse>> {
559		let params = json!({});
560		self.send_and_parse("get_friend_list", params).await
561	}
562
563	async fn get_group_info(
564		&self,
565		group_id: i32,
566		no_cache: Option<bool>,
567	) -> APIResult<GetGroupInfoResponse> {
568		let params = json!({
569			"group_id": group_id,
570			"no_cache": no_cache
571		});
572		self.send_and_parse("get_group_info", params).await
573	}
574
575	async fn get_group_list(&self) -> APIResult<Vec<GetGroupInfoResponse>> {
576		let params = json!({});
577		self.send_and_parse("get_group_list", params).await
578	}
579
580	async fn get_group_member_info(
581		&self,
582		group_id: i32,
583		user_id: i32,
584		no_cache: Option<bool>,
585	) -> APIResult<GetGroupMemberInfoResponse> {
586		let params = json!({
587			"group_id": group_id,
588			"user_id": user_id,
589			"no_cache": no_cache
590		});
591		self.send_and_parse("get_group_member_info", params).await
592	}
593
594	async fn get_group_member_list(
595		&self,
596		group_id: i32,
597	) -> APIResult<Vec<GetGroupMemberInfoResponse>> {
598		let params = json!({
599			"group_id": group_id
600		});
601		self.send_and_parse("get_group_member_list", params).await
602	}
603
604	async fn get_group_honor_info(
605		&self,
606		group_id: i64,
607		honor_type: String,
608	) -> APIResult<GetGroupMemberInfoResponse> {
609		let params = json!({
610			"group_id": group_id,
611			"type": honor_type
612		});
613		self.send_and_parse("get_group_honor_info", params).await
614	}
615
616	async fn get_cookies(&self, domain: Option<String>) -> APIResult<String> {
617		let params = json!({
618			"domain": domain
619		});
620		let response: GetCookiesResponse = self.send_and_parse("get_cookies", params).await?;
621		Ok(response.cookies)
622	}
623
624	async fn get_csrf_token(&self) -> APIResult<i32> {
625		let params = json!({});
626		let response: GetCsrfTokenResponse = self.send_and_parse("get_csrf_token", params).await?;
627		Ok(response.token)
628	}
629
630	async fn get_credentials(&self, domain: Option<String>) -> APIResult<GetCredentialsResponse> {
631		let params = json!({
632			"domain": domain
633		});
634		self.send_and_parse("get_credentials", params).await
635	}
636
637	async fn get_record(&self, file: String, out_format: String) -> APIResult<String> {
638		let params = json!({
639			"file": file,
640			"out_format": out_format
641		});
642		let response: GetDataResponse = self.send_and_parse("get_record", params).await?;
643		Ok(response.file)
644	}
645
646	async fn get_image(&self, file: String) -> APIResult<String> {
647		let params = json!({
648			"file": file
649		});
650		let response: GetDataResponse = self.send_and_parse("get_image", params).await?;
651		Ok(response.file)
652	}
653
654	async fn can_send_image(&self) -> APIResult<bool> {
655		let params = json!({});
656		let response: CanSendResponse = self.send_and_parse("can_send_image", params).await?;
657		Ok(response.yes)
658	}
659
660	async fn can_send_record(&self) -> APIResult<bool> {
661		let params = json!({});
662		let response: CanSendResponse = self.send_and_parse("can_send_record", params).await?;
663		Ok(response.yes)
664	}
665
666	async fn get_status(&self) -> APIResult<GetStatusResponse> {
667		let params = json!({});
668		self.send_and_parse("get_status", params).await
669	}
670
671	async fn get_version_info(&self) -> APIResult<GetVersionInfoResponse> {
672		let params = json!({});
673		self.send_and_parse("get_version_info", params).await
674	}
675
676	async fn set_restart(&self, delay: Option<i32>) -> APIResult<()> {
677		let params = json!({
678			"delay": delay
679		});
680		self.send_and_parse("set_restart", params).await
681	}
682
683	async fn clean_cache(&self) -> APIResult<()> {
684		let params = json!({});
685		self.send_and_parse("clean_cache", params).await
686	}
687}