use async_nats::jetstream::consumer::pull::Stream;
use async_nats::jetstream::consumer::{AckPolicy, DeliverPolicy};
use async_nats::jetstream::{self, consumer, stream::Config, Message};
use async_nats::Client;
use std::collections::HashMap;
use futures::StreamExt;
use std::future::Future;
use time::OffsetDateTime;
pub async fn loop_messages<F, Fut>(
consumer_info: consumer::Info,
mut messages: Stream,
stream_name: &str,
handle: F,
) where
F: Fn((String, String, String, u64), String, String) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Option<bool>> + Send + 'static,
{
while let Some(Ok(msg)) = messages.next().await {
let subject = msg.subject.to_string();
let consumer_id = if let Some(id) = consumer_info.config.metadata.get("consumer_id") {
&id.to_string()
} else {
"N/a"
};
let info = if let Ok(_info) = msg.info() {
(
_info.stream.to_string(),
consumer_id.to_string(),
_info.consumer.to_string(),
_info.stream_sequence,
)
} else {
("".to_string(), consumer_id.to_string(), "".to_string(), 0)
};
match String::from_utf8(msg.payload.to_vec()) {
Ok(s) => {
let process_result = handle(info, subject.clone(), s).await;
handle_ack(process_result, stream_name, &subject, msg).await;
}
Err(e) => {
log::error!(
"nats-JetStream-messages-error: stream_name={}, subject={}, error={}",
stream_name,
subject,
e
);
}
}
}
}
pub async fn handle_ack(
process_result: Option<bool>,
stream_name: &str,
subject: &str,
msg: Message,
) {
if let Some(b) = process_result {
if !b {
return;
}
match msg.ack().await {
Ok(_) => {
}
Err(_e) => {
log::error!(
"nats-JetStream-messages-error: stream_name={}, subject={}, error={}",
stream_name,
subject,
_e
);
}
}
}
}
pub fn as_ack_policy(s: &str) -> Option<AckPolicy> {
match s.trim() {
"Explicit" => Some(AckPolicy::Explicit),
"None" => Some(AckPolicy::None), "All" => Some(AckPolicy::All),
_ => None,
}
}
pub fn as_deliver_policy(
s: &str,
deliver_deal: Option<u64>,
) -> anyhow::Result<Option<DeliverPolicy>> {
log::info!("as_deliver_policy: str={}, val={:?}", s, deliver_deal);
let l = if "ByStartSequence" == s {
if let Some(sequence) = deliver_deal {
let policy = if sequence == 0 {
DeliverPolicy::All
} else {
DeliverPolicy::ByStartSequence {
start_sequence: sequence,
}
};
Some(policy)
} else {
return Err(anyhow::anyhow!("必须指定消息序号"));
}
} else if "ByStartTime" == s {
if let Some(start_time) = deliver_deal {
let t = OffsetDateTime::from_unix_timestamp(start_time as i64)?;
Some(DeliverPolicy::ByStartTime { start_time: t })
} else {
return Err(anyhow::anyhow!("必须指定开始时间"));
}
} else {
None
};
Ok(match s.trim() {
"All" => Some(DeliverPolicy::All), "Last" => Some(DeliverPolicy::Last), "New" => Some(DeliverPolicy::New), "ByStartSequence" => l, "ByStartTime" => l, "LastPerSubject" => Some(DeliverPolicy::LastPerSubject), _ => None,
})
}
pub fn as_deliver_policy_for_str<T>(
deliver_policy_str: &str,
deliver_deal_str: Option<T>,
) -> anyhow::Result<Option<DeliverPolicy>>
where
T: AsRef<str>,
{
if "ByStartSequence" == deliver_policy_str || "ByStartTime" == deliver_policy_str {
if let Some(s) = deliver_deal_str {
let s_ref = s.as_ref().trim();
let deliver_deal_val: u64 = s_ref
.parse::<u64>()
.map_err(|e| anyhow::anyhow!("invalid number '{}': {}", s_ref, e))?;
Ok(as_deliver_policy(
deliver_policy_str,
Some(deliver_deal_val),
)?)
} else {
Ok(None)
}
} else {
Ok(as_deliver_policy(deliver_policy_str, None)?)
}
}
#[derive(Clone)]
pub struct NatsClient {
pub server: String,
pub client: Client,
pub js: jetstream::context::Context,
}
impl NatsClient {
pub async fn list_stream(&self) -> anyhow::Result<Vec<(String, Vec<String>)>> {
let mut names = Vec::new();
let mut streams = self.js.stream_names();
while let Some(name_result) = streams.next().await {
match name_result {
Ok(stream_name) => {
match self.js.get_stream_no_info(&stream_name).await {
Ok(info) => match info.get_info().await {
Ok(_info) => {
names.push((stream_name.clone(), _info.config.subjects.clone()));
}
Err(_e) => {
return Err(anyhow::anyhow!(_e));
}
},
Err(e) => {
log::error!("nats-JetStream-getting-stream_info-error: stream_name={}, error={}", stream_name, e);
return Err(anyhow::anyhow!(e));
}
}
}
Err(e) => {
log::error!("nats-JetStream-name_result-error: error={}", e);
return Err(anyhow::anyhow!(e));
}
}
}
Ok(names)
}
pub async fn create_stream(
&self,
stream_name: &str,
subject: Vec<String>,
) -> anyhow::Result<()> {
let subject_string = subject.join(", ");
match self
.js
.create_stream(Config {
name: stream_name.to_string(),
subjects: subject,
storage: jetstream::stream::StorageType::File,
..Default::default()
})
.await
{
Ok(_s) => {
log::info!(
"nats-JetStream-create_stream-succeed: stream_name={}, subject={}",
stream_name,
subject_string
);
Ok(())
}
Err(_e) => {
log::error!(
"nats-JetStream-create_stream-failed: stream_name={}, subject={}, error={}",
stream_name,
subject_string,
_e
);
Err(anyhow::anyhow!(_e))
}
}
}
pub async fn list_consumer(&self, stream_name: &str) -> anyhow::Result<String> {
match self.js.get_stream(stream_name).await {
Ok(stream) => {
let mut result = Vec::<String>::new();
result.push(format!("Consumers under stream '{}':", stream_name));
let mut consumers = stream.consumers();
let mut count = 0;
while let Some(consumer_info) = consumers.next().await {
match consumer_info {
Ok(info) => {
result.push(format!(
"- Name: {:<15} | Filter: {:<35?} | AckPolicy: {:<10?} | DeliverPolicy: {:<10?} ",
info.name, info.config.filter_subject, info.config.ack_policy, info.config.deliver_policy
));
count += 1;
}
Err(_e) => {
return Err(anyhow::anyhow!(_e));
}
}
}
result.push(format!("✅ Total consumers: {}", count));
Ok(result.join("\n"))
}
Err(_e) => Err(anyhow::anyhow!(_e)),
}
}
pub async fn delete_stream(&self, stream_name: &str) -> anyhow::Result<()> {
match self.js.delete_stream(stream_name).await {
Ok(_) => Ok(()),
Err(_e) => {
log::error!(
"nats-JetStream-delete_stream-error: stream_name={}, error={}",
stream_name,
_e
);
Err(anyhow::anyhow!(_e))
}
}
}
pub async fn remove_subject(
&self,
stream_name: &str,
subject_to_remove: &str,
) -> anyhow::Result<()> {
let info = self.js.get_stream_no_info(stream_name).await?;
let _info = info.get_info().await?;
let mut config = _info.config;
config.subjects.retain(|s| s != subject_to_remove);
self.js.update_stream(config).await?;
Ok(())
}
pub async fn add_subject_to_stream(
&self,
stream_name: &str,
new_subject: &str,
) -> anyhow::Result<()> {
match self.js.get_stream_no_info(stream_name).await {
Ok(stream_info) => {
match stream_info.get_info().await {
Ok(info) => {
let mut config = info.config;
if !config.subjects.contains(&new_subject.to_string()) {
config.subjects.push(new_subject.to_string());
match self.js.update_stream(&config).await {
Ok(_) => {
log::info!(
"nats-JetStream-Added-subject: stream_name={}, subject={}",
stream_name,
new_subject
);
Ok(())
}
Err(e) => {
log::error!("nats-JetStream-update-stream-error: stream_name={}, error={}", stream_name, e);
Err(anyhow::anyhow!(e))
}
}
} else {
log::info!(
"nats-JetStream-Already-exists-subject: stream_name={}, subject={}",
stream_name,
new_subject
);
Ok(())
}
}
Err(e) => {
log::error!(
"nats-JetStream-getting-info-error: stream_name={}, error={}",
stream_name,
e
);
Err(anyhow::anyhow!(e))
}
}
}
Err(e) => {
log::error!(
"nats-JetStream-getting-stream_info-error: stream_name={}, error={}",
stream_name,
e
);
Err(anyhow::anyhow!(e))
}
}
}
pub async fn publish(&self, subject: &str, message: &str) -> anyhow::Result<()> {
match self
.js
.publish(subject.to_string(), message.to_string().into())
.await
{
Ok(_i) => {
log::info!(
"nats-JetStream-publish-succeed: subject={}, message={}",
subject,
message
);
Ok(())
}
Err(_e) => {
log::error!(
"nats-JetStream-publish-failed: subject={}, error={}",
subject,
_e
);
Err(anyhow::anyhow!(_e))
}
}
}
pub async fn unsubscribe(&self, stream_name: &str, consumer_name: &str) -> anyhow::Result<()> {
match self.js.get_stream(stream_name).await {
Ok(stream) => {
match stream.delete_consumer(consumer_name).await {
Ok(_) => {
log::info!("nats-JetStream_delete_consumer-succeed: consumer_name={}, stream_name={}", consumer_name, stream_name);
Ok(())
}
Err(err) => {
log::error!("nats-JetStream_delete_consumer-error: consumer_name={}, stream_name={}, error={}", consumer_name, stream_name, err);
Err(anyhow::anyhow!(err))
}
}
}
Err(_e) => {
log::error!(
"nats-JetStream_get_stream-error: consumer_name={}, stream_name={}, error={}",
consumer_name,
stream_name,
_e
);
Err(anyhow::anyhow!(_e))
}
}
}
#[allow(clippy::too_many_arguments)]
pub async fn subscribe<F, Fut>(
&self,
consumer_id: String,
consumer_name: Option<String>,
stream_name: &str,
subject: &str,
ack_policy: AckPolicy,
deliver_policy: DeliverPolicy,
handle: F,
) -> anyhow::Result<()>
where
F: Fn((String, String, String, u64), String, String) -> Fut + Send + Sync + 'static,
Fut: Future<Output = Option<bool>> + Send + 'static,
{
let mut _metadata = HashMap::new();
_metadata.insert("consumer_id".to_string(), consumer_id.clone());
let config = consumer::pull::Config {
metadata: _metadata,
durable_name: consumer_name.clone(),
filter_subject: subject.to_string(), ack_policy,
deliver_policy,
..Default::default()
};
match self.js.get_stream(stream_name).await {
Ok(stream) => {
match stream.create_consumer(config).await {
Ok(mut consumer) => {
let info = consumer.info().await?.clone();
log::info!("nats-JetStream_subscribe-succeed: consumer_id={}, consumer_name={}, stream_name={}, subject={}",
consumer_id, consumer_name.unwrap_or("N/a".to_string()), stream_name, subject);
let stream_name = stream_name.to_string();
let subject = subject.to_string();
tokio::spawn(async move {
match consumer.messages().await {
Ok(messages) => {
loop_messages(info, messages, &stream_name, handle).await;
}
Err(e) => {
log::error!(
"nats-JetStream-pull_messages-error: stream_name={}, subject={}, error={}",
stream_name,
subject,
e
);
}
}
});
Ok(())
}
Err(_e) => Err(anyhow::anyhow!(_e)),
}
}
Err(_e) => Err(anyhow::anyhow!(_e)),
}
}
}