use rabbitmq_stream_client::types::OffsetSpecification;
use rabbitmq_stream_client::Environment;
use tracing::{debug, info};
use crate::error::{Error, Result};
pub struct StreamClient {
environment: Environment,
}
impl StreamClient {
pub async fn connect(host: &str, port: u16, username: &str, password: &str) -> Result<Self> {
Self::connect_with_vhost(host, port, username, password, "/").await
}
pub async fn connect_with_vhost(
host: &str,
port: u16,
username: &str,
password: &str,
vhost: &str,
) -> Result<Self> {
info!("Connecting to stream protocol at {}:{}", host, port);
let environment = Environment::builder()
.host(host)
.port(port)
.username(username)
.password(password)
.virtual_host(vhost)
.build()
.await
.map_err(|e| Error::Stream(format!("Failed to connect: {:?}", e)))?;
debug!("Stream protocol connection established");
Ok(Self { environment })
}
pub async fn create_consumer(
&self,
stream: &str,
offset: OffsetSpecification,
) -> Result<rabbitmq_stream_client::Consumer> {
let consumer = self
.environment
.consumer()
.offset(offset)
.build(stream)
.await
.map_err(|e| {
Error::Stream(format!("Failed to create consumer for {}: {:?}", stream, e))
})?;
debug!("Stream consumer created for {}", stream);
Ok(consumer)
}
pub fn environment(&self) -> &Environment {
&self.environment
}
}