rs_wait_valid_req/
lib.rs

1use std::io;
2use std::sync::Arc;
3
4use std::collections::BTreeMap;
5
6pub use form_urlencoded;
7
8use axum::http;
9
10pub use http::Uri;
11
12use http::StatusCode;
13
14pub fn create_uri_parser(keys: Vec<String>) -> impl Fn(&Uri) -> BTreeMap<String, String> {
15    move |uri: &Uri| {
16        form_urlencoded::parse(uri.query().unwrap_or("").as_bytes())
17            .into_iter()
18            .filter(|(k, _)| keys.contains(&k.to_string()))
19            .map(|(k, v)| (k.into_owned(), v.into_owned()))
20            .collect()
21    }
22}
23
24pub async fn send_converted<F, T>(
25    uri: &Uri,
26    f: &F,
27    sender: &tokio::sync::mpsc::Sender<T>,
28) -> Result<(), io::Error>
29where
30    T: Send + Sync + 'static,
31    F: Fn(&Uri) -> Result<T, io::Error>,
32{
33    let t: T = f(uri)?;
34    sender.try_send(t).map_err(io::Error::other)?;
35    Ok(())
36}
37
38pub trait UriToParsed: Sync + Send + 'static {
39    type Item: Sync + Send + 'static;
40    fn convert(&self, u: &Uri) -> Result<Self::Item, io::Error>;
41}
42
43impl<U> UriToParsed for Arc<U>
44where
45    U: UriToParsed,
46{
47    type Item = U::Item;
48    fn convert(&self, u: &Uri) -> Result<Self::Item, io::Error> {
49        let original: &U = self;
50        original.convert(u)
51    }
52}
53
54pub async fn wait_valid_req_forever<T, U>(addr: String, u2p: U) -> Result<T, io::Error>
55where
56    T: Sync + Send + 'static,
57    U: UriToParsed<Item = T> + Clone,
58{
59    let (finish_tx, mut finish_rx) = tokio::sync::mpsc::channel::<()>(1);
60    let (tx, mut rx) = tokio::sync::mpsc::channel::<T>(1);
61
62    let server_handle = tokio::spawn(async move {
63        let router = axum::Router::new().route(
64            "/",
65            axum::routing::get(move |uri: Uri| {
66                let tx_clone = tx.clone();
67                let finish_tx_clone = finish_tx.clone();
68                async move {
69                    let f = |u: &Uri| u2p.convert(u);
70                    match send_converted(&uri, &f, &tx_clone).await {
71                        Ok(_) => {}
72                        Err(_) => {
73                            return (
74                                StatusCode::INTERNAL_SERVER_ERROR,
75                                "unable to handle the request",
76                            );
77                        }
78                    };
79
80                    match finish_tx_clone.try_send(()) {
81                        Ok(_) => (StatusCode::OK, "ok"),
82                        Err(_) => (
83                            StatusCode::INTERNAL_SERVER_ERROR,
84                            "unable to send the finish notification",
85                        ),
86                    }
87                }
88            }),
89        );
90
91        let listener = tokio::net::TcpListener::bind(&addr).await;
92        match listener {
93            Ok(listener) => {
94                axum::serve(listener, router)
95                    .with_graceful_shutdown(async move {
96                        finish_rx.recv().await;
97                    })
98                    .await
99            }
100            Err(e) => Err(e),
101        }
102    });
103
104    // Wait for the server to start or fail
105    server_handle
106        .await
107        .map_err(|e| io::Error::other(format!("server task panicked: {}", e)))?
108        .map_err(|e| io::Error::other(format!("server failed to start: {}", e)))?;
109
110    rx.recv().await.ok_or(io::Error::other("no response got"))
111}