use crate::core::natss::{as_ack_policy, as_deliver_policy_for_str};
use crate::prelude2::*;
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
/// Handler that reports the result of `ctx.nats().list_stream()`.
///
/// A successful listing is wrapped in `R::ok`; a failure is wrapped in
/// `R::failed` with code 400 and the error's display text. Either way the
/// payload is sent through `request.json` with 200 as its first argument.
pub async fn list_stream(ctx: web::Data<AppContext>, request: HttpRequest) -> impl Responder {
    match ctx.nats().list_stream().await {
        Err(err) => request.json(200, R::failed(400, err.to_string())),
        Ok(stream_names) => request.json(200, R::ok(stream_names)),
    }
}
/// Handler that creates a stream from the posted form data.
///
/// Reads the `stream_name` and `subject` form fields; a missing field is
/// propagated as `Error::invalid_request` via `?`. The `subject` value is
/// split on commas and each piece is trimmed before being handed to
/// `ctx.nats().create_stream`.
///
/// Responds with `R::ok(true)` on success, or `R::failed(400, ..)` carrying
/// the error text when stream creation fails.
pub async fn create_stream(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let stream_name = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;
    let subject = from
        .get("subject")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject"))?;

    // Comma-separated list -> owned, whitespace-trimmed subject names.
    let subjects: Vec<String> = subject
        .split(',')
        .map(str::trim)
        .map(String::from)
        .collect();

    match ctx.nats().create_stream(stream_name, subjects).await {
        Ok(_) => request.json(200, R::ok(true)),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}
/// Handler that deletes the stream named by the `stream_name` form field.
///
/// A missing `stream_name` is propagated as `Error::invalid_request` via `?`.
/// Responds with `R::ok(true)` on success, or `R::failed(400, ..)` carrying
/// the error text when `ctx.nats().delete_stream` fails.
pub async fn delete_stream(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let stream_name = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;

    match ctx.nats().delete_stream(stream_name).await {
        Ok(_) => request.json(200, R::ok(true)),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}
/// Handler that removes one subject from a stream.
///
/// Reads the `stream_name` and `subject_to_remove` form fields (in that
/// order); a missing field is propagated as `Error::invalid_request` via `?`.
/// Responds with `R::ok(true)` on success, or `R::failed(400, ..)` carrying
/// the error text when `ctx.nats().remove_subject` fails.
pub async fn remove_subject(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let stream_name = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;
    let subject_to_remove = from
        .get("subject_to_remove")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject_to_remove"))?;

    match ctx
        .nats()
        .remove_subject(stream_name, subject_to_remove)
        .await
    {
        Ok(_) => request.json(200, R::ok(true)),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}
/// Handler that adds a subject to an existing stream.
///
/// Reads the `stream_name` and `subject` form fields; a missing field is
/// propagated as `Error::invalid_request` via `?`. Responds with
/// `R::ok(true)` on success, or `R::failed(400, ..)` carrying the error text
/// when `ctx.nats().add_subject_to_stream` fails.
pub async fn add_subscribe(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let stream_name = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;
    let subject = from
        .get("subject")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject"))?;

    match ctx.nats().add_subject_to_stream(stream_name, subject).await {
        Ok(_) => request.json(200, R::ok(true)),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}
/// Handler that publishes a message to a subject.
///
/// Reads the `subject` and `message` form fields; a missing field is
/// propagated as `Error::invalid_request` via `?`. Responds with
/// `R::ok(true)` on success, or `R::failed(400, ..)` carrying the error text
/// when `ctx.nats().publish` fails.
pub async fn publish(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let subject = from
        .get("subject")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject"))?;
    let message = from
        .get("message")
        .ok_or_else(|| Error::invalid_request("Missing post data field: message"))?;

    match ctx.nats().publish(subject, message).await {
        Ok(_) => request.json(200, R::ok(true)),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}
/// Handler that subscribes a consumer to a stream subject with caller-chosen
/// acknowledgement and delivery policies.
///
/// Form fields:
/// - `consumer_name` (optional): cloned and forwarded as an `Option`.
/// - `consumer_id`, `stream_name`, `subject`, `ack_policy`, `deliver_policy`
///   (required): a missing field is propagated as `Error::invalid_request`
///   via `?`.
/// - `deliver_deal` (optional): passed alongside `deliver_policy` to
///   `as_deliver_policy_for_str`.
///
/// Responses:
/// - `R::failed(413, ..)` when either policy string does not map to a policy.
/// - `R::failed(400, ..)` carrying the error text when the subscribe call fails.
/// - `R::success(true, ..)` once the subscription has been set up.
///
/// Received messages are forwarded to [`handle_nats_message`].
pub async fn subscribe(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let consumer_name = from.get("consumer_name").cloned();
    let consumer_id = from
        .get("consumer_id")
        .ok_or_else(|| Error::invalid_request("Missing post data field: consumer_id"))?;
    let stream_name = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;
    let subject = from
        .get("subject")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject"))?;
    let ack_policy_str = from
        .get("ack_policy")
        .ok_or_else(|| Error::invalid_request("Missing post data field: ack_policy"))?;
    let deliver_policy_str = from
        .get("deliver_policy")
        .ok_or_else(|| Error::invalid_request("Missing post data field: deliver_policy"))?;
    let deliver_deal = from.get("deliver_deal");

    let ack_policy = as_ack_policy(ack_policy_str);
    // Converted before the ack-policy check so that an error from the
    // deliver-policy conversion is still propagated first.
    let deliver_policy = as_deliver_policy_for_str(deliver_policy_str, deliver_deal)?;

    // Bind the policies by pattern instead of `is_none()` + `unwrap()`, so
    // there is no panic path left in this handler.
    let ack_policy = match ack_policy {
        Some(policy) => policy,
        None => return request.json(200, R::failed(413, "无效的 ack_policy")),
    };
    let deliver_policy = match deliver_policy {
        Some(policy) => policy,
        None => return request.json(200, R::failed(413, "无效的 deliver_policy")),
    };

    if let Err(e) = ctx
        .nats()
        .subscribe(
            consumer_id.to_string(),
            consumer_name,
            stream_name,
            subject,
            ack_policy,
            deliver_policy,
            |_info, subject, message| async move {
                handle_nats_message(_info, subject, message).await
            },
        )
        .await
    {
        return request.json(200, R::failed(400, e.to_string()));
    }
    request.json(200, R::success(true, "订阅成功"))
}
/// Handler that subscribes a consumer using fixed policies.
///
/// Reads the `consumer_id`, `stream_name` and `subject` form fields; a
/// missing field is propagated as `Error::invalid_request` via `?`. The
/// subscription is created with no consumer name, `AckPolicy::None` and
/// `DeliverPolicy::New`, and received messages are forwarded to
/// [`handle_nats_message`].
///
/// Responds with `R::success(true, ..)` on success, or `R::failed(400, ..)`
/// carrying the error text when the subscribe call fails.
pub async fn listen(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let consumer_id = from
        .get("consumer_id")
        .ok_or_else(|| Error::invalid_request("Missing post data field: consumer_id"))?;
    let stream_name = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;
    let subject = from
        .get("subject")
        .ok_or_else(|| Error::invalid_request("Missing post data field: subject"))?;

    match ctx
        .nats()
        .subscribe(
            consumer_id.to_string(),
            None,
            stream_name,
            subject,
            AckPolicy::None,
            DeliverPolicy::New,
            |meta, topic, payload| async move { handle_nats_message(meta, topic, payload).await },
        )
        .await
    {
        Ok(_) => request.json(200, R::success(true, "订阅成功")),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}
/// Message callback used by the subscribe handlers: logs the received message
/// at info level and returns `Some(true)`.
///
/// `_info` is logged as, in order: stream name, consumer id, consumer name
/// and message id.
pub async fn handle_nats_message(
    _info: (String, String, String, u64),
    subject: String,
    message: String,
) -> Option<bool> {
    // Name the tuple fields so the log arguments read like the format string.
    let (stream_name, consumer_id, consumer_name, msg_id) = _info;
    log::info!(
        "Received stream_name={}, consumer_id={}, consumer_name={}, subject={}, msg_id={}, message: {}",
        stream_name,
        consumer_id,
        consumer_name,
        subject,
        msg_id,
        message
    );
    Some(true)
}
/// Handler that removes a consumer's subscription from a stream.
///
/// Reads the `stream_name` and `consumer_name` form fields; a missing field
/// is propagated as `Error::invalid_request` via `?`. Responds with
/// `R::ok(true)` on success, or `R::failed(400, ..)` carrying the error text
/// when `ctx.nats().unsubscribe` fails.
pub async fn unsubscribe(
    from: web::Form<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let stream_name = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: stream_name"))?;
    let consumer_name = from
        .get("consumer_name")
        .ok_or_else(|| Error::invalid_request("Missing post data field: consumer_name"))?;

    match ctx.nats().unsubscribe(stream_name, consumer_name).await {
        Ok(_) => request.json(200, R::ok(true)),
        Err(err) => request.json(200, R::failed(400, err.to_string())),
    }
}
/// Handler that returns the consumers of a stream.
///
/// Reads `stream_name` from the query string; a missing parameter is
/// propagated as `Error::invalid_request` via `?`. On success the value
/// returned by `ctx.nats().list_consumer` is sent through `request.text`;
/// on failure the response is `R::failed(400, ..)` carrying the error text.
pub async fn list_consumer(
    from: web::Query<HashMap<String, String>>,
    ctx: web::Data<AppContext>,
    request: HttpRequest,
) -> impl Responder {
    let stream_name = from
        .get("stream_name")
        .ok_or_else(|| Error::invalid_request("Missing query string parameter: stream_name"))?;

    match ctx.nats().list_consumer(stream_name).await {
        Err(err) => request.json(200, R::failed(400, err.to_string())),
        Ok(listing) => request.text(200, &listing),
    }
}