events_sse/
events-sse.rs

1//! Server-Sent Events Example
2//! ==========================
3//!
4//! Start the server:
5//! ```
6//! $ cargo run --package beatrice --example events-sse
7//!    Compiling beatrice v0.1.0 (/x/beatrice-rs)
8//!     Finished dev [unoptimized + debuginfo] target(s) in 1.09s
9//!      Running `target/debug/examples/events-sse`
10//! Access the server at http://127.0.0.1:8000/subscribe
11//! INFO GET /subscribe => 200 streamed
12//! ^C
13//! ```
14//!
15//! Make a request to it:
16//! ```
17//! $ curl http://127.0.0.1:8000/subscribe
18//! data: 2
19//! data: 3
20//! data: 4
21//! data: 5
22//! $
23//! ```
24#![forbid(unsafe_code)]
25use beatrice::reexport::{safina_executor, safina_timer};
26use beatrice::{
27    print_log_response, socket_addr_127_0_0_1, Event, EventSender, HttpServerBuilder, Request,
28    Response,
29};
30use permit::Permit;
31use std::sync::{Arc, Mutex};
32use std::time::Duration;
33
34struct State {
35    subscribers: Mutex<Vec<EventSender>>,
36}
37impl State {
38    pub fn new() -> Self {
39        Self {
40            subscribers: Mutex::new(Vec::new()),
41        }
42    }
43}
44
45#[allow(clippy::needless_pass_by_value)]
46fn event_sender_thread(state: Arc<State>, permit: Permit) {
47    loop {
48        for n in 0..6 {
49            std::thread::sleep(Duration::from_secs(1));
50            // TODO: Rename Permit::wait and related functions to clarify that they are waiting for
51            //       subordinates to drop.
52            // TODO: Add a method to wait for a permit to be revoked, and another with a timeout.
53            if permit.is_revoked() {
54                return;
55            }
56            for subscriber in state.subscribers.lock().unwrap().iter_mut() {
57                subscriber.send(Event::Message(n.to_string()));
58            }
59        }
60        state.subscribers.lock().unwrap().clear();
61    }
62}
63
64#[allow(clippy::unnecessary_wraps)]
65fn subscribe(state: &Arc<State>, _req: &Request) -> Result<Response, Response> {
66    let (sender, response) = Response::event_stream();
67    state.subscribers.lock().unwrap().push(sender);
68    Ok(response)
69}
70
71#[allow(clippy::unnecessary_wraps)]
72fn handle_req(state: &Arc<State>, req: &Request) -> Result<Response, Response> {
73    match (req.method(), req.url().path()) {
74        ("GET", "/health") => Ok(Response::text(200, "ok")),
75        ("GET", "/subscribe") => subscribe(state, req),
76        _ => Ok(Response::text(404, "Not found")),
77    }
78}
79
80pub fn main() {
81    println!("Access the server at http://127.0.0.1:8000/subscribe");
82    let event_sender_thread_permit = Permit::new();
83    let state = Arc::new(State::new());
84    let state_clone = state.clone();
85    std::thread::spawn(move || event_sender_thread(state_clone, event_sender_thread_permit));
86    safina_timer::start_timer_thread();
87    let executor = safina_executor::Executor::default();
88    let request_handler = move |req: Request| print_log_response(&req, handle_req(&state, &req));
89    executor
90        .block_on(
91            HttpServerBuilder::new()
92                .listen_addr(socket_addr_127_0_0_1(8000))
93                .max_conns(100)
94                .spawn_and_join(request_handler),
95        )
96        .unwrap();
97}