Skip to main content

onebot_api/communication/
http.rs

1use super::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use reqwest::IntoUrl;
5use serde::Deserialize;
6use serde_json::Value as JsonValue;
7use std::sync::Arc;
8use tokio::select;
9use tokio::sync::broadcast;
10use url::Url;
11
12#[derive(Clone, Debug)]
13pub struct HttpService {
14	url: Url,
15	access_token: Option<String>,
16	api_receiver: Option<APIReceiver>,
17	event_sender: Option<EventSender>,
18	close_signal_sender: broadcast::Sender<()>,
19}
20
21#[derive(Deserialize, Debug, Clone)]
22pub struct HttpResponse {
23	status: String,
24	retcode: i32,
25	data: JsonValue,
26}
27
28impl HttpService {
29	pub fn new(url: impl IntoUrl, access_token: Option<String>) -> reqwest::Result<Self> {
30		let (close_signal_sender, _) = broadcast::channel(1);
31		Ok(Self {
32			url: url.into_url()?,
33			access_token,
34			api_receiver: None,
35			event_sender: None,
36			close_signal_sender,
37		})
38	}
39
40	async fn api_processor(self) -> anyhow::Result<()> {
41		let mut close_signal = self.close_signal_sender.subscribe();
42		let api_receiver = self.api_receiver.clone().unwrap();
43		let event_sender = self.event_sender.clone().unwrap();
44		let client = reqwest::Client::new();
45
46		loop {
47			select! {
48				_ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
49				Ok(data) = api_receiver.recv_async() => {
50					let response = self.send_api_request(&client, &data).await;
51					if response.is_err() {
52						continue
53					}
54					let event = self.response_parser(data.echo, response?).await;
55					if event.is_err() {
56						continue
57					}
58					let _ = event_sender.send(Arc::new(event?));
59				}
60			}
61		}
62	}
63
64	pub async fn send_api_request(
65		&self,
66		client: &reqwest::Client,
67		api_request: &APIRequest,
68	) -> anyhow::Result<reqwest::Response> {
69		let mut url = self.url.clone();
70		let mut path_segments = url
71			.path_segments_mut()
72			.map_err(|_| anyhow::anyhow!("URL is cannot-be-a-base"))?;
73		path_segments.push(&api_request.action);
74		drop(path_segments);
75		let mut post_req = client.post(url);
76		if let Some(token) = &self.access_token {
77			post_req = post_req.header("Authorization", "Bearer ".to_string() + token);
78		}
79		let res = post_req
80			.body(serde_json::to_string(&api_request.params)?)
81			.send()
82			.await?;
83		Ok(res)
84	}
85
86	pub async fn response_parser(
87		&self,
88		echo: Option<String>,
89		response: reqwest::Response,
90	) -> anyhow::Result<Event> {
91		let status = response.status();
92		if !status.is_success() {
93			let res = APIResponse {
94				echo,
95				data: JsonValue::Null,
96				retcode: status.as_u16() as i32,
97				status: "failed".to_string(),
98			};
99			Ok(Event::APIResponse(res))
100		} else {
101			let http_res: HttpResponse = response.json().await?;
102			let res = APIResponse {
103				echo,
104				data: http_res.data,
105				status: http_res.status,
106				retcode: http_res.retcode,
107			};
108			Ok(Event::APIResponse(res))
109		}
110	}
111}
112
113impl Drop for HttpService {
114	fn drop(&mut self) {
115		let _ = self.close_signal_sender.send(());
116	}
117}
118
119#[async_trait]
120impl CommunicationService for HttpService {
121	fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
122		self.api_receiver = Some(api_receiver);
123		self.event_sender = Some(event_sender);
124	}
125
126	async fn start_service(&self) -> ServiceStartResult<()> {
127		if self.api_receiver.is_none() && self.event_sender.is_none() {
128			return Err(ServiceStartError::NotInjected);
129		} else if self.event_sender.is_none() {
130			return Err(ServiceStartError::NotInjectedEventSender);
131		} else if self.api_receiver.is_none() {
132			return Err(ServiceStartError::NotInjectedAPIReceiver);
133		}
134
135		tokio::spawn(Self::api_processor(self.clone()));
136
137		Ok(())
138	}
139}