use crate::prelude2::*;
use std::pin::Pin;
use std::task::{Context, Poll};
use actix_http::header::CONTENT_ENCODING;
use actix_http::ContentEncoding;
use actix_web::Error;
use futures::{stream::StreamExt, SinkExt};
use qrcode::render::svg;
use qrcode::{EcLevel, QrCode, Version};
use tokio::time;
use actix_web::web::Bytes;
#[derive(serde::Serialize, Debug)]
pub struct Task {
pub name: &'static str,
pub message: String,
}
pub struct TaskStream<T> {
next: u32,
buf: Vec<u8>,
with_date: bool,
with_number: bool,
new_line: bool,
data: T,
}
impl<T> futures::Stream for TaskStream<T>
where
T: std::marker::Unpin + serde::Serialize,
{
type Item = core::result::Result<Bytes, Error>;
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
if this.next == 100_000 {
Poll::Ready(None)
} else {
if this.with_date {
for v in (chrono::Local::now()
.format("%Y-%m-%dT%H:%M:%S.%6f%Z")
.to_string()
+ " ")
.as_bytes()
{
this.buf.push(*v);
}
}
if this.with_number {
for v in format!("{} ", this.next).as_bytes() {
this.buf.push(*v);
}
}
if let Err(e) = serde_json::to_writer(&mut this.buf, &this.data) {
return Poll::Ready(Some(Err(e.into())));
}
this.next += 1;
if this.new_line {
for v in crate::commons::NEWLINE.as_bytes() {
this.buf.push(*v);
}
}
let poll = Poll::Ready(Some(Ok(Bytes::copy_from_slice(&this.buf))));
this.buf.clear();
poll
}
}
}
pub async fn task_streaming() -> HttpResponse {
let stream = TaskStream::<Task> {
next: 0,
buf: Default::default(),
with_date: true,
with_number: true,
new_line: true,
data: Task {
name: "Coucou ceci est mon nom",
message: String::from(
"Mon message doit être un peu long pour augmenter la taille 啦啦啦",
),
},
};
HttpResponse::Ok()
.content_type(mime::TEXT_PLAIN_UTF_8)
.append_header((CONTENT_ENCODING, ContentEncoding::Identity))
.append_header((
"Cache-Control",
"no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0",
))
.streaming(stream)
}
pub async fn streaming_index() -> HttpResponse {
let (mut tx, rx) = futures::channel::mpsc::unbounded();
tokio::spawn(async move {
let mut counter = 1;
let mut interval = time::interval(std::time::Duration::from_millis(1000));
loop {
interval.tick().await;
if tx.send(counter).await.is_err() {
break;
}
counter += 1;
}
});
HttpResponse::Ok()
.content_type(mime::TEXT_PLAIN_UTF_8)
.append_header((CONTENT_ENCODING, ContentEncoding::Identity))
.append_header((
"Cache-Control",
"no-store, no-cache, must-revalidate, proxy-revalidate, max-age=0",
))
.streaming(rx.map(
|num| -> core::result::Result<Bytes, std::convert::Infallible> {
let num_str = num.to_string() + "\r\n";
Ok(Bytes::from(num_str)) },
))
}
pub async fn streaming_image() -> HttpResponse {
let (tx, rx) = futures::channel::mpsc::unbounded();
tokio::spawn(async move {
let mut interval = time::interval(std::time::Duration::from_secs(2));
loop {
interval.tick().await;
let s = format!("http://127.0.0.1:8000/{}", crate::commons::random_token());
let by = s.as_bytes();
let image3 = QrCode::with_version(by, Version::Normal(3), EcLevel::L)
.unwrap()
.render()
.min_dimensions(200, 200)
.dark_color(svg::Color("#000000"))
.light_color(svg::Color("#ffffff"))
.build();
if tx.unbounded_send(Bytes::from(image3)).is_err() {
break;
}
}
});
HttpResponse::Ok()
.content_type("image/svg+xml")
.append_header((CONTENT_ENCODING, ContentEncoding::Identity))
.append_header(("Cache-Control", "no-cache"))
.streaming(rx.map(
|image_data| -> core::result::Result<Bytes, std::convert::Infallible> {
Ok(image_data)
},
))
}