1pub mod sse_loader {
2 use serde::Deserialize;
3 use serde_json::from_str;
4
5 use futures::stream::StreamExt;
6 use futures_core::Stream;
7
8 pub async fn load_stream<T>(
9 url: &str,
10 ) -> Result<impl Stream<Item = T>, reqwest::Error>
11 where
12 T: for<'a> Deserialize<'a>,
13 {
14 let res = reqwest::get(url)
15 .await?
16 .bytes_stream()
17 .filter_map(|it| async {
18 it.ok()
19 .map(|bytes| {
20 std::str::from_utf8(&bytes)
21 .ok()
22 .map(|result_str| {
23 let result_string = result_str.to_string();
24 if result_string.starts_with("data") {
25 let sub_str = &result_string[5..];
26 Some(sub_str.to_string())
27 } else if result_string.starts_with("{") {
28 Some(result_string)
29 } else {
30 None
31 }
32 })
33 .flatten()
34 })
35 .flatten()
36 })
37 .filter_map(|it: String| async move { from_str::<T>(&it).ok() });
38 Ok(Box::pin(res))
39 }
40}
41
42#[cfg(test)]
43mod tests {
44 use super::*;
45 use futures::stream::StreamExt;
46 use serde::Deserialize;
47 use tokio;
48 #[derive(Deserialize)]
49 struct StockPrice {}
50
51 #[tokio::test]
52 async fn test_stream() {
53 let stream = sse_loader::load_stream::<StockPrice>(
54 "https://api.boerse-frankfurt.de/data/price_information?isin=IE00B0M62Q58&mic=XETR",
55 )
56 .await;
57 assert!(stream.is_ok());
58 let val = stream.unwrap().next().await;
59 assert!(val.is_some());
60 }
61}