jmap_client/event_source/
stream.rs

1/*
2 * Copyright Stalwart Labs LLC See the COPYING
3 * file at the top-level directory of this distribution.
4 *
5 * Licensed under the Apache License, Version 2.0 <LICENSE-APACHE or
6 * https://www.apache.org/licenses/LICENSE-2.0> or the MIT license
7 * <LICENSE-MIT or https://opensource.org/licenses/MIT>, at your
8 * option. This file may not be copied, modified, or distributed
9 * except according to those terms.
10 */
11
12use crate::{
13    client::Client,
14    core::session::URLPart,
15    event_source::{parser::EventParser, PushNotification},
16    DataType,
17};
18use futures_util::{Stream, StreamExt};
19use reqwest::header::{HeaderValue, ACCEPT, CONTENT_TYPE};
20
21impl Client {
22    pub async fn event_source(
23        &self,
24        mut types: Option<impl IntoIterator<Item = DataType>>,
25        close_after_state: bool,
26        ping: Option<u32>,
27        last_event_id: Option<&str>,
28    ) -> crate::Result<impl Stream<Item = crate::Result<PushNotification>> + Unpin> {
29        let mut event_source_url = String::with_capacity(self.session().event_source_url().len());
30
31        for part in self.event_source_url() {
32            match part {
33                URLPart::Value(value) => {
34                    event_source_url.push_str(value);
35                }
36                URLPart::Parameter(param) => match param {
37                    super::URLParameter::Types => {
38                        if let Some(types) = Option::take(&mut types) {
39                            event_source_url.push_str(
40                                &types
41                                    .into_iter()
42                                    .map(|state| state.to_string())
43                                    .collect::<Vec<_>>()
44                                    .join(","),
45                            );
46                        } else {
47                            event_source_url.push('*');
48                        }
49                    }
50                    super::URLParameter::CloseAfter => {
51                        event_source_url.push_str(if close_after_state { "state" } else { "no" });
52                    }
53                    super::URLParameter::Ping => {
54                        if let Some(ping) = ping {
55                            event_source_url.push_str(&ping.to_string());
56                        } else {
57                            event_source_url.push('0');
58                        }
59                    }
60                },
61            }
62        }
63
64        // Add headers
65        let mut headers = self.headers().clone();
66        headers.remove(CONTENT_TYPE);
67        headers.insert(ACCEPT, HeaderValue::from_static("text/event-stream"));
68        if let Some(last_event_id) = last_event_id {
69            headers.insert(
70                "Last-Event-ID",
71                HeaderValue::from_str(last_event_id).unwrap(),
72            );
73        }
74
75        let mut stream = Client::handle_error(
76            reqwest::Client::builder()
77                .connect_timeout(self.timeout())
78                .danger_accept_invalid_certs(self.accept_invalid_certs)
79                .redirect(self.redirect_policy())
80                .default_headers(headers)
81                .build()?
82                .get(event_source_url)
83                .send()
84                .await?,
85        )
86        .await?
87        .bytes_stream();
88        let mut parser = EventParser::default();
89
90        Ok(Box::pin(async_stream::stream! {
91            loop {
92                if let Some(notification) = parser.filter_notification() {
93                    yield notification;
94                    continue;
95                }
96                if let Some(result) = stream.next().await {
97                    match result {
98                        Ok(bytes) => {
99                            parser.push_bytes(bytes.to_vec());
100                            continue;
101                        }
102                        Err(err) => {
103                            yield Err(err.into());
104                            break;
105                        }
106                    }
107                } else {
108                    break;
109                }
110            }
111        }))
112    }
113}