onebot_api/communication/
http.rs1use super::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use reqwest::IntoUrl;
5use serde::Deserialize;
6use serde_json::Value as JsonValue;
7use std::sync::Arc;
8use std::sync::atomic::{AtomicBool, Ordering};
9use tokio::select;
10use tokio::sync::broadcast;
11use url::Url;
12
13#[derive(Clone, Debug)]
14pub struct HttpService {
15 url: Url,
16 access_token: Option<String>,
17 api_receiver: Option<InternalAPIReceiver>,
18 event_sender: Option<InternalEventSender>,
19 close_signal_sender: broadcast::Sender<()>,
20 is_running: Arc<AtomicBool>,
21}
22
23#[derive(Deserialize, Debug, Clone)]
24pub struct HttpResponse {
25 status: String,
26 retcode: i32,
27 data: JsonValue,
28}
29
30impl HttpService {
31 pub fn new(url: impl IntoUrl, access_token: Option<String>) -> reqwest::Result<Self> {
32 let (close_signal_sender, _) = broadcast::channel(1);
33 Ok(Self {
34 url: url.into_url()?,
35 access_token,
36 api_receiver: None,
37 event_sender: None,
38 close_signal_sender,
39 is_running: Arc::new(AtomicBool::new(false)),
40 })
41 }
42
43 async fn api_processor(self) -> anyhow::Result<()> {
44 let mut close_signal = self.close_signal_sender.subscribe();
45 let api_receiver = self.api_receiver.clone().unwrap();
46 let event_sender = self.event_sender.clone().unwrap();
47 let client = reqwest::Client::new();
48
49 loop {
50 select! {
51 _ = close_signal.recv() => return Err(anyhow::anyhow!("close")),
52 Ok(data) = api_receiver.recv_async() => {
53 let response = self.send_api_request(&client, &data).await;
54 if response.is_err() {
55 continue
56 }
57 let event = self.response_parser(data.echo, response?).await;
58 if event.is_err() {
59 continue
60 }
61 let _ = event_sender.send_async(event?).await;
62 }
63 }
64 }
65 }
66
67 pub async fn send_api_request(
68 &self,
69 client: &reqwest::Client,
70 api_request: &APIRequest,
71 ) -> anyhow::Result<reqwest::Response> {
72 let mut url = self.url.clone();
73 let mut path_segments = url
74 .path_segments_mut()
75 .map_err(|_| anyhow::anyhow!("URL is cannot-be-a-base"))?;
76 path_segments.push(&api_request.action);
77 drop(path_segments);
78 let mut post_req = client.post(url);
79 if let Some(token) = &self.access_token {
80 post_req = post_req.header("Authorization", "Bearer ".to_string() + token);
81 }
82 let res = post_req
83 .body(serde_json::to_string(&api_request.params)?)
84 .send()
85 .await?;
86 Ok(res)
87 }
88
89 pub async fn response_parser(
90 &self,
91 echo: Option<String>,
92 response: reqwest::Response,
93 ) -> anyhow::Result<DeserializedEvent> {
94 let status = response.status();
95 if !status.is_success() {
96 let res = APIResponse {
97 echo,
98 data: JsonValue::Null,
99 retcode: status.as_u16() as i32,
100 status: "failed".to_string(),
101 };
102 Ok(DeserializedEvent::APIResponse(res))
103 } else {
104 let http_res: HttpResponse = response.json().await?;
105 let res = APIResponse {
106 echo,
107 data: http_res.data,
108 status: http_res.status,
109 retcode: http_res.retcode,
110 };
111 Ok(DeserializedEvent::APIResponse(res))
112 }
113 }
114}
115
116impl Drop for HttpService {
117 fn drop(&mut self) {
118 self.uninstall();
119 }
120}
121
122#[async_trait]
123impl CommunicationService for HttpService {
124 fn install(&mut self, api_receiver: InternalAPIReceiver, event_sender: InternalEventSender) {
125 self.api_receiver = Some(api_receiver);
126 self.event_sender = Some(event_sender);
127 }
128
129 fn uninstall(&mut self) {
130 self.stop();
131 self.api_receiver = None;
132 self.event_sender = None;
133 }
134
135 fn stop(&self) {
136 let _ = self.close_signal_sender.send(());
137 self.is_running.store(false, Ordering::Relaxed);
138 }
139
140 async fn start(&self) -> ServiceStartResult<()> {
141 if self.is_running.load(Ordering::Relaxed) {
142 return Err(ServiceStartError::TaskIsRunning);
143 }
144
145 if self.api_receiver.is_none() && self.event_sender.is_none() {
146 return Err(ServiceStartError::NotInjected);
147 } else if self.event_sender.is_none() {
148 return Err(ServiceStartError::NotInjectedEventSender);
149 } else if self.api_receiver.is_none() {
150 return Err(ServiceStartError::NotInjectedAPIReceiver);
151 }
152
153 self.is_running.store(true, Ordering::Relaxed);
154 tokio::spawn(Self::api_processor(self.clone()));
155
156 Ok(())
157 }
158}