1use std::io;
2use std::sync::Arc;
3
4use std::collections::BTreeMap;
5
6pub use form_urlencoded;
7
8use axum::http;
9
10pub use http::Uri;
11
12use http::StatusCode;
13
14pub fn create_uri_parser(keys: Vec<String>) -> impl Fn(&Uri) -> BTreeMap<String, String> {
15 move |uri: &Uri| {
16 form_urlencoded::parse(uri.query().unwrap_or("").as_bytes())
17 .into_iter()
18 .filter(|(k, _)| keys.contains(&k.to_string()))
19 .map(|(k, v)| (k.into_owned(), v.into_owned()))
20 .collect()
21 }
22}
23
24pub async fn send_converted<F, T>(
25 uri: &Uri,
26 f: &F,
27 sender: &tokio::sync::mpsc::Sender<T>,
28) -> Result<(), io::Error>
29where
30 T: Send + Sync + 'static,
31 F: Fn(&Uri) -> Result<T, io::Error>,
32{
33 let t: T = f(uri)?;
34 sender.try_send(t).map_err(io::Error::other)?;
35 Ok(())
36}
37
38pub trait UriToParsed: Sync + Send + 'static {
39 type Item: Sync + Send + 'static;
40 fn convert(&self, u: &Uri) -> Result<Self::Item, io::Error>;
41}
42
43impl<U> UriToParsed for Arc<U>
44where
45 U: UriToParsed,
46{
47 type Item = U::Item;
48 fn convert(&self, u: &Uri) -> Result<Self::Item, io::Error> {
49 let original: &U = self;
50 original.convert(u)
51 }
52}
53
54pub async fn wait_valid_req_forever<T, U>(addr: String, u2p: U) -> Result<T, io::Error>
55where
56 T: Sync + Send + 'static,
57 U: UriToParsed<Item = T> + Clone,
58{
59 let (finish_tx, mut finish_rx) = tokio::sync::mpsc::channel::<()>(1);
60 let (tx, mut rx) = tokio::sync::mpsc::channel::<T>(1);
61
62 let server_handle = tokio::spawn(async move {
63 let router = axum::Router::new().route(
64 "/",
65 axum::routing::get(move |uri: Uri| {
66 let tx_clone = tx.clone();
67 let finish_tx_clone = finish_tx.clone();
68 async move {
69 let f = |u: &Uri| u2p.convert(u);
70 match send_converted(&uri, &f, &tx_clone).await {
71 Ok(_) => {}
72 Err(_) => {
73 return (
74 StatusCode::INTERNAL_SERVER_ERROR,
75 "unable to handle the request",
76 );
77 }
78 };
79
80 match finish_tx_clone.try_send(()) {
81 Ok(_) => (StatusCode::OK, "ok"),
82 Err(_) => (
83 StatusCode::INTERNAL_SERVER_ERROR,
84 "unable to send the finish notification",
85 ),
86 }
87 }
88 }),
89 );
90
91 let listener = tokio::net::TcpListener::bind(&addr).await;
92 match listener {
93 Ok(listener) => {
94 axum::serve(listener, router)
95 .with_graceful_shutdown(async move {
96 finish_rx.recv().await;
97 })
98 .await
99 }
100 Err(e) => Err(e),
101 }
102 });
103
104 server_handle
106 .await
107 .map_err(|e| io::Error::other(format!("server task panicked: {}", e)))?
108 .map_err(|e| io::Error::other(format!("server failed to start: {}", e)))?;
109
110 rx.recv().await.ok_or(io::Error::other("no response got"))
111}