1extern crate url;
40
41#[cfg(feature = "native-tls")]
42extern crate native_tls_crate as native_tls;
43
44#[cfg(test)]
45extern crate http_test_server;
46
47#[cfg(feature = "native-tls")]
48mod tls;
49mod network;
50mod pub_sub;
51mod data;
52
53use std::sync::{Arc, Mutex, mpsc };
54use url::{Url, ParseError};
55use network::EventStream;
56use pub_sub::Bus;
57use data::{EventBuilder, EventBuilderState};
58
59pub use data::Event;
60pub use network::State;
61
62
/// Handle to a stream of events.
///
/// Listeners are registered on an internal bus keyed by event type; the
/// underlying stream publishes onto that bus as events arrive.
pub struct EventSource {
    /// Bus on which listeners subscribe by event type and events are published.
    bus: Arc<Mutex<Bus<Event>>>,
    /// Underlying stream; queried for its state, closed on request, and updated
    /// with the id of each completed event.
    stream: Arc<Mutex<EventStream>>
}
68
69impl EventSource {
70 pub fn new(url: &str) -> Result<EventSource, ParseError> {
80 let event_stream = Arc::new(Mutex::new(EventStream::new(Url::parse(url)?).unwrap()));
81 let stream_for_update = Arc::clone(&event_stream);
82 let stream = Arc::clone(&event_stream);
83 let mut event_stream = event_stream.lock().unwrap();
84
85 let bus = Arc::new(Mutex::new(Bus::new()));
86
87 let event_bus = Arc::clone(&bus);
88 event_stream.on_open(move || {
89 publish_initial_stream_event(&event_bus);
90 });
91
92 let event_bus = Arc::clone(&bus);
93 event_stream.on_error(move |message| {
94 let event_bus = event_bus.lock().unwrap();
95 let event = Event::new("error", &message);
96 event_bus.publish(event.type_.clone(), event);
97 });
98
99 let event_builder = Arc::new(Mutex::new(EventBuilder::new()));
100 let event_bus = Arc::clone(&bus);
101
102 event_stream.on_message(move |message| {
103 handle_message(&message, &event_builder, &event_bus, &stream_for_update);
104 });
105
106 Ok(EventSource{ stream, bus })
107 }
108
109 pub fn close(&self) {
120 self.stream.lock().unwrap().close();
121 }
122
123 pub fn on_open<F>(&self, listener: F) where F: Fn() + Send + 'static {
137 self.add_event_listener("stream_opened", move |_| { listener(); });
138 }
139
140 pub fn on_message<F>(&self, listener: F) where F: Fn(Event) + Send + 'static {
155 self.add_event_listener("message", listener);
156 }
157
158 pub fn add_event_listener<F>(&self, event_type: &str, listener: F) where F: Fn(Event) + Send + 'static {
186 let mut bus = self.bus.lock().unwrap();
187 bus.subscribe(event_type.to_string(), listener);
188 }
189
190 pub fn state(&self) -> State {
204 self.stream.lock().unwrap().state()
205 }
206
207 pub fn receiver(&self) -> mpsc::Receiver<Event> {
232 let (tx, rx) = mpsc::channel();
233 let error_tx = tx.clone();
234
235 self.on_message(move |event| {
236 tx.send(event).unwrap();
237 });
238
239 self.add_event_listener("error", move |error| {
240 error_tx.send(error).unwrap();
241 });
242
243 rx
244 }
245}
246
247fn publish_initial_stream_event(event_bus: &Arc<Mutex<Bus<Event>>>) {
248 let event_bus = event_bus.lock().unwrap();
249 let event = Event::new("stream_opened", "");
250 event_bus.publish(event.type_.clone(), event);
251}
252
253fn handle_message(
254 message: &str,
255 event_builder: &Arc<Mutex<EventBuilder>>,
256 event_bus: &Arc<Mutex<Bus<Event>>>,
257 event_stream: &Arc<Mutex<EventStream>>) {
258
259 let mut event_builder = event_builder.lock().unwrap();
260
261 if let EventBuilderState::Complete(event) = event_builder.update(&message) {
262 let event_bus = event_bus.lock().unwrap();
263 event_stream.lock().unwrap().set_last_id(event.id.clone());
264 event_bus.publish(event.type_.clone(), event);
265 event_builder.clear();
266 }
267}
268
#[cfg(test)]
mod tests {
    use super::*;
    use std::thread;
    use std::time::Duration;
    use std::sync::mpsc;
    use http_test_server::{ TestServer, Resource };
    use http_test_server::http::Status;

    /// Blocks the current thread for `millis` milliseconds.
    fn pause(millis: u64) {
        thread::sleep(Duration::from_millis(millis));
    }

    /// Polls every 100 ms until `client` is no longer in `State::Connecting`.
    fn wait_while_connecting(client: &EventSource) {
        while client.state() == State::Connecting {
            pause(100);
        }
    }

    /// Polls every 100 ms until `client` reports `State::Open`.
    fn wait_until_open(client: &EventSource) {
        while client.state() != State::Open {
            pause(100);
        }
    }

    /// Starts a test server with a streaming `/sub` resource served as
    /// `text/event-stream`, and returns the server, the resource and its URL.
    fn setup() -> (TestServer, Resource, String) {
        let server = TestServer::new().unwrap();
        let endpoint = server.create_resource("/sub");
        endpoint.header("Content-Type", "text/event-stream").stream();
        let url = format!("http://localhost:{}/sub", server.port());
        pause(100);
        (server, endpoint, url)
    }

    /// A client can be created against a live endpoint and then closed.
    #[test]
    fn should_create_client() {
        let (_server, _endpoint, url) = setup();
        let client = EventSource::new(&url).unwrap();

        client.close();
    }

    /// An address that does not parse as a URL yields an `Err`.
    #[test]
    fn should_thrown_an_error_when_malformed_url_provided() {
        let outcome = EventSource::new("127.0.0.1:1236/sub");

        assert!(outcome.is_err(), "should had thrown an error");
    }

    /// A closure registered with `on_message` receives the event data.
    #[test]
    fn accept_closure_as_listeners() {
        let (sender, inbox) = mpsc::channel();
        let (_server, endpoint, url) = setup();
        let client = EventSource::new(&url).unwrap();

        client.on_message(move |event| {
            sender.send(event.data).unwrap();
        });

        wait_while_connecting(&client);

        endpoint
            .send_line("data: some message")
            .send_line("");

        let received = inbox.recv().unwrap();
        assert_eq!(received, "some message");

        client.close();
    }

    /// Every registered `on_message` listener is called for a single event.
    #[test]
    fn should_trigger_listeners_when_message_received() {
        let (first_sender, inbox) = mpsc::channel();
        let second_sender = first_sender.clone();
        let (_server, endpoint, url) = setup();
        let client = EventSource::new(&url).unwrap();

        client.on_message(move |event| {
            first_sender.send(event.data).unwrap();
        });

        client.on_message(move |event| {
            second_sender.send(event.data).unwrap();
        });

        wait_while_connecting(&client);

        endpoint
            .send_line("data: some message")
            .send_line("");

        let first = inbox.recv().unwrap();
        let second = inbox.recv().unwrap();

        assert_eq!(first, "some message");
        assert_eq!(second, "some message");

        client.close();
    }

    /// Lines starting with `:` do not produce events.
    #[test]
    fn should_not_trigger_listeners_for_comments() {
        let (sender, inbox) = mpsc::channel();

        let (_server, endpoint, url) = setup();
        let client = EventSource::new(&url).unwrap();

        client.on_message(move |event| {
            sender.send(event.data).unwrap();
        });

        wait_until_open(&client);

        endpoint
            .send_line("data: message")
            .send_line("")
            .send_line(":this is a comment")
            .send_line(":this is another comment")
            .send_line("data: this is a message")
            .send_line("");

        let first = inbox.recv().unwrap();
        let second = inbox.recv().unwrap();

        assert_eq!(first, "message");
        assert_eq!(second, "this is a message");

        client.close();
    }

    /// Consecutive blank lines do not produce an extra, empty event.
    #[test]
    fn ignore_empty_messages() {
        let (sender, inbox) = mpsc::channel();

        let (_server, endpoint, url) = setup();
        let client = EventSource::new(&url).unwrap();

        client.on_message(move |event| {
            sender.send(event.data).unwrap();
        });

        wait_until_open(&client);

        endpoint
            .send_line("data: message")
            .send_line("")
            .send_line("")
            .send_line("data: this is a message")
            .send_line("");

        let first = inbox.recv().unwrap();
        let second = inbox.recv().unwrap();

        assert_eq!(first, "message");
        assert_eq!(second, "this is a message");

        client.close();
    }

    /// An event with an explicit type reaches the listener registered for it.
    #[test]
    fn event_trigger_its_defined_listener() {
        let (sender, inbox) = mpsc::channel();
        let (_server, endpoint, url) = setup();
        let client = EventSource::new(&url).unwrap();

        client.add_event_listener("myEvent", move |event| {
            sender.send(event).unwrap();
        });

        wait_while_connecting(&client);

        endpoint
            .send_line("event: myEvent")
            .send_line("data: my message\n");

        let received = inbox.recv().unwrap();

        assert_eq!(received.type_, String::from("myEvent"));
        assert_eq!(received.data, String::from("my message"));

        client.close();
    }

    /// An event with an explicit type does not reach `on_message` listeners.
    #[test]
    fn dont_trigger_on_message_for_event() {
        let (sender, inbox) = mpsc::channel();
        let (_server, endpoint, url) = setup();
        let client = EventSource::new(&url).unwrap();

        client.on_message(move |_| {
            sender.send("NOOOOOOOOOOOOOOOOOOO!").unwrap();
        });

        endpoint
            .send("event: myEvent\n")
            .send("data: my message\n\n");

        pause(500);
        assert!(inbox.try_recv().is_err());

        client.close();
    }

    /// After `close`, further data sent by the server is not delivered.
    #[test]
    fn should_close_connection() {
        let (sender, inbox) = mpsc::channel();

        let (_server, endpoint, url) = setup();
        let client = EventSource::new(&url).unwrap();

        client.on_message(move |event| {
            sender.send(event.data).unwrap();
        });

        wait_until_open(&client);

        endpoint.send("\ndata: some message\n\n");
        inbox.recv().unwrap();
        client.close();
        endpoint.send("\ndata: some message\n\n");

        pause(400);
        assert!(inbox.try_recv().is_err());
    }

    /// The `on_open` callback fires once the delayed endpoint responds.
    #[test]
    fn should_trigger_on_open_callback_when_connected() {
        let (sender, inbox) = mpsc::channel();
        let (_server, endpoint, url) = setup();
        endpoint.delay(Duration::from_millis(200));

        let client = EventSource::new(&url).unwrap();

        client.on_open(move || {
            sender.send("open").unwrap();
        });

        inbox.recv().unwrap();

        client.close();
    }

    /// `state` moves through `Connecting`, `Open` and `Closed`.
    #[test]
    fn should_return_stream_connection_status() {
        let (_server, endpoint, url) = setup();
        endpoint
            .delay(Duration::from_millis(200))
            .stream();

        let client = EventSource::new(&url).unwrap();
        pause(100);

        assert_eq!(client.state(), State::Connecting);

        pause(200);

        assert_eq!(client.state(), State::Open);

        client.close();
        pause(200);

        assert_eq!(client.state(), State::Closed);
    }

    /// After the server closes open connections, the next request carries
    /// the id of the last received event in `Last-Event-ID`.
    #[test]
    fn should_send_last_event_id_on_reconnection() {
        let (server, endpoint, url) = setup();
        let client = EventSource::new(&url).unwrap();
        pause(100);

        endpoint.send("id: helpMe\n");
        endpoint.send("data: my message\n\n");

        pause(500);

        endpoint.close_open_connections();

        let request = server.requests().recv().unwrap();

        assert_eq!(request.headers.get("Last-Event-ID").unwrap(), "helpMe");

        client.close();
    }

    /// `receiver` yields message events in the order they were sent.
    #[test]
    fn should_expose_blocking_api() {
        let (_server, endpoint, url) = setup();
        let client = EventSource::new(&url).unwrap();
        pause(100);
        let events = client.receiver();

        endpoint.send("data: some message\n\n");
        endpoint.send("data: some message 2\n\n");

        let first = events.recv().unwrap();
        assert_eq!(first.data, "some message");

        let second = events.recv().unwrap();
        assert_eq!(second.data, "some message 2");

        client.close();
    }

    /// `receiver` also yields `"error"` events, here caused by a 500 response.
    #[test]
    fn receiver_should_get_error_events() {
        let (_server, endpoint, url) = setup();
        endpoint
            .delay(Duration::from_millis(100))
            .status(Status::InternalServerError);
        let client = EventSource::new(&url).unwrap();
        let events = client.receiver();

        let failure = events.recv().unwrap();
        assert_eq!(failure.type_, "error");

        client.close();
    }
}