use crate::prelude2::*;
use futures::stream::StreamExt;
pub async fn ttl(
query: web::Query<HashMap<String, String>>,
request: HttpRequest,
app_state: web::Data<AppContext>,
) -> impl Responder {
let key = query
.get("key")
.cloned()
.unwrap_or_else(|| String::from(""));
let val = app_state
.rudis()
.ttl(&key)
.await
.map_err(|e| Error::throw("", Some(e)))?;
request.json(200, R::ok(val))
}
pub async fn exists(
query: web::Query<HashMap<String, String>>,
request: HttpRequest,
app_state: web::Data<AppContext>,
) -> impl Responder {
let key = query
.get("key")
.cloned()
.unwrap_or_else(|| String::from(""));
let val = app_state
.rudis()
.exists(&key)
.await
.map_err(|e| Error::throw("", Some(e)))?;
request.json(200, R::ok(val))
}
pub async fn set(
query: web::Query<HashMap<String, String>>,
body: web::Bytes,
request: HttpRequest,
app_state: web::Data<AppContext>,
) -> impl Responder {
let key = query
.get("key")
.cloned()
.unwrap_or_else(|| String::from(""));
let val = match crate::commons::bytes_to_string(body.to_vec()) {
Ok(val) => val,
Err(e) => return request.json(200, R::failed(500, e.to_string())),
};
app_state
.rudis()
.set(&key, &val, 60 * 5)
.await
.map_err(|e| Error::throw("", Some(e)))?;
request.json(200, R::ok(true))
}
pub async fn get(
query: web::Query<HashMap<String, String>>,
request: HttpRequest,
app_state: web::Data<AppContext>,
) -> impl Responder {
let key = query
.get("key")
.cloned()
.unwrap_or_else(|| String::from(""));
let val = app_state
.rudis()
.get(&key)
.await
.map_err(|e| Error::throw("", Some(e)))?;
request.json(200, R::ok(val))
}
/// Handler that publishes the request body to the topic named by the `topic` query parameter.
///
/// The raw body is decoded with `crate::commons::bytes_to_string` and handed to
/// `app_state.rudis().publishs(..)`. A decoding failure or a publish failure is reported
/// in-band as `R::failed(500, ..)`; a successful publish yields `R::ok(true)`.
///
/// # Errors
///
/// Propagates `Error::invalid_request` when the `topic` query parameter is absent.
pub async fn rpub(
    query: web::Query<HashMap<String, String>>,
    body: web::Bytes,
    request: HttpRequest,
    app_state: web::Data<AppContext>,
) -> impl Responder {
    // Unlike the key-based handlers, a missing topic is rejected outright.
    let topic = query
        .get("topic")
        .ok_or_else(|| Error::invalid_request("Missing query string parameter: topic"))?;

    let payload = match crate::commons::bytes_to_string(body.to_vec()) {
        Ok(text) => text,
        Err(err) => return request.json(200, R::failed(500, err.to_string())),
    };

    match app_state.rudis().publishs(topic, &payload).await {
        Err(err) => request.json(200, R::failed(500, err.to_string())),
        Ok(_) => request.json(200, R::ok(true)),
    }
}
/// Handler that subscribes to the topic named by the `topic` query parameter and streams
/// every received message back to the client.
///
/// The response is declared as `text/event-stream` with caching and proxy buffering
/// switched off via headers. Each message is logged at info level and forwarded as a
/// `Bytes` chunk without further framing. If the subscription cannot be established the
/// handler answers with a JSON body built from `R::failed(400, ..)`.
///
/// # Errors
///
/// Propagates `Error::invalid_request` when the `topic` query parameter is absent.
pub async fn rsub(
    query: web::Query<HashMap<String, String>>,
    app_state: web::Data<AppContext>,
) -> Result<HttpResponse> {
    let topic = query
        .get("topic")
        .ok_or_else(|| Error::invalid_request("Missing query string parameter: topic"))?;

    // Bail out early with an in-band failure payload if subscribing did not work.
    let messages = match app_state.rudis().subscribe(topic).await {
        Ok(rx) => rx,
        Err(err) => return Ok(HttpResponse::Ok().json(R::failed(400, err.to_string()))),
    };

    // The mapping itself cannot fail, hence the `Infallible` error type.
    let chunks = messages.map(|message| {
        log::info!("subscribe_message: message={}", message);
        Ok::<actix_web::web::Bytes, std::convert::Infallible>(actix_web::web::Bytes::from(
            message,
        ))
    });

    Ok(HttpResponse::Ok()
        .append_header(("Content-Type", "text/event-stream; charset=utf-8"))
        .append_header(("Cache-Control", "no-store"))
        .append_header(("X-Accel-Buffering", "no"))
        .streaming(chunks))
}