zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use crate::prelude2::*;

use std::pin::Pin;
use std::task::{Context, Poll};

use actix_http::header::CONTENT_ENCODING;
use actix_http::ContentEncoding;
use actix_web::Error;

use futures::{stream::StreamExt, SinkExt};
use qrcode::render::svg;
use qrcode::{EcLevel, QrCode, Version};
use tokio::time;

use actix_web::web::Bytes; // Import Bytes type

#[derive(serde::Serialize, Debug)]
pub struct Task {
    pub name: &'static str,
    pub message: String,
}

pub struct TaskStream<T> {
    next: u32,
    buf: Vec<u8>,
    with_date: bool,
    with_number: bool,
    new_line: bool,
    data: T,
}

impl<T> futures::Stream for TaskStream<T>
where
    T: std::marker::Unpin + serde::Serialize,
{
    type Item = core::result::Result<Bytes, Error>;

    fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let this = self.get_mut();

        if this.next == 100_000 {
            Poll::Ready(None)
        } else {
            // tokio::time::sleep(std::time::Duration::from_secs(10)).await;
            if this.with_date {
                for v in (chrono::Local::now()
                    .format("%Y-%m-%dT%H:%M:%S.%6f%Z")
                    .to_string()
                    + " ")
                    .as_bytes()
                {
                    this.buf.push(*v);
                }
            }

            if this.with_number {
                for v in format!("{} ", this.next).as_bytes() {
                    this.buf.push(*v);
                }
            }

            if let Err(e) = serde_json::to_writer(&mut this.buf, &this.data) {
                return Poll::Ready(Some(Err(e.into())));
            }

            this.next += 1;

            if this.new_line {
                for v in crate::commons::NEWLINE.as_bytes() {
                    this.buf.push(*v);
                }
            }

            let poll = Poll::Ready(Some(Ok(Bytes::copy_from_slice(&this.buf))));

            this.buf.clear();

            poll
        }
    }
}

// https://tokio.rs/tokio/tutorial/streams
pub async fn task_streaming() -> HttpResponse {
    let stream = TaskStream::<Task> {
        next: 0,
        buf: Default::default(),
        with_date: true,
        with_number: true,
        new_line: true,
        data: Task {
            name: "Coucou ceci est mon nom",
            message: String::from(
                "Mon message doit être un peu long pour augmenter la taille 啦啦啦",
            ),
        },
    };

    HttpResponse::Ok()
        .content_type(mime::TEXT_PLAIN_UTF_8)
        .append_header((CONTENT_ENCODING, ContentEncoding::Identity))
        .append_header((
            "Cache-Control",
            "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0",
        ))
        .streaming(stream)
}

pub async fn streaming_index() -> HttpResponse {
    // Create a channel for sending numbers to the streaming response
    let (mut tx, rx) = futures::channel::mpsc::unbounded();

    // Spawn a separate task to send numbers every second
    tokio::spawn(async move {
        let mut counter = 1;
        let mut interval = time::interval(std::time::Duration::from_millis(1000));

        loop {
            interval.tick().await;
            // Send the current counter value to the client
            if tx.send(counter).await.is_err() {
                // The client has disconnected, stop sending numbers
                break;
            }
            counter += 1;
        }
    });

    // Create a streaming response that sends numbers from the channel
    HttpResponse::Ok()
        // .content_type("text/html; charset=utf-8")
        .content_type(mime::TEXT_PLAIN_UTF_8)
        .append_header((CONTENT_ENCODING, ContentEncoding::Identity))
        .append_header((
            "Cache-Control",
            "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0",
        ))
        .streaming(rx.map(
            |num| -> core::result::Result<Bytes, std::convert::Infallible> {
                let num_str = num.to_string() + "\r\n";
                Ok(Bytes::from(num_str)) // Convert to Bytes
            },
        ))
}

pub async fn streaming_image() -> HttpResponse {
    let (tx, rx) = futures::channel::mpsc::unbounded();

    tokio::spawn(async move {
        let mut interval = time::interval(std::time::Duration::from_secs(2));

        loop {
            interval.tick().await;

            let s = format!("http://127.0.0.1:8000/{}", crate::commons::random_token());

            let by = s.as_bytes();

            let image3 = QrCode::with_version(by, Version::Normal(3), EcLevel::L)
                .unwrap()
                .render()
                .min_dimensions(200, 200)
                .dark_color(svg::Color("#000000"))
                .light_color(svg::Color("#ffffff"))
                .build();

            if tx.unbounded_send(Bytes::from(image3)).is_err() {
                // The client has disconnected, stop sending numbers
                break;
            }
        }
    });

    HttpResponse::Ok()
        .content_type("image/svg+xml")
        // .content_type("text/event-stream") // 设置内容类型为 SSE 格式
        .append_header((CONTENT_ENCODING, ContentEncoding::Identity))
        // .append_header((
        //     "Cache-Control",
        //     "no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0",
        // ))
        .append_header(("Cache-Control", "no-cache"))
        .streaming(rx.map(
            |image_data| -> core::result::Result<Bytes, std::convert::Infallible> {
                Ok(image_data)
            },
        ))
}