Skip to main content

onebot_api/communication/
utils.rs

1use crate::api::APISender as APISenderTrait;
2use crate::api::arg_type::MessageType;
3use crate::api::return_type::*;
4pub use crate::event::Event as NormalEvent;
5use crate::event::EventReceiver as EventReceiverTrait;
6use crate::event::EventTrait;
7use crate::event::message::GroupMessageAnonymous;
8use crate::message::receive_segment::ReceiveSegment;
9use crate::message::send_segment::SendSegment;
10use async_trait::async_trait;
11pub use flume::Receiver as FlumeReceiver;
12use flume::SendError;
13pub use flume::Sender as FlumeSender;
14use serde::de::DeserializeOwned;
15use serde::{Deserialize, Serialize};
16use serde_json::{Value as JsonValue, json};
17use std::sync::Arc;
18use std::time::Duration;
19use strum_macros::EnumIs;
20use tokio::sync::broadcast;
21pub use tokio::sync::broadcast::Receiver as BroadcastReceiver;
22pub use tokio::sync::broadcast::Sender as BroadcastSender;
23
/// Sending half of the flume channel that carries [`APIRequest`]s.
24pub type APISender = FlumeSender<APIRequest>;
/// Receiving half of the flume channel that carries [`APIRequest`]s.
25pub type APIReceiver = FlumeReceiver<APIRequest>;
/// Sending half of the tokio broadcast channel that carries shared [`Event`]s.
26pub type EventSender = BroadcastSender<Arc<Event>>;
/// Receiving half of the tokio broadcast channel that carries shared [`Event`]s.
27pub type EventReceiver = BroadcastReceiver<Arc<Event>>;
28
/// A single API call handed to the communication service through the
/// [`APISender`] / [`APIReceiver`] channel.
29#[derive(Serialize, Clone, Debug)]
30pub struct APIRequest {
	/// Name of the action to invoke, e.g. `"send_private_msg"`.
31	pub action: String,
	/// Arguments of the action as a JSON value.
32	pub params: JsonValue,
	/// Correlation identifier. `Client::send_request` always sets it, and the
	/// matching response is found by comparing it with [`APIResponse::echo`].
33	pub echo: Option<String>,
34}
35
/// The answer to an [`APIRequest`], deserialized from what the service receives.
36#[derive(Deserialize, Clone, Debug)]
37pub struct APIResponse {
	/// Status string; `verify` treats exactly `"ok"` as success.
38	pub status: String,
	/// Numeric return code; included in the error produced by `parse_data` on failure.
39	pub retcode: i32,
	/// Payload of the response, deserialized into a concrete type by `parse_data`.
40	pub data: JsonValue,
	/// Correlation identifier compared against the `echo` of the originating request.
41	pub echo: Option<String>,
42}
43
44impl APIResponse {
45	pub fn verify(&self) -> bool {
46		self.status == "ok"
47	}
48
49	pub fn parse_data<T: DeserializeOwned>(self) -> anyhow::Result<T> {
50		if !self.verify() {
51			return Err(anyhow::anyhow!("request failed with code {}", self.retcode));
52		}
53		Ok(serde_json::from_value(self.data)?)
54	}
55}
56
/// Everything that can arrive from the service: either the answer to an API
/// call or a regular event.
///
/// The enum is `#[serde(untagged)]`, so serde tries the variants in declaration
/// order; a payload is treated as a regular event only if it does not
/// deserialize as an [`APIResponse`]. Also derives strum's `EnumIs`
/// (per-variant `is_*` predicates).
57#[derive(Deserialize, Clone, Debug, EnumIs)]
58#[serde(untagged)]
59pub enum Event {
	/// Response to a previously sent [`APIRequest`].
60	APIResponse(APIResponse),
	/// Any other event (re-exported here as `NormalEvent`).
61	Event(NormalEvent),
62}
63
// Empty-bodied impls: both the owned event and the shared `Arc` handle that
// travels through the broadcast channel implement `EventTrait`.
64impl EventTrait for Event {}
65impl EventTrait for Arc<Event> {}
66
/// Waits on an event stream for the [`APIResponse`] that answers a given request.
///
/// All methods return the matching event, or `None` when no match was obtained.
67#[async_trait]
68pub trait APIResponseListener {
	/// Waits for the response whose `echo` equals `echo`; `timeout` of `None`
	/// means no time limit.
69	async fn listen(&mut self, echo: String, timeout: Option<Duration>) -> Option<Arc<Event>>;
	/// Waits for the response whose `echo` equals `echo` without any time limit.
70	async fn listen_without_timeout(&mut self, echo: String) -> Option<Arc<Event>>;
	/// Waits for the response whose `echo` equals `echo` for at most `timeout`.
71	async fn listen_with_timeout(&mut self, echo: String, timeout: Duration) -> Option<Arc<Event>>;
72}
73
74#[async_trait]
75impl APIResponseListener for BroadcastReceiver<Arc<Event>> {
76	async fn listen(&mut self, echo: String, timeout: Option<Duration>) -> Option<Arc<Event>> {
77		match timeout {
78			Some(timeout) => self.listen_with_timeout(echo, timeout).await,
79			None => self.listen_without_timeout(echo).await,
80		}
81	}
82	async fn listen_without_timeout(&mut self, echo: String) -> Option<Arc<Event>> {
83		loop {
84			if let Ok(event) = self.recv().await // 获取Event
85				&& let Event::APIResponse(response) = &*event // 判断是否为APIResponse
86				&& response
87					.echo
88					.as_ref()
89					.map(|target_echo| target_echo == &echo) // 判断echo是否一致
90					.unwrap_or(false)
91			// 若APIResponse不存在echo则默认false
92			{
93				return Some(Arc::clone(&event));
94			}
95			if self.is_closed() {
96				return None;
97			}
98		}
99	}
100
101	async fn listen_with_timeout(&mut self, echo: String, timeout: Duration) -> Option<Arc<Event>> {
102		tokio::time::timeout(timeout, self.listen_without_timeout(echo))
103			.await
104			.ok()?
105	}
106}
107
/// Low-level transport that a `Client` drives.
108#[async_trait]
109pub trait CommunicationService: Sync + Send {
	/// Hands the service the receiving end of the API request channel and the
	/// sending end of the event channel.
	// NOTE(review): presumably the service forwards received requests and
	// publishes incoming events on `event_sender` — confirm against implementors.
110	fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender);
	/// Starts the service; what "started" means is defined by the implementor.
111	async fn start_service(&self) -> anyhow::Result<()>;
112}
113
/// Conversion into a concrete [`CommunicationService`]; this is the bound
/// accepted by `Client::new` and `Client::change_service`.
114pub trait IntoService {
	/// Consumes `self` and returns the service it represents.
115	fn into(self) -> impl CommunicationService + 'static;
116}
117
118impl<T: CommunicationService + 'static> IntoService for T {
119	fn into(self) -> impl CommunicationService + 'static {
120		self
121	}
122}
123
124/// High-level abstraction over OneBot V11 API calls and event reception.
125/// It needs a backing service that concretely implements [`CommunicationService`].
126///
127/// # Examples
128/// ```rust
/// # async fn example() {
129/// use std::time::Duration;
130/// use onebot_api::communication::utils::Client;
131/// use onebot_api::communication::ws::WsService;
132///
133/// let ws_service = WsService::new("wss://example.com", Some("example_token".to_string())).unwrap();
134/// let client = Client::new(ws_service, Some(Duration::from_secs(5)), None, None);
135/// client.start_service().await.unwrap();
/// # }
136/// ```
137pub struct Client {
	/// The backing service, boxed so it can be swapped by `change_service`.
138	service: Box<dyn CommunicationService>,
	/// Sending end of the API request channel, used by `send_request`.
139	api_sender: APISender,
	/// Receiving end of the API request channel; kept so it can be injected
	/// again into a replacement service.
140	api_receiver: APIReceiver,
	/// Sending end of the event channel; new receivers are created from it
	/// via `subscribe`.
141	event_sender: EventSender,
	/// Time limit applied when waiting for an API response; `None` waits indefinitely.
142	timeout: Option<Duration>,
143}
144
145impl EventReceiverTrait<Arc<Event>> for Client {
146	fn get_receiver(&self) -> EventReceiver {
147		self.event_sender.subscribe()
148	}
149}
150
151impl Client {
152	/// 创建一个 [`Client`] 实例
153	///
154	/// # Params
155	/// - `service` 实现 [`IntoService`] 特征或 [`CommunicationService`] 特征的对象
156	/// - `timeout` API请求超时时间,若为 `None` 则一直等待
157	/// - `api_channel_cap` API请求消息通道的容量,默认为`16`
158	/// - `event_channel_cap` Event消息通道的容量,默认为`16`
159	pub fn new(
160		service: impl IntoService,
161		timeout: Option<Duration>,
162		api_channel_cap: Option<usize>,
163		event_channel_cap: Option<usize>,
164	) -> Self {
165		let mut service = Box::new(service.into());
166		let (api_sender, api_receiver) = flume::bounded(api_channel_cap.unwrap_or(16));
167		let (event_sender, _) = broadcast::channel(event_channel_cap.unwrap_or(16));
168		service.inject(api_receiver.clone(), event_sender.clone());
169		Self {
170			service,
171			api_receiver,
172			api_sender,
173			event_sender,
174			timeout,
175		}
176	}
177}
178
179impl Client {
180	pub async fn start_service(&self) -> anyhow::Result<()> {
181		self.service.start_service().await
182	}
183
184	pub fn change_service(&mut self, service: impl IntoService) -> Box<dyn CommunicationService> {
185		let mut service = Box::new(service.into());
186		service.inject(self.api_receiver.clone(), self.event_sender.clone());
187		std::mem::replace(&mut self.service, service)
188	}
189
190	pub fn get_service(&self) -> &dyn CommunicationService {
191		&*self.service
192	}
193
194	pub fn get_service_mut(&mut self) -> &mut dyn CommunicationService {
195		&mut *self.service
196	}
197}
198
199impl Client {
200	pub fn generate_id() -> String {
201		uuid::Uuid::new_v4().to_string()
202	}
203
204	pub async fn get_response(&self, echo: String) -> Option<APIResponse> {
205		let mut receiver = self.get_receiver();
206		if let Event::APIResponse(res) = &*(receiver.listen(echo, self.timeout).await?) {
207			Some(res.clone())
208		} else {
209			None
210		}
211	}
212
213	pub fn parse_response<T: DeserializeOwned>(response: APIResponse) -> anyhow::Result<T> {
214		response.parse_data()
215	}
216
217	pub async fn send_request(
218		&self,
219		action: String,
220		params: JsonValue,
221		echo: String,
222	) -> Result<(), SendError<APIRequest>> {
223		self
224			.api_sender
225			.send_async(APIRequest {
226				action,
227				params,
228				echo: Some(echo),
229			})
230			.await
231	}
232
233	/// 生成echo并发送API请求  
234	/// 同时等待API响应并自动解析
235	///
236	/// # Examples
237	/// ```rust
238	/// use serde_json::{json, Value};
239	/// use onebot_api::communication::utils::Client;
240	///
241	/// let client: Client = /* ... */;
242	/// let response: Value =  client.send_and_parse("action_name", json!({})).await.unwrap();
243	/// ```
244	pub async fn send_and_parse<T: DeserializeOwned>(
245		&self,
246		action: impl ToString,
247		params: JsonValue,
248	) -> anyhow::Result<T> {
249		let echo = Self::generate_id();
250		self
251			.send_request(action.to_string(), params, echo.clone())
252			.await?;
253		let response = self.get_response(echo).await;
254		if response.is_none() {
255			return Err(anyhow::anyhow!("request time out"));
256		}
257		let response = response.unwrap();
258		Self::parse_response(response)
259	}
260}
261
262#[async_trait]
263impl APISenderTrait for Client {
264	async fn send_private_msg(
265		&self,
266		user_id: i64,
267		message: Vec<SendSegment>,
268		auto_escape: Option<bool>,
269	) -> anyhow::Result<i32> {
270		let params = json!({
271			"user_id": user_id,
272			"message": message,
273			"auto_escape": auto_escape
274		});
275		let response: SendMsgResponse = self.send_and_parse("send_private_msg", params).await?;
276		Ok(response.message_id)
277	}
278
279	async fn send_group_msg(
280		&self,
281		group_id: i64,
282		message: Vec<SendSegment>,
283		auto_escape: Option<bool>,
284	) -> anyhow::Result<i32> {
285		let params = json!({
286			"group_id": group_id,
287			"message": message,
288			"auto_escape": auto_escape
289		});
290		let response: SendMsgResponse = self.send_and_parse("send_group_msg", params).await?;
291		Ok(response.message_id)
292	}
293
294	async fn send_msg(
295		&self,
296		message_type: Option<MessageType>,
297		user_id: i64,
298		group_id: i64,
299		message: Vec<SendSegment>,
300		auto_escape: Option<bool>,
301	) -> anyhow::Result<i32> {
302		let params = json!({
303			"message_type": message_type,
304			"user_id": user_id,
305			"group_id": group_id,
306			"message": message,
307			"auto_escape": auto_escape
308		});
309		let response: SendMsgResponse = self.send_and_parse("send_msg", params).await?;
310		Ok(response.message_id)
311	}
312
313	async fn delete_msg(&self, message_id: i32) -> anyhow::Result<()> {
314		let params = json!({
315			"message_id": message_id
316		});
317		self.send_and_parse("delete_msg", params).await
318	}
319
320	async fn get_msg(&self, message_id: i32) -> anyhow::Result<GetMsgResponse> {
321		let params = json!({
322			"message_id": message_id
323		});
324		self.send_and_parse("get_msg", params).await
325	}
326
327	async fn get_forward_msg(&self, id: String) -> anyhow::Result<Vec<ReceiveSegment>> {
328		let params = json!({
329			"id": id
330		});
331		let response: GetForwardMsgResponse = self.send_and_parse("get_forward_msg", params).await?;
332		Ok(response.message)
333	}
334
335	async fn send_like(&self, user_id: i64, times: Option<i32>) -> anyhow::Result<()> {
336		let params = json!({
337			"user_id": user_id,
338			"times": times
339		});
340		self.send_and_parse("send_like", params).await
341	}
342
343	async fn set_group_kick(
344		&self,
345		group_id: i32,
346		user_id: i32,
347		reject_add_request: Option<bool>,
348	) -> anyhow::Result<()> {
349		let params = json!({
350			"group_id": group_id,
351			"user_id": user_id,
352			"reject_add_request": reject_add_request
353		});
354		self.send_and_parse("set_group_kick", params).await
355	}
356
357	async fn set_group_ban(
358		&self,
359		group_id: i32,
360		user_id: i32,
361		duration: Option<i32>,
362	) -> anyhow::Result<()> {
363		let params = json!({
364			"group_id": group_id,
365			"user_id": user_id,
366			"duration": duration
367		});
368		self.send_and_parse("set_group_ban", params).await
369	}
370
371	async fn set_group_anonymous_ban(
372		&self,
373		group_id: i32,
374		anonymous: Option<GroupMessageAnonymous>,
375		flag: Option<String>,
376		duration: Option<i32>,
377	) -> anyhow::Result<()> {
378		let params = json!({
379			"group_id": group_id,
380			"anonymous": anonymous,
381			"flag": flag,
382			"duration": duration
383		});
384		self.send_and_parse("set_group_anonymous_ban", params).await
385	}
386
387	async fn set_group_whole_ban(&self, group_id: i32, enable: Option<bool>) -> anyhow::Result<()> {
388		let params = json!({
389			"group_id": group_id,
390			"enable": enable
391		});
392		self.send_and_parse("set_group_whole_ban", params).await
393	}
394
395	async fn set_group_admin(
396		&self,
397		group_id: i32,
398		user_id: i32,
399		enable: Option<bool>,
400	) -> anyhow::Result<()> {
401		let params = json!({
402			"group_id": group_id,
403			"user_id": user_id,
404			"enable": enable
405		});
406		self.send_and_parse("set_group_admin", params).await
407	}
408
409	async fn set_group_anonymous(&self, group_id: i32, enable: Option<bool>) -> anyhow::Result<()> {
410		let params = json!({
411			"group_id": group_id,
412			"enable": enable
413		});
414		self.send_and_parse("set_group_anonymous", params).await
415	}
416
417	async fn set_group_card(
418		&self,
419		group_id: i32,
420		user_id: i32,
421		card: Option<String>,
422	) -> anyhow::Result<()> {
423		let params = json!({
424			"group_id": group_id,
425			"user_id": user_id,
426			"card": card
427		});
428		self.send_and_parse("set_group_card", params).await
429	}
430
431	async fn set_group_name(&self, group_id: i32, group_name: String) -> anyhow::Result<()> {
432		let params = json!({
433			"group_id": group_id,
434			"group_name": group_name
435		});
436		self.send_and_parse("set_group_name", params).await
437	}
438
439	async fn set_group_leave(&self, group_id: i32, is_dismiss: Option<bool>) -> anyhow::Result<()> {
440		let params = json!({
441			"group_id": group_id,
442			"is_dismiss": is_dismiss
443		});
444		self.send_and_parse("set_group_leave", params).await
445	}
446
447	async fn set_group_special_title(
448		&self,
449		group_id: i32,
450		user_id: i32,
451		special_title: Option<String>,
452		duration: Option<i32>,
453	) -> anyhow::Result<()> {
454		let params = json!({
455			"group_id": group_id,
456			"user_id": user_id,
457			"special_title": special_title,
458			"duration": duration
459		});
460		self.send_and_parse("set_group_special_title", params).await
461	}
462
463	async fn set_friend_add_request(
464		&self,
465		flag: String,
466		approve: Option<bool>,
467		remark: Option<String>,
468	) -> anyhow::Result<()> {
469		let params = json!({
470			"flag": flag,
471			"approve": approve,
472			"remark": remark
473		});
474		self.send_and_parse("set_friend_add_request", params).await
475	}
476
477	async fn set_group_add_request(
478		&self,
479		flag: String,
480		sub_type: String,
481		approve: Option<bool>,
482		reason: Option<String>,
483	) -> anyhow::Result<()> {
484		let params = json!({
485			"flag": flag,
486			"sub_type": sub_type,
487			"approve": approve,
488			"reason": reason
489		});
490		self.send_and_parse("set_group_add_request", params).await
491	}
492
493	async fn get_login_info(&self) -> anyhow::Result<GetLoginInfoResponse> {
494		let params = json!({});
495		self.send_and_parse("get_login_info", params).await
496	}
497
498	async fn get_stranger_info(
499		&self,
500		user_id: i32,
501		no_cache: Option<bool>,
502	) -> anyhow::Result<GetStrangerInfoResponse> {
503		let params = json!({
504			"user_id": user_id,
505			"no_cache": no_cache
506		});
507		self.send_and_parse("get_stranger_info", params).await
508	}
509
510	async fn get_friend_list(&self) -> anyhow::Result<Vec<GetFriendListResponse>> {
511		let params = json!({});
512		self.send_and_parse("get_friend_list", params).await
513	}
514
515	async fn get_group_info(
516		&self,
517		group_id: i32,
518		no_cache: Option<bool>,
519	) -> anyhow::Result<GetGroupInfoResponse> {
520		let params = json!({
521			"group_id": group_id,
522			"no_cache": no_cache
523		});
524		self.send_and_parse("get_group_info", params).await
525	}
526
527	async fn get_group_list(&self) -> anyhow::Result<Vec<GetGroupInfoResponse>> {
528		let params = json!({});
529		self.send_and_parse("get_group_list", params).await
530	}
531
532	async fn get_group_member_info(
533		&self,
534		group_id: i32,
535		user_id: i32,
536		no_cache: Option<bool>,
537	) -> anyhow::Result<GetGroupMemberInfoResponse> {
538		let params = json!({
539			"group_id": group_id,
540			"user_id": user_id,
541			"no_cache": no_cache
542		});
543		self.send_and_parse("get_group_member_info", params).await
544	}
545
546	async fn get_group_member_list(
547		&self,
548		group_id: i32,
549	) -> anyhow::Result<Vec<GetGroupMemberInfoResponse>> {
550		let params = json!({
551			"group_id": group_id
552		});
553		self.send_and_parse("get_group_member_list", params).await
554	}
555
556	async fn get_group_honor_info(
557		&self,
558		group_id: i64,
559		honor_type: String,
560	) -> anyhow::Result<GetGroupMemberInfoResponse> {
561		let params = json!({
562			"group_id": group_id,
563			"type": honor_type
564		});
565		self.send_and_parse("get_group_honor_info", params).await
566	}
567
568	async fn get_cookies(&self, domain: Option<String>) -> anyhow::Result<String> {
569		let params = json!({
570			"domain": domain
571		});
572		let response: GetCookiesResponse = self.send_and_parse("get_cookies", params).await?;
573		Ok(response.cookies)
574	}
575
576	async fn get_csrf_token(&self) -> anyhow::Result<i32> {
577		let params = json!({});
578		let response: GetCsrfTokenResponse = self.send_and_parse("get_csrf_token", params).await?;
579		Ok(response.token)
580	}
581
582	async fn get_credentials(
583		&self,
584		domain: Option<String>,
585	) -> anyhow::Result<GetCredentialsResponse> {
586		let params = json!({
587			"domain": domain
588		});
589		self.send_and_parse("get_credentials", params).await
590	}
591
592	async fn get_record(&self, file: String, out_format: String) -> anyhow::Result<String> {
593		let params = json!({
594			"file": file,
595			"out_format": out_format
596		});
597		let response: GetDataResponse = self.send_and_parse("get_record", params).await?;
598		Ok(response.file)
599	}
600
601	async fn get_image(&self, file: String) -> anyhow::Result<String> {
602		let params = json!({
603			"file": file
604		});
605		let response: GetDataResponse = self.send_and_parse("get_image", params).await?;
606		Ok(response.file)
607	}
608
609	async fn can_send_image(&self) -> anyhow::Result<bool> {
610		let params = json!({});
611		let response: CanSendResponse = self.send_and_parse("can_send_image", params).await?;
612		Ok(response.yes)
613	}
614
615	async fn can_send_record(&self) -> anyhow::Result<bool> {
616		let params = json!({});
617		let response: CanSendResponse = self.send_and_parse("can_send_record", params).await?;
618		Ok(response.yes)
619	}
620
621	async fn get_status(&self) -> anyhow::Result<GetStatusResponse> {
622		let params = json!({});
623		self.send_and_parse("get_status", params).await
624	}
625
626	async fn get_version_info(&self) -> anyhow::Result<GetVersionInfoResponse> {
627		let params = json!({});
628		self.send_and_parse("get_version_info", params).await
629	}
630
631	async fn set_restart(&self, delay: Option<i32>) -> anyhow::Result<()> {
632		let params = json!({
633			"delay": delay
634		});
635		self.send_and_parse("set_restart", params).await
636	}
637
638	async fn clean_cache(&self) -> anyhow::Result<()> {
639		let params = json!({});
640		self.send_and_parse("clean_cache", params).await
641	}
642}