1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
use crate::http::{mime, Body, StatusCode};
use crate::log;
use crate::sse::Sender;
use crate::utils::BoxFuture;
use crate::{Endpoint, Request, Response, Result};

use async_std::future::Future;
use async_std::io::BufReader;
use async_std::task;

use std::marker::PhantomData;
use std::sync::Arc;

/// Create an endpoint that can handle SSE connections.
pub fn endpoint<F, Fut, State>(handler: F) -> SseEndpoint<F, Fut, State>
where
    State: Send + Sync + 'static,
    F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + Sync + 'static,
{
    SseEndpoint {
        handler: Arc::new(handler),
        __state: PhantomData,
        __fut: PhantomData,
    }
}

/// An endpoint that can handle SSE connections.
#[derive(Debug)]
pub struct SseEndpoint<F, Fut, State>
where
    State: Send + Sync + 'static,
    F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + Sync + 'static,
{
    handler: Arc<F>,
    __state: PhantomData<State>,
    __fut: PhantomData<Fut>,
}

impl<F, Fut, State> Endpoint<State> for SseEndpoint<F, Fut, State>
where
    State: Send + Sync + 'static,
    F: Fn(Request<State>, Sender) -> Fut + Send + Sync + 'static,
    Fut: Future<Output = Result<()>> + Send + Sync + 'static,
{
    fn call<'a>(&'a self, req: Request<State>) -> BoxFuture<'a, Result<Response>> {
        let handler = self.handler.clone();
        Box::pin(async move {
            let (sender, encoder) = async_sse::encode();
            task::spawn(async move {
                let sender = Sender::new(sender);
                if let Err(err) = handler(req, sender).await {
                    log::error!("SSE handler error: {:?}", err);
                }
            });

            // Perform the handshake as described here:
            // https://html.spec.whatwg.org/multipage/server-sent-events.html#sse-processing-model
            let mut res = Response::new(StatusCode::Ok);
            res.res.insert_header("Cache-Control", "no-cache").unwrap();
            res.res.set_content_type(mime::SSE);

            let body = Body::from_reader(BufReader::new(encoder), None);
            res.set_body(body);

            Ok(res)
        })
    }
}