vertigo-cli 0.11.2

Reactive Real-DOM library with SSR for Rust - packaging/serving tool
Documentation
use actix_web::{HttpResponse, Responder, web};
use futures::stream::Stream;
use std::task::Poll;
use tokio::sync::watch::Receiver;

use crate::commons::spawn::SpawnOwner;

use super::Status;

pub async fn handler_sse(state: web::Data<Receiver<Status>>) -> impl Responder {
    let stream = MyStream::new(state.get_ref().clone());

    HttpResponse::Ok()
        .insert_header(("content-type", "text/event-stream"))
        .streaming(stream)
}

struct MyStream {
    rx: Receiver<Status>,
    last_emit_value: Option<Status>,
    spawn: Option<SpawnOwner>,
}

impl MyStream {
    pub fn new(rx: Receiver<Status>) -> MyStream {
        MyStream {
            rx,
            last_emit_value: None,
            spawn: None,
        }
    }
}

impl Stream for MyStream {
    type Item = Result<actix_web::web::Bytes, actix_web::Error>;

    fn poll_next(
        mut self: std::pin::Pin<&mut Self>,
        cx: &mut std::task::Context<'_>,
    ) -> Poll<Option<Self::Item>> {
        if self.spawn.is_none() {
            let waker = cx.waker().clone();
            let mut rx = self.rx.clone();

            self.spawn = Some(SpawnOwner::new(async move {
                loop {
                    if let Err(err) = rx.changed().await {
                        log::error!("{err}");
                    };
                    waker.wake_by_ref();
                }
            }));
        }

        let value = self.rx.borrow().clone();

        if let Some(last_emit_value) = &self.last_emit_value
            && last_emit_value == &value
        {
            return Poll::Pending;
        }

        self.last_emit_value = Some(value.clone());

        let message = match value {
            Status::Building => "data: Building\n\n".to_string(),
            Status::Errors => "data: Errors\n\n".to_string(),
            Status::Version(version) => format!("data: Version = {version}\n\n"),
        };

        Poll::Ready(Some(Ok(actix_web::web::Bytes::from(message))))
    }
}