use crate::client_wrappers::client_wrapper::ClientWrapper;
use crate::client_wrappers::connection_info::ConnectionInfo;
use crate::clients::client_builder::IggyClientBuilder;
use crate::http::http_client::HttpClient;
use crate::prelude::EncryptorKind;
use crate::prelude::IggyConsumerBuilder;
use crate::prelude::IggyError;
use crate::prelude::IggyProducerBuilder;
use crate::quic::quic_client::QuicClient;
use crate::tcp::tcp_client::TcpClient;
use crate::websocket::websocket_client::WebSocketClient;
use async_broadcast::Receiver;
use async_trait::async_trait;
use iggy_common::Consumer;
use iggy_common::locking::{IggyRwLock, IggyRwLockFn};
use iggy_common::{Client, SystemClient};
use iggy_common::{ConnectionStringUtils, DiagnosticEvent, Partitioner, TransportProtocol};
use std::fmt::Debug;
use std::sync::Arc;
use tokio::spawn;
use tokio::time::sleep;
use tracing::log::warn;
use tracing::{debug, error, info};
#[derive(Debug)]
#[allow(dead_code)]
pub struct IggyClient {
pub(crate) client: IggyRwLock<ClientWrapper>,
partitioner: Option<Arc<dyn Partitioner>>,
pub(crate) encryptor: Option<Arc<EncryptorKind>>,
}
impl Default for IggyClient {
fn default() -> Self {
IggyClient::new(ClientWrapper::Tcp(TcpClient::default()))
}
}
impl IggyClient {
pub fn builder() -> IggyClientBuilder {
IggyClientBuilder::new()
}
pub fn builder_from_connection_string(
connection_string: &str,
) -> Result<IggyClientBuilder, IggyError> {
IggyClientBuilder::from_connection_string(connection_string)
}
pub fn new(client: ClientWrapper) -> Self {
let client = IggyRwLock::new(client);
IggyClient {
client,
partitioner: None,
encryptor: None,
}
}
pub fn from_connection_string(connection_string: &str) -> Result<Self, IggyError> {
match ConnectionStringUtils::parse_protocol(connection_string)? {
TransportProtocol::Tcp => Ok(IggyClient::new(ClientWrapper::Tcp(
TcpClient::from_connection_string(connection_string)?,
))),
TransportProtocol::Quic => Ok(IggyClient::new(ClientWrapper::Quic(
QuicClient::from_connection_string(connection_string)?,
))),
TransportProtocol::Http => Ok(IggyClient::new(ClientWrapper::Http(
HttpClient::from_connection_string(connection_string)?,
))),
TransportProtocol::WebSocket => Ok(IggyClient::new(ClientWrapper::WebSocket(
WebSocketClient::from_connection_string(connection_string)?,
))),
}
}
pub fn create(
client: ClientWrapper,
partitioner: Option<Arc<dyn Partitioner>>,
encryptor: Option<Arc<EncryptorKind>>,
) -> Self {
if partitioner.is_some() {
info!("Partitioner is enabled.");
}
if encryptor.is_some() {
info!("Client-side encryption is enabled.");
}
let client = IggyRwLock::new(client);
IggyClient {
client,
partitioner,
encryptor,
}
}
pub fn client(&self) -> IggyRwLock<ClientWrapper> {
self.client.clone()
}
pub fn consumer(
&self,
name: &str,
stream: &str,
topic: &str,
partition: u32,
) -> Result<IggyConsumerBuilder, IggyError> {
Ok(IggyConsumerBuilder::new(
self.client.clone(),
name.to_owned(),
Consumer::new(name.try_into()?),
stream.try_into()?,
topic.try_into()?,
Some(partition),
self.encryptor.clone(),
None,
))
}
pub fn consumer_group(
&self,
name: &str,
stream: &str,
topic: &str,
) -> Result<IggyConsumerBuilder, IggyError> {
Ok(IggyConsumerBuilder::new(
self.client.clone(),
name.to_owned(),
Consumer::group(name.try_into()?),
stream.try_into()?,
topic.try_into()?,
None,
self.encryptor.clone(),
None,
))
}
pub fn producer(&self, stream: &str, topic: &str) -> Result<IggyProducerBuilder, IggyError> {
Ok(IggyProducerBuilder::new(
self.client.clone(),
stream.try_into()?,
stream.to_owned(),
topic.try_into()?,
topic.to_owned(),
self.encryptor.clone(),
None,
))
}
pub async fn get_connection_info(&self) -> ConnectionInfo {
self.client.read().await.get_connection_info().await
}
}
#[async_trait]
impl Client for IggyClient {
async fn connect(&self) -> Result<(), IggyError> {
let heartbeat_interval;
{
let client = self.client.read().await;
client.connect().await?;
heartbeat_interval = client.heartbeat_interval().await;
}
let client = self.client.clone();
spawn(async move {
loop {
debug!("Sending the heartbeat...");
if let Err(error) = client.read().await.ping().await {
error!("There was an error when sending a heartbeat. {error}");
if error == IggyError::ClientShutdown {
warn!("The client has been shut down - stopping the heartbeat.");
return;
}
} else {
debug!("Heartbeat was sent successfully.");
}
sleep(heartbeat_interval.get_duration()).await
}
});
Ok(())
}
async fn disconnect(&self) -> Result<(), IggyError> {
self.client.read().await.disconnect().await
}
async fn shutdown(&self) -> Result<(), IggyError> {
self.client.read().await.shutdown().await
}
async fn subscribe_events(&self) -> Receiver<DiagnosticEvent> {
self.client.read().await.subscribe_events().await
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn should_fail_with_empty_connection_string() {
let value = "";
let client = IggyClient::from_connection_string(value);
assert!(client.is_err());
}
#[test]
fn should_fail_without_username() {
let connection_string_prefix = "iggy+";
let protocol = TransportProtocol::Tcp;
let server_address = "127.0.0.1";
let port = "1234";
let username = "";
let password = "secret";
let value = format!(
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
);
let client = IggyClient::from_connection_string(&value);
assert!(client.is_err());
}
#[test]
fn should_fail_without_password() {
let connection_string_prefix = "iggy+";
let protocol = TransportProtocol::Tcp;
let server_address = "127.0.0.1";
let port = "1234";
let username = "user";
let password = "";
let value = format!(
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
);
let client = IggyClient::from_connection_string(&value);
assert!(client.is_err());
}
#[test]
fn should_fail_without_server_address() {
let connection_string_prefix = "iggy+";
let protocol = TransportProtocol::Tcp;
let server_address = "";
let port = "1234";
let username = "user";
let password = "secret";
let value = format!(
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
);
let client = IggyClient::from_connection_string(&value);
assert!(client.is_err());
}
#[test]
fn should_fail_without_port() {
let connection_string_prefix = "iggy+";
let protocol = TransportProtocol::Tcp;
let server_address = "127.0.0.1";
let port = "";
let username = "user";
let password = "secret";
let value = format!(
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
);
let client = IggyClient::from_connection_string(&value);
assert!(client.is_err());
}
#[test]
fn should_fail_with_invalid_prefix() {
let connection_string_prefix = "invalid+";
let protocol = TransportProtocol::Tcp;
let server_address = "127.0.0.1";
let port = "1234";
let username = "user";
let password = "secret";
let value = format!(
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
);
let client = IggyClient::from_connection_string(&value);
assert!(client.is_err());
}
#[test]
fn should_succeed_with_default_prefix() {
let default_connection_string_prefix = "iggy://";
let server_address = "127.0.0.1";
let port = "1234";
let username = "user";
let password = "secret";
let value = format!(
"{default_connection_string_prefix}{username}:{password}@{server_address}:{port}"
);
let client = IggyClient::from_connection_string(&value);
assert!(client.is_ok());
}
#[test]
fn should_succeed_with_tcp_protocol() {
let connection_string_prefix = "iggy+";
let protocol = TransportProtocol::Tcp;
let server_address = "127.0.0.1";
let port = "1234";
let username = "user";
let password = "secret";
let value = format!(
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
);
let client = IggyClient::from_connection_string(&value);
assert!(client.is_ok());
}
#[test]
fn should_succeed_with_tcp_protocol_using_pat() {
let connection_string_prefix = "iggy+";
let protocol = TransportProtocol::Tcp;
let server_address = "127.0.0.1";
let port = "1234";
let pat = "iggypat-1234567890abcdef";
let value = format!("{connection_string_prefix}{protocol}://{pat}@{server_address}:{port}");
let client = IggyClient::from_connection_string(&value);
assert!(client.is_ok());
}
#[tokio::test]
async fn should_succeed_with_quic_protocol() {
let connection_string_prefix = "iggy+";
let protocol = TransportProtocol::Quic;
let server_address = "127.0.0.1";
let port = "1234";
let username = "user";
let password = "secret";
let value = format!(
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
);
let client = IggyClient::from_connection_string(&value);
assert!(client.is_ok());
}
#[tokio::test]
async fn should_succeed_with_quic_protocol_using_pat() {
let connection_string_prefix = "iggy+";
let protocol = TransportProtocol::Quic;
let server_address = "127.0.0.1";
let port = "1234";
let pat = "iggypat-1234567890abcdef";
let value = format!("{connection_string_prefix}{protocol}://{pat}@{server_address}:{port}");
let client = IggyClient::from_connection_string(&value);
assert!(client.is_ok());
}
#[test]
fn should_succeed_with_http_protocol() {
let connection_string_prefix = "iggy+";
let protocol = TransportProtocol::Http;
let server_address = "127.0.0.1";
let port = "1234";
let username = "user";
let password = "secret";
let value = format!(
"{connection_string_prefix}{protocol}://{username}:{password}@{server_address}:{port}"
);
let client = IggyClient::from_connection_string(&value);
assert!(client.is_ok());
}
#[test]
fn should_succeed_with_http_protocol_with_pat() {
let connection_string_prefix = "iggy+";
let protocol = TransportProtocol::Http;
let server_address = "127.0.0.1";
let port = "1234";
let pat = "iggypat-1234567890abcdef";
let value = format!("{connection_string_prefix}{protocol}://{pat}@{server_address}:{port}");
let client = IggyClient::from_connection_string(&value);
assert!(client.is_ok());
}
}