zero4rs 2.0.0

zero4rs is a powerful, pragmatic, and extremely fast web framework for Rust
Documentation
use crate::core::natss::{as_ack_policy, as_deliver_policy_for_str};
use crate::prelude2::*;
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};

/// Handler that returns the result of `ctx.nats().list_stream()`.
///
/// On success the listing is wrapped in `R::ok`; on failure the error's text is
/// wrapped in `R::failed(400, ..)`. Both payloads are sent through
/// `request.json` with a status argument of 200.
pub async fn list_stream(ctx: web::Data<AppContext>, request: HttpRequest) -> impl Responder {
    let listing = match ctx.nats().list_stream().await {
        Ok(found) => found,
        Err(err) => return request.json(200, R::failed(400, err.to_string())),
    };

    request.json(200, R::ok(listing))
}

/// Handler that creates a stream from the posted `stream_name` and `subject`
/// form fields.
///
/// `subject` is treated as a comma-separated list. Every entry is trimmed, and
/// blank entries (for example from a trailing comma or `"a,,b"`) are discarded
/// rather than being forwarded as empty subjects. If no usable entry remains,
/// the request is rejected with `R::failed(400, ..)` before the NATS client is
/// called.
///
/// A missing form field is propagated with `?` as `Error::invalid_request`.
/// An error returned by `create_stream` is reported as `R::failed(400, ..)`
/// carrying the error text; success is reported as `R::ok(true)`.
pub async fn create_stream(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let stream_name = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;

    let subject = from
        .get("subject")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject"))?;

    // Split the comma-separated field, trim each entry and keep only the
    // non-blank ones as owned strings.
    let subjects: Vec<String> = subject
        .split(',')
        .map(str::trim)
        .filter(|entry| !entry.is_empty())
        .map(String::from)
        .collect();

    // Nothing but blanks/commas was supplied: fail fast instead of asking the
    // NATS client to create a stream with no subject.
    if subjects.is_empty() {
        return request.json(
            200,
            R::failed(400, "Invalid post data field: subject".to_string()),
        );
    }

    if let Err(e) = ctx.nats().create_stream(stream_name, subjects).await {
        return request.json(200, R::failed(400, e.to_string()));
    }

    request.json(200, R::ok(true))
}

/// Handler that deletes the stream named by the posted `stream_name` field.
///
/// A missing field is propagated with `?` as `Error::invalid_request`.
/// The outcome of `delete_stream` is reported as `R::ok(true)` on success or
/// `R::failed(400, ..)` with the error text on failure.
pub async fn delete_stream(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let target = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;

    match ctx.nats().delete_stream(target).await {
        Ok(_) => request.json(200, R::ok(true)),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}

/// Handler that removes a subject from a stream.
///
/// Reads the `stream_name` and `subject_to_remove` form fields; a missing
/// field is propagated with `?` as `Error::invalid_request`. The outcome of
/// `remove_subject` is reported as `R::ok(true)` on success or
/// `R::failed(400, ..)` with the error text on failure.
pub async fn remove_subject(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let stream = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;

    let doomed_subject = from
        .get("subject_to_remove")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject_to_remove"))?;

    let outcome = ctx.nats().remove_subject(stream, doomed_subject).await;

    match outcome {
        Ok(_) => request.json(200, R::ok(true)),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}

/// Handler that adds a subject to an existing stream via
/// `add_subject_to_stream`.
///
/// Reads the `stream_name` and `subject` form fields; a missing field is
/// propagated with `?` as `Error::invalid_request`. The outcome is reported as
/// `R::ok(true)` on success or `R::failed(400, ..)` with the error text on
/// failure.
pub async fn add_subscribe(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let stream = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;

    let new_subject = from
        .get("subject")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject"))?;

    match ctx.nats().add_subject_to_stream(stream, new_subject).await {
        Ok(_) => request.json(200, R::ok(true)),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}

/// Handler that publishes the posted `message` to the posted `subject`.
///
/// A missing form field is propagated with `?` as `Error::invalid_request`.
/// The outcome of `publish` is reported as `R::ok(true)` on success or
/// `R::failed(400, ..)` with the error text on failure.
pub async fn publish(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let target_subject = from
        .get("subject")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject"))?;

    let payload = from
        .get("message")
        .ok_or_else(|| Error::invalid_request("Missing post data field: message"))?;

    match ctx.nats().publish(target_subject, payload).await {
        Ok(_) => request.json(200, R::ok(true)),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}

/// Handler that registers a subscription whose messages are handed to
/// `handle_nats_message`.
///
/// Form fields:
/// - `consumer_name` — optional; forwarded as an `Option<String>`.
/// - `consumer_id`, `stream_name`, `subject`, `ack_policy`, `deliver_policy` —
///   required; a missing one is propagated with `?` as
///   `Error::invalid_request`.
/// - `deliver_deal` — optional; passed to `as_deliver_policy_for_str` together
///   with `deliver_policy` (its meaning is defined by that helper).
///
/// An `ack_policy` or `deliver_policy` value that the conversion helpers map
/// to `None` yields `R::failed(413, ..)`. An error from `subscribe` yields
/// `R::failed(400, ..)` with the error text. Success yields
/// `R::success(true, ..)`.
pub async fn subscribe(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let consumer_name = from.get("consumer_name").cloned();

    let consumer_id = from
        .get("consumer_id")
        .ok_or_else(|| Error::invalid_request("Missing post data field: consumer_id"))?;

    let stream_name = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;

    let subject = from
        .get("subject")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject"))?;

    let ack_policy_str = from
        .get("ack_policy")
        .ok_or_else(|| Error::invalid_request("Missing post data field: ack_policy"))?;

    let deliver_policy_str = from
        .get("deliver_policy")
        .ok_or_else(|| Error::invalid_request("Missing post data field: deliver_policy"))?;

    let deliver_deal = from.get("deliver_deal");

    // Run both conversions before inspecting either result so that an error
    // propagated by the deliver-policy helper keeps precedence over an
    // unrecognised ack policy.
    let ack_policy = as_ack_policy(ack_policy_str);
    let deliver_policy = as_deliver_policy_for_str(deliver_policy_str, deliver_deal)?;

    // Bind the validated values directly instead of `is_none()` + `unwrap()`,
    // so no panic path remains.
    let Some(ack_policy) = ack_policy else {
        return request.json(200, R::failed(413, "无效的 ack_policy"));
    };

    let Some(deliver_policy) = deliver_policy else {
        return request.json(200, R::failed(413, "无效的 deliver_policy"));
    };

    // The existence of `subject` could be validated here.

    if let Err(e) = ctx
        .nats()
        .subscribe(
            consumer_id.to_string(),
            consumer_name,
            stream_name,
            subject,
            ack_policy,
            deliver_policy,
            |info, subject, message| async move {
                handle_nats_message(info, subject, message).await
            },
        )
        .await
    {
        return request.json(200, R::failed(400, e.to_string()));
    }

    request.json(200, R::success(true, "订阅成功"))
}

/// Handler that registers a subscription with fixed settings: no consumer
/// name, `AckPolicy::None` and `DeliverPolicy::New`. Messages are handed to
/// `handle_nats_message`.
///
/// Reads the `consumer_id`, `stream_name` and `subject` form fields; a missing
/// field is propagated with `?` as `Error::invalid_request`. An error from
/// `subscribe` yields `R::failed(400, ..)` with the error text; success yields
/// `R::success(true, ..)`.
pub async fn listen(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let consumer = from
        .get("consumer_id")
        .ok_or_else(|| Error::invalid_request("Missing post data field: consumer_id"))?;

    let stream = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;

    let topic = from
        .get("subject")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject"))?;

    // The existence of the subject could be validated here.

    let outcome = ctx
        .nats()
        .subscribe(
            consumer.to_string(),
            None,
            stream,
            topic,
            AckPolicy::None,
            DeliverPolicy::New,
            |info, subject, message| async move {
                handle_nats_message(info, subject, message).await
            },
        )
        .await;

    match outcome {
        Ok(_) => request.json(200, R::success(true, "订阅成功")),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}

/// Generic JetStream message logging function.
///
/// `_info` is a tuple of `(stream_name, consumer_id, consumer_name, sequence)`.
/// The function writes one `info`-level log line containing those values, the
/// `subject` and the `message`, and always returns `Some(true)`.
pub async fn handle_nats_message(
    _info: (String, String, String, u64),
    subject: String,
    message: String,
) -> Option<bool> {
    // Name the tuple fields once so the log call below reads clearly.
    let (stream_name, consumer_id, consumer_name, sequence) = _info;

    log::info!(
        "Received stream_name={}, consumer_id={}, consumer_name={}, subject={}, msg_id={}, message: {}",
        stream_name,
        consumer_id,
        consumer_name,
        subject,
        sequence,
        message
    );

    Some(true)
}

/// Handler that cancels a subscription identified by the posted `stream_name`
/// and `consumer_name` fields.
///
/// A missing field is propagated with `?` as `Error::invalid_request`.
/// The outcome of `unsubscribe` is reported as `R::ok(true)` on success or
/// `R::failed(400, ..)` with the error text on failure.
pub async fn unsubscribe(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let stream = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;

    let consumer = from
        .get("consumer_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: consumer_name"))?;

    match ctx.nats().unsubscribe(stream, consumer).await {
        Ok(_) => request.json(200, R::ok(true)),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}

/// Handler that returns the result of `list_consumer` for the stream named by
/// the `stream_name` query-string parameter.
///
/// A missing parameter is propagated with `?` as `Error::invalid_request`.
/// On success the listing is sent with `request.text(200, ..)`; on failure the
/// error text is sent as `R::failed(400, ..)` through `request.json`.
pub async fn list_consumer(
    from: web::Query<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let stream = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing query string parameter: stream_name"))?;

    let listing = match ctx.nats().list_consumer(stream).await {
        Ok(found) => found,
        Err(err) => return request.json(200, R::failed(400, err.to_string())),
    };

    request.text(200, &listing)
}