use crate::client_wrappers::client_wrapper::ClientWrapper;
use async_trait::async_trait;
use iggy_common::MessageClient;
use iggy_common::{
Consumer, Identifier, IggyError, IggyMessage, Partitioning, PolledMessages, PollingStrategy,
};
#[async_trait]
impl MessageClient for ClientWrapper {
async fn poll_messages(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partition_id: Option<u32>,
consumer: &Consumer,
strategy: &PollingStrategy,
count: u32,
auto_commit: bool,
) -> Result<PolledMessages, IggyError> {
match self {
ClientWrapper::Iggy(client) => {
client
.poll_messages(
stream_id,
topic_id,
partition_id,
consumer,
strategy,
count,
auto_commit,
)
.await
}
ClientWrapper::Http(client) => {
client
.poll_messages(
stream_id,
topic_id,
partition_id,
consumer,
strategy,
count,
auto_commit,
)
.await
}
ClientWrapper::Tcp(client) => {
client
.poll_messages(
stream_id,
topic_id,
partition_id,
consumer,
strategy,
count,
auto_commit,
)
.await
}
ClientWrapper::Quic(client) => {
client
.poll_messages(
stream_id,
topic_id,
partition_id,
consumer,
strategy,
count,
auto_commit,
)
.await
}
ClientWrapper::WebSocket(client) => {
client
.poll_messages(
stream_id,
topic_id,
partition_id,
consumer,
strategy,
count,
auto_commit,
)
.await
}
}
}
async fn send_messages(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partitioning: &Partitioning,
messages: &mut [IggyMessage],
) -> Result<(), IggyError> {
match self {
ClientWrapper::Iggy(client) => {
client
.send_messages(stream_id, topic_id, partitioning, messages)
.await
}
ClientWrapper::Http(client) => {
client
.send_messages(stream_id, topic_id, partitioning, messages)
.await
}
ClientWrapper::Tcp(client) => {
client
.send_messages(stream_id, topic_id, partitioning, messages)
.await
}
ClientWrapper::Quic(client) => {
client
.send_messages(stream_id, topic_id, partitioning, messages)
.await
}
ClientWrapper::WebSocket(client) => {
client
.send_messages(stream_id, topic_id, partitioning, messages)
.await
}
}
}
async fn flush_unsaved_buffer(
&self,
stream_id: &Identifier,
topic_id: &Identifier,
partitioning_id: u32,
fsync: bool,
) -> Result<(), IggyError> {
match self {
ClientWrapper::Iggy(client) => {
client
.flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync)
.await
}
ClientWrapper::Http(client) => {
client
.flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync)
.await
}
ClientWrapper::Tcp(client) => {
client
.flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync)
.await
}
ClientWrapper::Quic(client) => {
client
.flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync)
.await
}
ClientWrapper::WebSocket(client) => {
client
.flush_unsaved_buffer(stream_id, topic_id, partitioning_id, fsync)
.await
}
}
}
}