web-queue-server 0.1.1

Backends for your web queue cluster
use crate::queue::connection::{BroadcastMessage, QueueConnection};
use crate::queue::map::Queue;
use actix_web::middleware::Logger;
use actix_web::{get, post, web, App, Error, HttpRequest, HttpResponse, HttpServer, Responder};
use actix_web_actors::ws;
use std::time::{SystemTime, UNIX_EPOCH};
use web_queue_meta::api::queue_scope_factory;
use web_queue_meta::message::EventMessage;
use web_queue_meta::response::BaseQueueResponse;

pub mod queue;

async fn subscribe_queue_ws(
    req: HttpRequest,
    stream: web::Payload,
    srv: web::Data<RuntimeQueue>,
    info: web::Path<(String, String)>,
) -> Result<HttpResponse, Error> {
    let (queue_name, id) = info.into_inner();
    let guard = srv.read().await;
    let queue_connection = guard.subscribe_queue(queue_name);
    match queue_connection {
        None => Err(actix_web::error::ErrorNotFound("queue not created")),
        Some(qc) => ws::start(QueueConnection::new(id, qc), &req, stream),
    }
}

#[get("/listen/longpoll/{queue_name}/{uniq_id}")]
async fn subscribe_queue_longpoll(
    srv: web::Data<RuntimeQueue>,
    info: web::Path<(String, String)>,
) -> Result<HttpResponse, Error> {
    let (queue_name, id) = info.into_inner();
    let guard = srv.read().await;
    let queue_connection = guard.subscribe_queue(queue_name);
    match queue_connection {
        None => Err(actix_web::error::ErrorNotFound("Queue not found")),
        Some(mut qc) => loop {
            let message = qc.recv().await;
            match message {
                Ok(BroadcastMessage::Message(s)) if s.id == id => {
                    return Ok(HttpResponse::Ok().json(s))
                }
                Ok(BroadcastMessage::Message(_)) => continue,
                _ => return Err(actix_web::error::ErrorGone("Queue was closed")),
            }
        },
    }
}

#[post("/create/{queue_name}")]
async fn create_queue(srv: web::Data<RuntimeQueue>, info: web::Path<String>) -> impl Responder {
    let queue_name = info.into_inner();
    let mut guard = srv.write().await;
    match guard.create_queue(queue_name) {
        None => Err(actix_web::error::ErrorConflict("Queue already created")),
        Some(_) => Ok(HttpResponse::Created().json(BaseQueueResponse { success: true })),
    }
}

#[post("/send/{queue_name}")]
async fn send_to_queue(
    srv: web::Data<RuntimeQueue>,
    info: web::Path<String>,
    message: web::Json<EventMessage>,
) -> impl Responder {
    let queue_name = info.into_inner();
    let guard = srv.read().await;
    let mut message = message.into_inner();
    message.timestamp.get_or_insert(
        SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .expect("fail to get time")
            .as_secs() as usize,
    );
    match guard.send_to_queue(queue_name, message) {
        false => Err(actix_web::error::ErrorNotFound("Queue not found")),
        true => Ok(HttpResponse::Ok().json(BaseQueueResponse { success: true })),
    }
}

#[post("/close/{queue_name}")]
async fn close_queue(srv: web::Data<RuntimeQueue>, info: web::Path<String>) -> impl Responder {
    let queue_name = info.into_inner();
    let mut guard = srv.write().await;
    match guard.close_queue(queue_name) {
        false => Err(actix_web::error::ErrorNotFound("Queue not found")),
        true => Ok(HttpResponse::Ok().json(BaseQueueResponse { success: true })),
    }
}

#[actix_web::main]
async fn main() -> tokio::io::Result<()> {
    let address = std::env::var("ADDR").unwrap_or_else(|_| String::from("0.0.0.0:8080"));
    env_logger::init();

    let queue = web::Data::new(RuntimeQueue::default());
    HttpServer::new(move || {
        App::new()
            .wrap(Logger::default())
            .app_data(queue.clone())
            .service(queue_scope_factory(
                create_queue,
                send_to_queue,
                close_queue,
                subscribe_queue_longpoll,
                subscribe_queue_ws,
            ))
    })
    .bind(address)?
    .run()
    .await
}

type RuntimeQueue = tokio::sync::RwLock<Queue<EventMessage>>;