Skip to main content

onebot_api/communication/
http.rs

1use super::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use reqwest::IntoUrl;
5use serde::Deserialize;
6use serde_json::Value as JsonValue;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use tokio::select;
10use tokio::sync::broadcast;
11use url::Url;
12
13#[derive(Clone, Debug)]
14pub struct HttpService {
15	url: Url,
16	access_token: Option<String>,
17	api_receiver: Option<InternalAPIReceiver>,
18	event_sender: Option<InternalEventSender>,
19	close_signal_sender: broadcast::Sender<()>,
20	is_running: Arc<AtomicBool>,
21}
22
23#[derive(Deserialize, Debug, Clone)]
24pub struct HttpResponse {
25	status: String,
26	retcode: i32,
27	data: JsonValue,
28}
29
30impl HttpService {
31	pub fn new(url: impl IntoUrl, access_token: Option<String>) -> reqwest::Result<Self> {
32		let (close_signal_sender, _) = broadcast::channel(1);
33		Ok(Self {
34			url: url.into_url()?,
35			access_token,
36			api_receiver: None,
37			event_sender: None,
38			close_signal_sender,
39			is_running: Arc::new(AtomicBool::new(false)),
40		})
41	}
42
43	async fn api_processor(self) -> anyhow::Result<()> {
44		let mut close_signal = self.close_signal_sender.subscribe();
45		let api_receiver = self.api_receiver.clone().unwrap();
46		let event_sender = self.event_sender.clone().unwrap();
47		let client = reqwest::Client::new();
48
49		loop {
50			select! {
51				_ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
52				Ok(data) = api_receiver.recv_async() => {
53					let response = self.send_api_request(&client, &data).await;
54					if response.is_err() {
55						continue
56					}
57					let event = self.response_parser(data.echo, response?).await;
58					if event.is_err() {
59						continue
60					}
61					let _ = event_sender.send_async(event?).await;
62				}
63			}
64		}
65	}
66
67	pub async fn send_api_request(
68		&self,
69		client: &reqwest::Client,
70		api_request: &APIRequest,
71	) -> anyhow::Result<reqwest::Response> {
72		let mut url = self.url.clone();
73		let mut path_segments = url
74			.path_segments_mut()
75			.map_err(|_| anyhow::anyhow!("URL is cannot-be-a-base"))?;
76		path_segments.push(&api_request.action);
77		drop(path_segments);
78		let mut post_req = client.post(url);
79		if let Some(token) = &self.access_token {
80			post_req = post_req.header("Authorization", "Bearer ".to_string() + token);
81		}
82		let res = post_req
83			.body(serde_json::to_string(&api_request.params)?)
84			.send()
85			.await?;
86		Ok(res)
87	}
88
89	pub async fn response_parser(
90		&self,
91		echo: Option<String>,
92		response: reqwest::Response,
93	) -> anyhow::Result<DeserializedEvent> {
94		let status = response.status();
95		if !status.is_success() {
96			let res = APIResponse {
97				echo,
98				data: JsonValue::Null,
99				retcode: status.as_u16() as i32,
100				status: "failed".to_string(),
101			};
102			Ok(DeserializedEvent::APIResponse(res))
103		} else {
104			let http_res: HttpResponse = response.json().await?;
105			let res = APIResponse {
106				echo,
107				data: http_res.data,
108				status: http_res.status,
109				retcode: http_res.retcode,
110			};
111			Ok(DeserializedEvent::APIResponse(res))
112		}
113	}
114}
115
116impl Drop for HttpService {
117	fn drop(&mut self) {
118		self.uninstall();
119	}
120}
121
122#[async_trait]
123impl CommunicationService for HttpService {
124	fn install(&mut self, api_receiver: InternalAPIReceiver, event_sender: InternalEventSender) {
125		self.api_receiver = Some(api_receiver);
126		self.event_sender = Some(event_sender);
127	}
128
129	fn uninstall(&mut self) {
130		self.stop();
131		self.api_receiver = None;
132		self.event_sender = None;
133	}
134
135	fn stop(&self) {
136		let _ = self.close_signal_sender.send(());
137		self.is_running.store(false, Ordering::Relaxed);
138	}
139
140	async fn start(&self) -> ServiceStartResult<()> {
141		if self.is_running.load(Ordering::Relaxed) {
142			return Err(ServiceStartError::TaskIsRunning);
143		}
144
145		if self.api_receiver.is_none() && self.event_sender.is_none() {
146			return Err(ServiceStartError::NotInjected);
147		} else if self.event_sender.is_none() {
148			return Err(ServiceStartError::NotInjectedEventSender);
149		} else if self.api_receiver.is_none() {
150			return Err(ServiceStartError::NotInjectedAPIReceiver);
151		}
152
153		self.is_running.store(true, Ordering::Relaxed);
154		tokio::spawn(Self::api_processor(self.clone()));
155
156		Ok(())
157	}
158}