onebot_api/communication/
http.rs1use super::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use reqwest::IntoUrl;
5use serde::Deserialize;
6use serde_json::Value as JsonValue;
7use std::sync::Arc;
8use tokio::select;
9use tokio::sync::broadcast;
10use url::Url;
11
12#[derive(Clone, Debug)]
13pub struct HttpService {
14 url: Url,
15 access_token: Option<String>,
16 api_receiver: Option<APIReceiver>,
17 event_sender: Option<EventSender>,
18 close_signal_sender: broadcast::Sender<()>,
19}
20
21#[derive(Deserialize, Debug, Clone)]
22pub struct HttpResponse {
23 status: String,
24 retcode: i32,
25 data: JsonValue,
26}
27
28impl HttpService {
29 pub fn new(url: impl IntoUrl, access_token: Option<String>) -> reqwest::Result<Self> {
30 let (close_signal_sender, _) = broadcast::channel(1);
31 Ok(Self {
32 url: url.into_url()?,
33 access_token,
34 api_receiver: None,
35 event_sender: None,
36 close_signal_sender,
37 })
38 }
39
40 async fn api_processor(self) -> anyhow::Result<()> {
41 let mut close_signal = self.close_signal_sender.subscribe();
42 let api_receiver = self.api_receiver.clone().unwrap();
43 let event_sender = self.event_sender.clone().unwrap();
44 let client = reqwest::Client::new();
45
46 loop {
47 select! {
48 _ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
49 Ok(data) = api_receiver.recv_async() => {
50 let response = self.send_api_request(&client, &data).await;
51 if response.is_err() {
52 continue
53 }
54 let event = self.response_parser(data.echo, response?).await;
55 if event.is_err() {
56 continue
57 }
58 let _ = event_sender.send(Arc::new(event?));
59 }
60 }
61 }
62 }
63
64 pub async fn send_api_request(
65 &self,
66 client: &reqwest::Client,
67 api_request: &APIRequest,
68 ) -> anyhow::Result<reqwest::Response> {
69 let mut url = self.url.clone();
70 let mut path_segments = url
71 .path_segments_mut()
72 .map_err(|_| anyhow::anyhow!("URL is cannot-be-a-base"))?;
73 path_segments.push(&api_request.action);
74 drop(path_segments);
75 let mut post_req = client.post(url);
76 if let Some(token) = &self.access_token {
77 post_req = post_req.header("Authorization", "Bearer ".to_string() + token);
78 }
79 let res = post_req
80 .body(serde_json::to_string(&api_request.params)?)
81 .send()
82 .await?;
83 Ok(res)
84 }
85
86 pub async fn response_parser(
87 &self,
88 echo: Option<String>,
89 response: reqwest::Response,
90 ) -> anyhow::Result<Event> {
91 let status = response.status();
92 if !status.is_success() {
93 let res = APIResponse {
94 echo,
95 data: JsonValue::Null,
96 retcode: status.as_u16() as i32,
97 status: "failed".to_string(),
98 };
99 Ok(Event::APIResponse(res))
100 } else {
101 let http_res: HttpResponse = response.json().await?;
102 let res = APIResponse {
103 echo,
104 data: http_res.data,
105 status: http_res.status,
106 retcode: http_res.retcode,
107 };
108 Ok(Event::APIResponse(res))
109 }
110 }
111}
112
113impl Drop for HttpService {
114 fn drop(&mut self) {
115 let _ = self.close_signal_sender.send(());
116 }
117}
118
119#[async_trait]
120impl CommunicationService for HttpService {
121 fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
122 self.api_receiver = Some(api_receiver);
123 self.event_sender = Some(event_sender);
124 }
125
126 async fn start_service(&self) -> ServiceStartResult<()> {
127 if self.api_receiver.is_none() && self.event_sender.is_none() {
128 return Err(ServiceStartError::NotInjected);
129 } else if self.event_sender.is_none() {
130 return Err(ServiceStartError::NotInjectedEventSender);
131 } else if self.api_receiver.is_none() {
132 return Err(ServiceStartError::NotInjectedAPIReceiver);
133 }
134
135 tokio::spawn(Self::api_processor(self.clone()));
136
137 Ok(())
138 }
139}