sse_client/
lib.rs

1//! # SSE Client
2//! EventSource implementation to handle streams of Server-Sent Events.
3//! It handles connections, redirections, retries and message parsing.
4//!
5//! To know more about SSE: <a href="https://html.spec.whatwg.org/multipage/server-sent-events.html" target="_new">Standard</a> | <a href="https://developer.mozilla.org/en-US/docs/Web/API/EventSource" target="_new">EventSource interface</a>
6//!
7//! # Example:
8//!
9//! ```no_run
10//! extern crate sse_client;
11//! use sse_client::EventSource;
12//!
13//!
14//! let event_source = EventSource::new("http://event-stream-address/sub").unwrap();
15//!
16//! event_source.on_message(|message| {
17//!     println!("New message event {:?}", message);
18//! });
19//!
20//! event_source.add_event_listener("error", |error| {
21//!     println!("Error {:?}", error);
22//! });
23//!
24//! ```
25//!
26//! Or:
27//! ```no_run
28//! extern crate sse_client;
29//! use sse_client::EventSource;
30//!
31//!
32//! let event_source = EventSource::new("http://event-stream-address/sub").unwrap();
33//!
34//! for event in event_source.receiver().iter() {
35//!     println!("New Message: {}", event.data);
36//! }
37//!
38//! ```
39extern crate url;
40
41#[cfg(feature = "native-tls")]
42extern crate native_tls_crate as native_tls;
43
44#[cfg(test)]
45extern crate http_test_server;
46
47#[cfg(feature = "native-tls")]
48mod tls;
49mod network;
50mod pub_sub;
51mod data;
52
53use std::sync::{Arc, Mutex, mpsc };
54use url::{Url, ParseError};
55use network::EventStream;
56use pub_sub::Bus;
57use data::{EventBuilder, EventBuilderState};
58
59pub use data::Event;
60pub use network::State;
61
62
63/// Interface to interact with `event-streams`
64pub struct EventSource {
65    bus: Arc<Mutex<Bus<Event>>>,
66    stream: Arc<Mutex<EventStream>>
67}
68
69impl EventSource {
70    /// Create object and starts connection with event-stream
71    ///
72    /// # Example
73    ///
74    /// ```no_run
75    /// # extern crate sse_client;
76    /// # use sse_client::EventSource;
77    /// let event_source = EventSource::new("http://example.com/sub").unwrap();
78    /// ```
79    pub fn new(url: &str) -> Result<EventSource, ParseError> {
80        let event_stream = Arc::new(Mutex::new(EventStream::new(Url::parse(url)?).unwrap()));
81        let stream_for_update = Arc::clone(&event_stream);
82        let stream = Arc::clone(&event_stream);
83        let mut event_stream = event_stream.lock().unwrap();
84
85        let bus = Arc::new(Mutex::new(Bus::new()));
86
87        let event_bus = Arc::clone(&bus);
88        event_stream.on_open(move || {
89            publish_initial_stream_event(&event_bus);
90        });
91
92        let event_bus = Arc::clone(&bus);
93        event_stream.on_error(move |message| {
94            let event_bus = event_bus.lock().unwrap();
95            let event = Event::new("error", &message);
96            event_bus.publish(event.type_.clone(), event);
97        });
98
99        let event_builder = Arc::new(Mutex::new(EventBuilder::new()));
100        let event_bus = Arc::clone(&bus);
101
102        event_stream.on_message(move |message| {
103            handle_message(&message, &event_builder, &event_bus, &stream_for_update);
104        });
105
106        Ok(EventSource{ stream, bus })
107    }
108
109    /// Close connection.
110    ///
111    /// # Example
112    ///
113    /// ```no_run
114    /// # extern crate sse_client;
115    /// # use sse_client::EventSource;
116    /// let event_source = EventSource::new("http://example.com/sub").unwrap();
117    /// event_source.close();
118    /// ```
119    pub fn close(&self) {
120        self.stream.lock().unwrap().close();
121    }
122
123    /// Triggered when connection with stream is stabilished.
124    ///
125    /// # Example
126    ///
127    /// ```no_run
128    /// # extern crate sse_client;
129    /// # use sse_client::EventSource;
130    /// let event_source = EventSource::new("http://example.com/sub").unwrap();
131    ///
132    /// event_source.on_open(|| {
133    ///     println!("Connection stabilished!");
134    /// });
135    /// ```
136    pub fn on_open<F>(&self, listener: F) where F: Fn() + Send + 'static {
137        self.add_event_listener("stream_opened", move |_| { listener(); });
138    }
139
140    /// Triggered when `message` event is received.
141    /// Any event that doesn't contain an `event` field is considered a `message` event.
142    ///
143    /// # Example
144    ///
145    /// ```no_run
146    /// # extern crate sse_client;
147    /// # use sse_client::EventSource;
148    /// let event_source = EventSource::new("http://example.com/sub").unwrap();
149    ///
150    /// event_source.on_message(|message| {
151    ///     println!("Message received: {}", message.data);
152    /// });
153    /// ```
154    pub fn on_message<F>(&self, listener: F) where F: Fn(Event) + Send + 'static {
155        self.add_event_listener("message", listener);
156    }
157
158    /// Triggered when event with specified type is received.
159    ///
160    /// Any connection error is notified as event with type `error`.
161    ///
162    /// Events with no type defined have `message` as default type.
163    ///
164    /// # Example
165    ///
166    /// ```no_run
167    /// # extern crate sse_client;
168    /// # use sse_client::EventSource;
169    /// let event_source = EventSource::new("http://example.com/sub").unwrap();
170    ///
171    ///
172    /// event_source.add_event_listener("myEvent", |event| {
173    ///     println!("Event {} received: {}", event.type_, event.data);
174    /// });
175    ///
176    /// event_source.add_event_listener("error", |error| {
177    ///     println!("Error: {}", error.data);
178    /// });
179    ///
180    /// // equivalent to `on_message`
181    /// event_source.add_event_listener("message", |message| {
182    ///     println!("Message received: {}", message.data);
183    /// });
184    /// ```
185    pub fn add_event_listener<F>(&self, event_type: &str, listener: F) where F: Fn(Event) + Send + 'static {
186        let mut bus = self.bus.lock().unwrap();
187        bus.subscribe(event_type.to_string(), listener);
188    }
189
190    /// Returns client [`State`].
191    ///
192    /// # Example
193    ///
194    /// ```no_run
195    /// # extern crate sse_client;
196    /// # use sse_client::EventSource;
197    /// use sse_client::State;
198    /// let event_source = EventSource::new("http://example.com/sub").unwrap();
199    ///
200    /// assert_eq!(event_source.state(), State::Connecting);
201    /// ```
202    /// [`State`]: enum.State.html
203    pub fn state(&self) -> State {
204        self.stream.lock().unwrap().state()
205    }
206
207    /// Returns a receiver that is triggered on any new message or error.
208    ///
209    /// # Example
210    ///
211    /// ```no_run
212    /// # extern crate sse_client;
213    /// # use sse_client::EventSource;
214    /// let event_source = EventSource::new("http://example.com/sub").unwrap();
215    ///
216    /// for event in event_source.receiver().iter() {
217    ///     println!("New Message: {}", event.data);
218    /// }
219    ///
220    /// ```
221    /// ```no_run
222    /// # extern crate sse_client;
223    /// # use sse_client::EventSource;
224    /// let event_source = EventSource::new("http://example.com/sub").unwrap();
225    /// let rx = event_source.receiver();
226    ///
227    /// let event = rx.recv().unwrap();
228    /// //...
229    ///
230    /// ```
231    pub fn receiver(&self) -> mpsc::Receiver<Event> {
232        let (tx, rx) = mpsc::channel();
233        let error_tx = tx.clone();
234
235        self.on_message(move |event| {
236            tx.send(event).unwrap();
237        });
238
239        self.add_event_listener("error", move |error| {
240            error_tx.send(error).unwrap();
241        });
242
243        rx
244    }
245}
246
247fn publish_initial_stream_event(event_bus: &Arc<Mutex<Bus<Event>>>) {
248    let event_bus = event_bus.lock().unwrap();
249    let event = Event::new("stream_opened", "");
250    event_bus.publish(event.type_.clone(), event);
251}
252
253fn handle_message(
254    message: &str,
255    event_builder: &Arc<Mutex<EventBuilder>>,
256    event_bus: &Arc<Mutex<Bus<Event>>>,
257    event_stream: &Arc<Mutex<EventStream>>) {
258
259    let mut event_builder = event_builder.lock().unwrap();
260
261    if let EventBuilderState::Complete(event) = event_builder.update(&message) {
262        let event_bus = event_bus.lock().unwrap();
263        event_stream.lock().unwrap().set_last_id(event.id.clone());
264        event_bus.publish(event.type_.clone(), event);
265        event_builder.clear();
266    }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;
    use std::sync::mpsc;
    use http_test_server::{ TestServer, Resource };
    use http_test_server::http::Status;

    /// Starts a test server with a streaming `/sub` resource served as
    /// `text/event-stream` and returns the server, the resource and its URL.
    fn setup() -> (TestServer, Resource, String) {
        let server = TestServer::new().unwrap();
        let resource = server.create_resource("/sub");
        resource.header("Content-Type", "text/event-stream").stream();
        let address = format!("http://localhost:{}/sub", server.port());
        thread::sleep(Duration::from_millis(100));
        (server, resource, address)
    }

    /// Polls every 100ms for as long as the client reports `State::Connecting`.
    fn wait_while_connecting(event_source: &EventSource) {
        while event_source.state() == State::Connecting {
            thread::sleep(Duration::from_millis(100));
        }
    }

    /// Polls every 100ms until the client reports `State::Open`.
    fn wait_until_open(event_source: &EventSource) {
        while event_source.state() != State::Open {
            thread::sleep(Duration::from_millis(100));
        }
    }

    #[test]
    fn should_create_client() {
        let (_server, _stream_endpoint, address) = setup();
        let event_source = EventSource::new(&address).unwrap();

        event_source.close();
    }

    #[test]
    fn should_thrown_an_error_when_malformed_url_provided() {
        // The address has no scheme, so URL parsing must fail before any
        // connection is attempted.
        let result = EventSource::new("127.0.0.1:1236/sub");

        assert!(result.is_err(), "should had thrown an error");
    }

    #[test]
    fn accept_closure_as_listeners() {
        let (tx, rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        let event_source = EventSource::new(&address).unwrap();

        event_source.on_message(move |message| {
            tx.send(message.data).unwrap();
        });

        wait_while_connecting(&event_source);

        stream_endpoint
            .send_line("data: some message").send_line("");

        let message = rx.recv().unwrap();
        assert_eq!(message, "some message");

        event_source.close();
    }

    #[test]
    fn should_trigger_listeners_when_message_received() {
        let (tx, rx) = mpsc::channel();
        let tx2 = tx.clone();
        let (_server, stream_endpoint, address) = setup();
        let event_source = EventSource::new(&address).unwrap();

        // Two listeners on the same event type: both must be notified.
        event_source.on_message(move |message| {
            tx.send(message.data).unwrap();
        });

        event_source.on_message(move |message| {
            tx2.send(message.data).unwrap();
        });

        wait_while_connecting(&event_source);

        stream_endpoint.send_line("data: some message").send_line("");

        let message = rx.recv().unwrap();
        let message2 = rx.recv().unwrap();

        assert_eq!(message, "some message");
        assert_eq!(message2, "some message");

        event_source.close();
    }

    #[test]
    fn should_not_trigger_listeners_for_comments() {
        let (tx, rx) = mpsc::channel();

        let (_server, stream_endpoint, address) = setup();
        let event_source = EventSource::new(&address).unwrap();

        event_source.on_message(move |message| {
            tx.send(message.data).unwrap();
        });

        wait_until_open(&event_source);

        // Lines starting with `:` are comments and must not produce events.
        stream_endpoint
            .send_line("data: message")
            .send_line("")
            .send_line(":this is a comment")
            .send_line(":this is another comment")
            .send_line("data: this is a message")
            .send_line("");

        let message = rx.recv().unwrap();
        let message2 = rx.recv().unwrap();

        assert_eq!(message, "message");
        assert_eq!(message2, "this is a message");

        event_source.close();
    }

    #[test]
    fn ignore_empty_messages() {
        let (tx, rx) = mpsc::channel();

        let (_server, stream_endpoint, address) = setup();
        let event_source = EventSource::new(&address).unwrap();

        event_source.on_message(move |message| {
            tx.send(message.data).unwrap();
        });

        wait_until_open(&event_source);

        // The extra blank line between the two events must not be dispatched
        // as an event of its own.
        stream_endpoint
            .send_line("data: message")
            .send_line("")
            .send_line("")
            .send_line("data: this is a message")
            .send_line("");

        let message = rx.recv().unwrap();
        let message2 = rx.recv().unwrap();

        assert_eq!(message, "message");
        assert_eq!(message2, "this is a message");

        event_source.close();
    }

    #[test]
    fn event_trigger_its_defined_listener() {
        let (tx, rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        let event_source = EventSource::new(&address).unwrap();

        event_source.add_event_listener("myEvent", move |event| {
            tx.send(event).unwrap();
        });

        wait_while_connecting(&event_source);

        stream_endpoint
            .send_line("event: myEvent")
            .send_line("data: my message\n");

        let message = rx.recv().unwrap();

        assert_eq!(message.type_, String::from("myEvent"));
        assert_eq!(message.data, String::from("my message"));

        event_source.close();
    }

    #[test]
    fn dont_trigger_on_message_for_event() {
        let (tx, rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        let event_source = EventSource::new(&address).unwrap();

        event_source.on_message(move |_| {
            tx.send("NOOOOOOOOOOOOOOOOOOO!").unwrap();
        });

        // Without waiting, the event could be sent before the client is
        // connected and the negative assertion below would pass vacuously.
        wait_while_connecting(&event_source);

        stream_endpoint
            .send("event: myEvent\n")
            .send("data: my message\n\n");

        thread::sleep(Duration::from_millis(500));
        assert!(rx.try_recv().is_err());

        event_source.close();
    }

    #[test]
    fn should_close_connection() {
        let (tx, rx) = mpsc::channel();

        let (_server, stream_endpoint, address) = setup();
        let event_source = EventSource::new(&address).unwrap();

        event_source.on_message(move |message| {
            tx.send(message.data).unwrap();
        });

        wait_until_open(&event_source);

        stream_endpoint.send("\ndata: some message\n\n");
        rx.recv().unwrap();
        event_source.close();

        // Anything sent after `close` must not reach the listener.
        stream_endpoint.send("\ndata: some message\n\n");

        thread::sleep(Duration::from_millis(400));
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn should_trigger_on_open_callback_when_connected() {
        let (tx, rx) = mpsc::channel();
        let (_server, stream_endpoint, address) = setup();
        // NOTE(review): the delay presumably leaves time to register the
        // listener before the connection opens - confirm against the
        // http_test_server documentation.
        stream_endpoint.delay(Duration::from_millis(200));

        let event_source = EventSource::new(&address).unwrap();

        event_source.on_open(move || {
            tx.send("open").unwrap();
        });

        rx.recv().unwrap();

        event_source.close();
    }

    #[test]
    fn should_return_stream_connection_status() {
        let (_server, stream_endpoint, address) = setup();
        stream_endpoint
            .delay(Duration::from_millis(200))
            .stream();

        let event_source = EventSource::new(&address).unwrap();
        thread::sleep(Duration::from_millis(100));

        assert_eq!(event_source.state(), State::Connecting);

        thread::sleep(Duration::from_millis(200));

        assert_eq!(event_source.state(), State::Open);

        event_source.close();
        thread::sleep(Duration::from_millis(200));

        assert_eq!(event_source.state(), State::Closed);
    }

    #[test]
    fn should_send_last_event_id_on_reconnection() {
        let (server, stream_endpoint, address) = setup();
        let event_source = EventSource::new(&address).unwrap();
        thread::sleep(Duration::from_millis(100));

        stream_endpoint.send("id: helpMe\n");
        stream_endpoint.send("data: my message\n\n");

        thread::sleep(Duration::from_millis(500));

        // NOTE(review): dropping the open connections presumably makes the
        // client reconnect, and that new request is the one inspected below.
        stream_endpoint.close_open_connections();

        let request = server.requests().recv().unwrap();

        assert_eq!(request.headers.get("Last-Event-ID").unwrap(), "helpMe");

        event_source.close();
    }

    #[test]
    fn should_expose_blocking_api() {
        let (_server, stream_endpoint, address) = setup();
        let event_source = EventSource::new(&address).unwrap();
        thread::sleep(Duration::from_millis(100));
        let rx = event_source.receiver();

        stream_endpoint.send("data: some message\n\n");
        stream_endpoint.send("data: some message 2\n\n");

        assert_eq!(rx.recv().unwrap().data, "some message");
        assert_eq!(rx.recv().unwrap().data, "some message 2");

        event_source.close();
    }

    #[test]
    fn receiver_should_get_error_events() {
        let (_server, stream_endpoint, address) = setup();
        stream_endpoint
            .delay(Duration::from_millis(100))
            .status(Status::InternalServerError);
        let event_source = EventSource::new(&address).unwrap();
        let rx = event_source.receiver();

        assert_eq!(rx.recv().unwrap().type_, "error");

        event_source.close();
    }
}