Skip to main content

onebot_api/communication/
http.rs

1use super::utils::*;
2use async_trait::async_trait;
3use reqwest::IntoUrl;
4use serde::Deserialize;
5use serde_json::Value as JsonValue;
6use std::sync::Arc;
7use tokio::select;
8use tokio::sync::broadcast;
9use url::Url;
10
11#[derive(Clone, Debug)]
12pub struct HttpService {
13	url: Url,
14	access_token: Option<String>,
15	api_receiver: Option<APIReceiver>,
16	event_sender: Option<EventSender>,
17	close_signal_sender: broadcast::Sender<()>,
18}
19
20#[derive(Deserialize, Debug, Clone)]
21pub struct HttpResponse {
22	status: String,
23	retcode: i32,
24	data: JsonValue,
25}
26
27impl HttpService {
28	pub fn new(url: impl IntoUrl, access_token: Option<String>) -> reqwest::Result<Self> {
29		let (close_signal_sender, _) = broadcast::channel(1);
30		Ok(Self {
31			url: url.into_url()?,
32			access_token,
33			api_receiver: None,
34			event_sender: None,
35			close_signal_sender,
36		})
37	}
38
39	async fn api_processor(self) -> anyhow::Result<()> {
40		let mut close_signal = self.close_signal_sender.subscribe();
41		let api_receiver = self.api_receiver.clone().unwrap();
42		let event_sender = self.event_sender.clone().unwrap();
43		let client = reqwest::Client::new();
44
45		loop {
46			select! {
47				_ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
48				Ok(data) = api_receiver.recv_async() => {
49					let response = self.send_api_request(&client, &data).await;
50					if response.is_err() {
51						continue
52					}
53					let event = self.response_parser(data.echo, response?).await;
54					if event.is_err() {
55						continue
56					}
57					let _ = event_sender.send(Arc::new(event?));
58				}
59			}
60		}
61	}
62
63	pub async fn send_api_request(
64		&self,
65		client: &reqwest::Client,
66		api_request: &APIRequest,
67	) -> anyhow::Result<reqwest::Response> {
68		let mut url = self.url.clone();
69		let mut path_segments = url
70			.path_segments_mut()
71			.map_err(|_| anyhow::anyhow!("URL is cannot-be-a-base"))?;
72		path_segments.push(&api_request.action);
73		drop(path_segments);
74		let mut post_req = client.post(url);
75		if let Some(token) = &self.access_token {
76			post_req = post_req.header("Authorization", "Bearer ".to_string() + token);
77		}
78		let res = post_req
79			.body(serde_json::to_string(&api_request.params)?)
80			.send()
81			.await?;
82		Ok(res)
83	}
84
85	pub async fn response_parser(
86		&self,
87		echo: Option<String>,
88		response: reqwest::Response,
89	) -> anyhow::Result<Event> {
90		let status = response.status();
91		if !status.is_success() {
92			let res = APIResponse {
93				echo,
94				data: JsonValue::Null,
95				retcode: status.as_u16() as i32,
96				status: "failed".to_string(),
97			};
98			Ok(Event::APIResponse(res))
99		} else {
100			let http_res: HttpResponse = response.json().await?;
101			let res = APIResponse {
102				echo,
103				data: http_res.data,
104				status: http_res.status,
105				retcode: http_res.retcode,
106			};
107			Ok(Event::APIResponse(res))
108		}
109	}
110}
111
112impl Drop for HttpService {
113	fn drop(&mut self) {
114		let _ = self.close_signal_sender.send(());
115	}
116}
117
118#[async_trait]
119impl CommunicationService for HttpService {
120	fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
121		self.api_receiver = Some(api_receiver);
122		self.event_sender = Some(event_sender);
123	}
124
125	async fn start_service(&self) -> anyhow::Result<()> {
126		if self.api_receiver.is_none() || self.event_sender.is_none() {
127			return Err(anyhow::anyhow!("api receiver or event sender is none"));
128		}
129
130		tokio::spawn(Self::api_processor(self.clone()));
131
132		Ok(())
133	}
134}