onebot_api/communication/
http.rs1use super::utils::*;
2use async_trait::async_trait;
3use reqwest::IntoUrl;
4use serde::Deserialize;
5use serde_json::Value as JsonValue;
6use std::sync::Arc;
7use tokio::select;
8use tokio::sync::broadcast;
9use url::Url;
10
11#[derive(Clone, Debug)]
12pub struct HttpService {
13 url: Url,
14 access_token: Option<String>,
15 api_receiver: Option<APIReceiver>,
16 event_sender: Option<EventSender>,
17 close_signal_sender: broadcast::Sender<()>,
18}
19
20#[derive(Deserialize, Debug, Clone)]
21pub struct HttpResponse {
22 status: String,
23 retcode: i32,
24 data: JsonValue,
25}
26
27impl HttpService {
28 pub fn new(url: impl IntoUrl, access_token: Option<String>) -> reqwest::Result<Self> {
29 let (close_signal_sender, _) = broadcast::channel(1);
30 Ok(Self {
31 url: url.into_url()?,
32 access_token,
33 api_receiver: None,
34 event_sender: None,
35 close_signal_sender,
36 })
37 }
38
39 async fn api_processor(self) -> anyhow::Result<()> {
40 let mut close_signal = self.close_signal_sender.subscribe();
41 let api_receiver = self.api_receiver.clone().unwrap();
42 let event_sender = self.event_sender.clone().unwrap();
43 let client = reqwest::Client::new();
44
45 loop {
46 select! {
47 _ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
48 Ok(data) = api_receiver.recv_async() => {
49 let response = self.send_api_request(&client, &data).await;
50 if response.is_err() {
51 continue
52 }
53 let event = self.response_parser(data.echo, response?).await;
54 if event.is_err() {
55 continue
56 }
57 let _ = event_sender.send(Arc::new(event?));
58 }
59 }
60 }
61 }
62
63 pub async fn send_api_request(
64 &self,
65 client: &reqwest::Client,
66 api_request: &APIRequest,
67 ) -> anyhow::Result<reqwest::Response> {
68 let mut url = self.url.clone();
69 let mut path_segments = url
70 .path_segments_mut()
71 .map_err(|_| anyhow::anyhow!("URL is cannot-be-a-base"))?;
72 path_segments.push(&api_request.action);
73 drop(path_segments);
74 let mut post_req = client.post(url);
75 if let Some(token) = &self.access_token {
76 post_req = post_req.header("Authorization", "Bearer ".to_string() + token);
77 }
78 let res = post_req
79 .body(serde_json::to_string(&api_request.params)?)
80 .send()
81 .await?;
82 Ok(res)
83 }
84
85 pub async fn response_parser(
86 &self,
87 echo: Option<String>,
88 response: reqwest::Response,
89 ) -> anyhow::Result<Event> {
90 let status = response.status();
91 if !status.is_success() {
92 let res = APIResponse {
93 echo,
94 data: JsonValue::Null,
95 retcode: status.as_u16() as i32,
96 status: "failed".to_string(),
97 };
98 Ok(Event::APIResponse(res))
99 } else {
100 let http_res: HttpResponse = response.json().await?;
101 let res = APIResponse {
102 echo,
103 data: http_res.data,
104 status: http_res.status,
105 retcode: http_res.retcode,
106 };
107 Ok(Event::APIResponse(res))
108 }
109 }
110}
111
112impl Drop for HttpService {
113 fn drop(&mut self) {
114 let _ = self.close_signal_sender.send(());
115 }
116}
117
118#[async_trait]
119impl CommunicationService for HttpService {
120 fn inject(&mut self, api_receiver: APIReceiver, event_sender: EventSender) {
121 self.api_receiver = Some(api_receiver);
122 self.event_sender = Some(event_sender);
123 }
124
125 async fn start_service(&self) -> anyhow::Result<()> {
126 if self.api_receiver.is_none() || self.event_sender.is_none() {
127 return Err(anyhow::anyhow!("api receiver or event sender is none"));
128 }
129
130 tokio::spawn(Self::api_processor(self.clone()));
131
132 Ok(())
133 }
134}