jmap_client/event_source/
stream.rs1use crate::{
13 client::Client,
14 core::session::URLPart,
15 event_source::{parser::EventParser, PushNotification},
16 DataType,
17};
18use futures_util::{Stream, StreamExt};
19use reqwest::header::{HeaderValue, ACCEPT, CONTENT_TYPE};
20
21impl Client {
22 pub async fn event_source(
23 &self,
24 mut types: Option<impl IntoIterator<Item = DataType>>,
25 close_after_state: bool,
26 ping: Option<u32>,
27 last_event_id: Option<&str>,
28 ) -> crate::Result<impl Stream<Item = crate::Result<PushNotification>> + Unpin> {
29 let mut event_source_url = String::with_capacity(self.session().event_source_url().len());
30
31 for part in self.event_source_url() {
32 match part {
33 URLPart::Value(value) => {
34 event_source_url.push_str(value);
35 }
36 URLPart::Parameter(param) => match param {
37 super::URLParameter::Types => {
38 if let Some(types) = Option::take(&mut types) {
39 event_source_url.push_str(
40 &types
41 .into_iter()
42 .map(|state| state.to_string())
43 .collect::<Vec<_>>()
44 .join(","),
45 );
46 } else {
47 event_source_url.push('*');
48 }
49 }
50 super::URLParameter::CloseAfter => {
51 event_source_url.push_str(if close_after_state { "state" } else { "no" });
52 }
53 super::URLParameter::Ping => {
54 if let Some(ping) = ping {
55 event_source_url.push_str(&ping.to_string());
56 } else {
57 event_source_url.push('0');
58 }
59 }
60 },
61 }
62 }
63
64 let mut headers = self.headers().clone();
66 headers.remove(CONTENT_TYPE);
67 headers.insert(ACCEPT, HeaderValue::from_static("text/event-stream"));
68 if let Some(last_event_id) = last_event_id {
69 headers.insert(
70 "Last-Event-ID",
71 HeaderValue::from_str(last_event_id).unwrap(),
72 );
73 }
74
75 let mut stream = Client::handle_error(
76 reqwest::Client::builder()
77 .connect_timeout(self.timeout())
78 .danger_accept_invalid_certs(self.accept_invalid_certs)
79 .redirect(self.redirect_policy())
80 .default_headers(headers)
81 .build()?
82 .get(event_source_url)
83 .send()
84 .await?,
85 )
86 .await?
87 .bytes_stream();
88 let mut parser = EventParser::default();
89
90 Ok(Box::pin(async_stream::stream! {
91 loop {
92 if let Some(notification) = parser.filter_notification() {
93 yield notification;
94 continue;
95 }
96 if let Some(result) = stream.next().await {
97 match result {
98 Ok(bytes) => {
99 parser.push_bytes(bytes.to_vec());
100 continue;
101 }
102 Err(err) => {
103 yield Err(err.into());
104 break;
105 }
106 }
107 } else {
108 break;
109 }
110 }
111 }))
112 }
113}