Skip to main content

onebot_api/communication/
http_post.rs

1use super::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use axum::Router;
5use axum::extract::State;
6use axum::response::IntoResponse;
7use axum::routing::any;
8use hmac::{Hmac, Mac};
9use http::{HeaderMap, StatusCode};
10use sha1::Sha1;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13use tokio::net::{TcpListener, ToSocketAddrs};
14use tokio::sync::broadcast;
15
16type HmacSha1 = Hmac<Sha1>;
17
18pub struct HttpPostService<T: ToSocketAddrs + Clone + Send + Sync> {
19	addr: T,
20	hmac: Option<HmacSha1>,
21	event_sender: Option<InternalEventSender>,
22	close_signal_sender: broadcast::Sender<()>,
23	prefix: String,
24	is_running: Arc<AtomicBool>,
25}
26
27impl<T: ToSocketAddrs + Clone + Send + Sync> Drop for HttpPostService<T> {
28	fn drop(&mut self) {
29		self.uninstall();
30	}
31}
32
33impl<T: ToSocketAddrs + Clone + Send + Sync> HttpPostService<T> {
34	pub fn new(addr: T, prefix: Option<String>, secret: Option<String>) -> anyhow::Result<Self> {
35		let (close_signal_sender, _) = broadcast::channel(1);
36		let hmac = if let Some(secret) = secret {
37			Some(HmacSha1::new_from_slice(secret.as_ref())?)
38		} else {
39			None
40		};
41		let mut prefix = prefix.unwrap_or("/".to_string());
42		if !prefix.starts_with("/") {
43			prefix = "/".to_string() + &prefix;
44		}
45		Ok(Self {
46			addr,
47			hmac,
48			event_sender: None,
49			close_signal_sender,
50			prefix,
51			is_running: Arc::new(AtomicBool::new(false)),
52		})
53	}
54}
55
56struct AppState {
57	hmac: Option<HmacSha1>,
58	event_sender: InternalEventSender,
59}
60
61pub fn get_sig(mut hmac: HmacSha1, content: &[u8]) -> String {
62	hmac.update(content);
63	let result = hmac.finalize().into_bytes();
64	hex::encode(result)
65}
66
67async fn processor(
68	headers: HeaderMap,
69	State(state): State<Arc<AppState>>,
70	body: String,
71) -> impl IntoResponse {
72	if state.hmac.is_some() {
73		let received_sig = headers.get("X-Signature").map(|v| v.to_str().unwrap());
74		if received_sig.is_none() {
75			return StatusCode::UNAUTHORIZED;
76		}
77		let received_sig = received_sig.unwrap();
78		let hmac = state.hmac.clone().unwrap();
79		let sig = get_sig(hmac, body.as_ref());
80		if received_sig != "sha1=".to_string() + sig.as_str() {
81			return StatusCode::FORBIDDEN;
82		}
83	}
84	let event = serde_json::from_str(&body).unwrap();
85	let _ = state.event_sender.send_async(event).await;
86	StatusCode::NO_CONTENT
87}
88
89#[async_trait]
90impl<T: ToSocketAddrs + Clone + Send + Sync> CommunicationService for HttpPostService<T> {
91	fn install(&mut self, _api_receiver: InternalAPIReceiver, event_sender: InternalEventSender) {
92		self.event_sender = Some(event_sender);
93	}
94
95	fn uninstall(&mut self) {
96		self.stop();
97		self.event_sender = None;
98	}
99
100	fn stop(&self) {
101		let _ = self.close_signal_sender.send(());
102		self.is_running.store(false, Ordering::Relaxed);
103	}
104
105	async fn start(&self) -> ServiceStartResult<()> {
106		if self.is_running.load(Ordering::Relaxed) {
107			return Err(ServiceStartError::TaskIsRunning);
108		}
109
110		if self.event_sender.is_none() {
111			return Err(ServiceStartError::NotInjectedEventSender);
112		}
113
114		let event_sender = self.event_sender.clone().unwrap();
115
116		let state = Arc::new(AppState {
117			event_sender,
118			hmac: self.hmac.clone(),
119		});
120
121		let listener = TcpListener::bind(self.addr.clone()).await?;
122		let router = Router::new()
123			.route(&self.prefix, any(processor))
124			.with_state(state);
125		let mut close_signal = self.close_signal_sender.subscribe();
126
127		self.is_running.store(true, Ordering::Relaxed);
128		tokio::spawn(
129			axum::serve(listener, router)
130				.with_graceful_shutdown(async move {
131					let _ = close_signal.recv().await;
132				})
133				.into_future(),
134		);
135
136		Ok(())
137	}
138}