onebot_api/communication/
http_post.rs1use super::utils::*;
2use crate::error::{ServiceStartError, ServiceStartResult};
3use async_trait::async_trait;
4use axum::Router;
5use axum::extract::State;
6use axum::response::IntoResponse;
7use axum::routing::any;
8use hmac::{Hmac, Mac};
9use http::{HeaderMap, StatusCode};
10use sha1::Sha1;
11use std::sync::Arc;
12use std::sync::atomic::{AtomicBool, Ordering};
13use tokio::net::{TcpListener, ToSocketAddrs};
14use tokio::sync::broadcast;
15
16type HmacSha1 = Hmac<Sha1>;
17
18pub struct HttpPostService<T: ToSocketAddrs + Clone + Send + Sync> {
19 addr: T,
20 hmac: Option<HmacSha1>,
21 event_sender: Option<InternalEventSender>,
22 close_signal_sender: broadcast::Sender<()>,
23 prefix: String,
24 is_running: Arc<AtomicBool>,
25}
26
27impl<T: ToSocketAddrs + Clone + Send + Sync> Drop for HttpPostService<T> {
28 fn drop(&mut self) {
29 self.uninstall();
30 }
31}
32
33impl<T: ToSocketAddrs + Clone + Send + Sync> HttpPostService<T> {
34 pub fn new(addr: T, prefix: Option<String>, secret: Option<String>) -> anyhow::Result<Self> {
35 let (close_signal_sender, _) = broadcast::channel(1);
36 let hmac = if let Some(secret) = secret {
37 Some(HmacSha1::new_from_slice(secret.as_ref())?)
38 } else {
39 None
40 };
41 let mut prefix = prefix.unwrap_or("/".to_string());
42 if !prefix.starts_with("/") {
43 prefix = "/".to_string() + &prefix;
44 }
45 Ok(Self {
46 addr,
47 hmac,
48 event_sender: None,
49 close_signal_sender,
50 prefix,
51 is_running: Arc::new(AtomicBool::new(false)),
52 })
53 }
54}
55
56struct AppState {
57 hmac: Option<HmacSha1>,
58 event_sender: InternalEventSender,
59}
60
61pub fn get_sig(mut hmac: HmacSha1, content: &[u8]) -> String {
62 hmac.update(content);
63 let result = hmac.finalize().into_bytes();
64 hex::encode(result)
65}
66
67async fn processor(
68 headers: HeaderMap,
69 State(state): State<Arc<AppState>>,
70 body: String,
71) -> impl IntoResponse {
72 if state.hmac.is_some() {
73 let received_sig = headers.get("X-Signature").map(|v| v.to_str().unwrap());
74 if received_sig.is_none() {
75 return StatusCode::UNAUTHORIZED;
76 }
77 let received_sig = received_sig.unwrap();
78 let hmac = state.hmac.clone().unwrap();
79 let sig = get_sig(hmac, body.as_ref());
80 if received_sig != "sha1=".to_string() + sig.as_str() {
81 return StatusCode::FORBIDDEN;
82 }
83 }
84 let event = serde_json::from_str(&body).unwrap();
85 let _ = state.event_sender.send_async(event).await;
86 StatusCode::NO_CONTENT
87}
88
89#[async_trait]
90impl<T: ToSocketAddrs + Clone + Send + Sync> CommunicationService for HttpPostService<T> {
91 fn install(&mut self, _api_receiver: InternalAPIReceiver, event_sender: InternalEventSender) {
92 self.event_sender = Some(event_sender);
93 }
94
95 fn uninstall(&mut self) {
96 self.stop();
97 self.event_sender = None;
98 }
99
100 fn stop(&self) {
101 let _ = self.close_signal_sender.send(());
102 self.is_running.store(false, Ordering::Relaxed);
103 }
104
105 async fn start(&self) -> ServiceStartResult<()> {
106 if self.is_running.load(Ordering::Relaxed) {
107 return Err(ServiceStartError::TaskIsRunning);
108 }
109
110 if self.event_sender.is_none() {
111 return Err(ServiceStartError::NotInjectedEventSender);
112 }
113
114 let event_sender = self.event_sender.clone().unwrap();
115
116 let state = Arc::new(AppState {
117 event_sender,
118 hmac: self.hmac.clone(),
119 });
120
121 let listener = TcpListener::bind(self.addr.clone()).await?;
122 let router = Router::new()
123 .route(&self.prefix, any(processor))
124 .with_state(state);
125 let mut close_signal = self.close_signal_sender.subscribe();
126
127 self.is_running.store(true, Ordering::Relaxed);
128 tokio::spawn(
129 axum::serve(listener, router)
130 .with_graceful_shutdown(async move {
131 let _ = close_signal.recv().await;
132 })
133 .into_future(),
134 );
135
136 Ok(())
137 }
138}