use std::sync::Arc;
use crate::connection::common::StartupMessage;
use crate::errors::Error;
#[derive(Default)]
pub(super) struct BuilderState {
pub(super) address: Option<String>,
pub(super) client_id: Option<i32>,
pub(super) tcp_no_delay: bool,
pub(super) startup_callback: Option<Arc<dyn Fn(StartupMessage) + Send + Sync>>,
}
pub(super) struct ValidatedPieces {
pub(super) address: String,
pub(super) client_id: i32,
pub(super) tcp_no_delay: bool,
pub(super) startup_callback: Option<Arc<dyn Fn(StartupMessage) + Send + Sync>>,
}
impl BuilderState {
pub(super) fn validate(self) -> Result<ValidatedPieces, Error> {
Ok(ValidatedPieces {
address: self
.address
.ok_or_else(|| Error::InvalidArgument("ClientBuilder: address is required".into()))?,
client_id: self
.client_id
.ok_or_else(|| Error::InvalidArgument("ClientBuilder: client_id is required".into()))?,
tcp_no_delay: self.tcp_no_delay,
startup_callback: self.startup_callback,
})
}
}
#[cfg(feature = "sync")]
pub mod sync_impl {
use std::sync::Arc;
use super::BuilderState;
use crate::client::sync::Client;
use crate::connection::common::StartupMessage;
use crate::errors::Error;
use crate::subscriptions::notice_stream::sync_impl::NoticeStream;
use crate::transport::sync::NoticeBroadcaster;
#[derive(Default)]
#[must_use = "ClientBuilder does nothing until you call connect() or connect_with_notice_stream()"]
pub struct ClientBuilder {
state: BuilderState,
}
impl ClientBuilder {
pub fn address(mut self, addr: impl Into<String>) -> Self {
self.state.address = Some(addr.into());
self
}
pub fn client_id(mut self, id: i32) -> Self {
self.state.client_id = Some(id);
self
}
pub fn tcp_no_delay(mut self, enabled: bool) -> Self {
self.state.tcp_no_delay = enabled;
self
}
pub fn startup_callback(mut self, callback: impl Fn(StartupMessage) + Send + Sync + 'static) -> Self {
self.state.startup_callback = Some(Arc::new(callback));
self
}
pub fn connect(self) -> Result<Client, Error> {
let broadcaster = Arc::new(NoticeBroadcaster::new());
self.connect_with_broadcaster(broadcaster)
}
pub fn connect_with_notice_stream(self) -> Result<(Client, NoticeStream), Error> {
let broadcaster = Arc::new(NoticeBroadcaster::new());
let stream = NoticeStream::new(broadcaster.subscribe());
let client = self.connect_with_broadcaster(broadcaster)?;
Ok((client, stream))
}
fn connect_with_broadcaster(self, broadcaster: Arc<NoticeBroadcaster>) -> Result<Client, Error> {
let pieces = self.state.validate()?;
Client::connect_with_pieces(
&pieces.address,
pieces.client_id,
pieces.tcp_no_delay,
pieces.startup_callback,
broadcaster,
)
}
}
}
#[cfg(feature = "async")]
pub mod async_impl {
use std::sync::Arc;
use tokio::sync::broadcast;
use super::BuilderState;
use crate::client::r#async::Client;
use crate::connection::common::StartupMessage;
use crate::errors::Error;
use crate::messages::Notice;
use crate::subscriptions::notice_stream::async_impl::NoticeStream;
use crate::transport::r#async::BROADCAST_CHANNEL_CAPACITY;
#[derive(Default)]
#[must_use = "ClientBuilder does nothing until you call connect() or connect_with_notice_stream()"]
pub struct ClientBuilder {
state: BuilderState,
}
impl ClientBuilder {
pub fn address(mut self, addr: impl Into<String>) -> Self {
self.state.address = Some(addr.into());
self
}
pub fn client_id(mut self, id: i32) -> Self {
self.state.client_id = Some(id);
self
}
pub fn tcp_no_delay(mut self, enabled: bool) -> Self {
self.state.tcp_no_delay = enabled;
self
}
pub fn startup_callback(mut self, callback: impl Fn(StartupMessage) + Send + Sync + 'static) -> Self {
self.state.startup_callback = Some(Arc::new(callback));
self
}
pub async fn connect(self) -> Result<Client, Error> {
let (sender, _rx) = broadcast::channel::<Notice>(BROADCAST_CHANNEL_CAPACITY);
self.connect_with_sender(sender).await
}
pub async fn connect_with_notice_stream(self) -> Result<(Client, NoticeStream), Error> {
let (sender, receiver) = broadcast::channel::<Notice>(BROADCAST_CHANNEL_CAPACITY);
let stream = NoticeStream::new(receiver);
let client = self.connect_with_sender(sender).await?;
Ok((client, stream))
}
async fn connect_with_sender(self, sender: broadcast::Sender<Notice>) -> Result<Client, Error> {
let pieces = self.state.validate()?;
Client::connect_with_pieces(&pieces.address, pieces.client_id, pieces.tcp_no_delay, pieces.startup_callback, sender).await
}
}
}
#[cfg(test)]
#[path = "client_builder_tests.rs"]
mod tests;