use crate::prelude2::*;
use std::collections::HashMap;
use futures::stream::StreamExt;
#[derive(Debug, serde::Serialize, serde::Deserialize)]
pub struct SimpileMessage {
pub id: u32,
pub message: String,
}
pub async fn publish_message(
query: web::Query<HashMap<String, String>>,
message: web::Json<SimpileMessage>,
request: HttpRequest,
app_state: web::Data<AppState>,
) -> impl Responder {
let topic = query.get("topic").ok_or_else(|| {
Errors::InvalidRequestError("Missing query string parameter: topic".to_string())
})?;
match app_state.redis().publish(topic, &message) {
Ok(()) => request.json(200, R::success(message, String::from("message published"))),
Err(e) => request.json(200, R::failed(500, e.to_string())),
}
}
pub async fn subscribe_message(
query: web::Query<HashMap<String, String>>,
app_state: web::Data<AppState>,
) -> Result<HttpResponse, Errors> {
let topic = query.get("topic").ok_or_else(|| {
Errors::InvalidRequestError("Missing query string parameter: topic".to_string())
})?;
match app_state.redis_pub().subscribe(topic).await {
Ok(rx) => Ok(HttpResponse::Ok()
.append_header(("Content-Type", "text/event-stream; charset=utf-8"))
.append_header(("Cache-Control", "no-store"))
.append_header(("X-Accel-Buffering", "no"))
.streaming(rx.map(
|message| -> Result<actix_web::web::Bytes, std::convert::Infallible> {
log::info!("subscribe_message: message={}", message);
Ok(actix_web::web::Bytes::from(message))
},
))),
Err(e) => Ok(HttpResponse::Ok().json(R::failed(400, e.to_string()))),
}
}