use crate::core::natss::{NatsClient, Settings};
use async_nats::{jetstream, ConnectOptions, Event};
pub async fn client(cfg: &Option<Settings>) -> Option<NatsClient> {
if let Some(cfg) = cfg {
if !cfg.enabled() {
return None;
}
let server = if let Some(s) = cfg.server.as_ref() {
s.to_string()
} else {
"nats://127.0.0.1:4222".to_string()
};
let opts = ConnectOptions::new().event_callback({
move |event: Event| {
async move {
match event {
Event::Connected => {
}
Event::Disconnected => {
}
Event::Closed => {
}
Event::SlowConsumer(_) => {
}
Event::ServerError(_e) => {
}
Event::ClientError(_e) => {
}
_ => {}
}
}
}
});
match opts.connect(&server).await {
Ok(client) => {
let js: jetstream::context::Context = jetstream::new(client.clone());
log::info!("nats-jetstream_connect_successful: server={}", server);
let clt = NatsClient {
server: server.clone(),
js,
client,
};
if let Err(e) = clt
.create_stream("EVENTS", vec!["events.*".to_string()])
.await
{
log::error!(
"nats-jetstream-connect-failed: server={}, error={:?}",
server,
e
);
}
Some(clt)
}
Err(_e) => {
log::error!(
"nats-jetstream-connect-failed: server={}, error={:?}",
server,
_e
);
None
}
}
} else {
None
}
}