async_sse_loader/
lib.rs

1pub mod sse_loader {
2    use serde::Deserialize;
3    use serde_json::from_str;
4
5    use futures::stream::StreamExt;
6    use futures_core::Stream;
7
8    pub async fn load_stream<T>(
9        url: &str,
10    ) -> Result<impl Stream<Item = T>, reqwest::Error>
11    where
12        T: for<'a> Deserialize<'a>,
13    {
14        let res = reqwest::get(url)
15            .await?
16            .bytes_stream()
17            .filter_map(|it| async {
18                it.ok()
19                    .map(|bytes| {
20                        std::str::from_utf8(&bytes)
21                            .ok()
22                            .map(|result_str| {
23                                let result_string = result_str.to_string();
24                                if result_string.starts_with("data") {
25                                    let sub_str = &result_string[5..];
26                                    Some(sub_str.to_string())
27                                } else if result_string.starts_with("{") {
28                                    Some(result_string)
29                                } else {
30                                    None
31                                }
32                            })
33                            .flatten()
34                    })
35                    .flatten()
36            })
37            .filter_map(|it: String| async move { from_str::<T>(&it).ok() });
38        Ok(Box::pin(res))
39    }
40}
41
#[cfg(test)]
mod tests {
    use super::*;
    use futures::stream::StreamExt;
    use serde::Deserialize;

    /// Deserialization target for the test. It declares no fields, so the test
    /// only checks that a payload deserializes, not what it contains.
    #[derive(Deserialize)]
    struct StockPrice {}

    /// Live endpoint queried by the test; running it requires network access.
    const PRICE_URL: &str =
        "https://api.boerse-frankfurt.de/data/price_information?isin=IE00B0M62Q58&mic=XETR";

    /// Opens the stream and expects at least one item to arrive.
    #[tokio::test]
    async fn test_stream() {
        let outcome = sse_loader::load_stream::<StockPrice>(PRICE_URL).await;
        assert!(outcome.is_ok());

        let mut prices = outcome.unwrap();
        let first = prices.next().await;
        assert!(first.is_some());
    }
}
61}