use std::time::Duration;
use shove::nats::{NatsConfig, NatsConsumerGroupConfig};
use shove::{
Broker, ConsumerGroupConfig, MessageHandler, MessageMetadata, Nats, Outcome, RawBytesCodec,
TopologyBuilder,
};
use testcontainers::ImageExt;
use testcontainers::runners::AsyncRunner;
use testcontainers_modules::nats::{Nats as NatsImage, NatsServerCmd};
// Declares the `RawTopic` topic type used throughout this example.
//
// Arguments, in order:
//   * `RawTopic`       - name of the generated topic type.
//   * `Vec<u8>`        - message type carried by the topic (raw bytes).
//   * topology         - built from the name "raw-bytes" with `.dlq()` enabled
//                        (presumably a dead-letter queue; confirm against the
//                        `TopologyBuilder` documentation).
//   * `codec = ...`    - selects `RawBytesCodec` as the topic's codec.
shove::define_topic!(
RawTopic,
Vec<u8>,
TopologyBuilder::new("raw-bytes").dlq().build(),
codec = RawBytesCodec
);
/// Stateless handler for [`RawTopic`]: prints each received payload and
/// acknowledges it.
struct RawHandler;

impl MessageHandler<RawTopic> for RawHandler {
    /// The handler needs no shared context.
    type Context = ();

    /// Prints the payload's length and its bytes, then returns [`Outcome::Ack`].
    ///
    /// The message metadata and the context are ignored.
    async fn handle(&self, payload: Vec<u8>, _metadata: MessageMetadata, _ctx: &()) -> Outcome {
        let byte_count = payload.len();
        println!("Received {byte_count} bytes: {payload:?}");
        Outcome::Ack
    }
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
tracing_subscriber::fmt::init();
let cmd = NatsServerCmd::default().with_jetstream();
let container = NatsImage::default().with_cmd(&cmd).start().await?;
let port = container.get_host_port_ipv4(4222).await?;
let url = format!("nats://localhost:{port}");
let broker = Broker::<Nats>::new(NatsConfig::new(&url)).await?;
broker.topology().declare::<RawTopic>().await?;
let publisher = broker.publisher().await?;
let payload: Vec<u8> = vec![0x01, 0x02, 0x03];
publisher.publish::<RawTopic>(&payload).await?;
println!("Published raw payload: {payload:?}");
let mut group = broker.consumer_group();
group
.register::<RawTopic, _>(
ConsumerGroupConfig::new(NatsConsumerGroupConfig::new(1..=1)),
|| RawHandler,
)
.await?;
let outcome = group
.run_until_timeout(
async {
tokio::select! {
_ = tokio::time::sleep(Duration::from_secs(3)) => {}
_ = tokio::signal::ctrl_c() => {}
}
},
Duration::from_secs(10),
)
.await;
println!("Done.");
std::process::exit(outcome.exit_code());
}