use crate::batch::BatchAppendClient;
use crate::grpc::{ClientSettings, GrpcClient};
use crate::options::batch_append::BatchAppendOptions;
use crate::options::persistent_subscription::PersistentSubscriptionOptions;
use crate::options::read_all::ReadAllOptions;
use crate::options::read_stream::ReadStreamOptions;
use crate::options::subscribe_to_stream::SubscribeToStreamOptions;
use crate::server_features::ServerInfo;
use crate::{
commands, DeletePersistentSubscriptionOptions, DeleteStreamOptions,
GetPersistentSubscriptionInfoOptions, ListPersistentSubscriptionsOptions, MetadataStreamName,
PersistentSubscription, PersistentSubscriptionInfo, PersistentSubscriptionToAllOptions,
Position, ReadStream, ReplayParkedMessagesOptions, RestartPersistentSubscriptionSubsystem,
RevisionOrPosition, StreamMetadata, StreamMetadataResult, StreamName, SubscribeToAllOptions,
SubscribeToPersistentSubscriptionOptions, Subscription, TombstoneStreamOptions,
VersionedMetadata, WriteResult,
};
use crate::{
options::append_to_stream::{AppendToStreamOptions, ToEvents},
EventData,
};
#[derive(Clone)]
pub struct Client {
pub(crate) http_client: reqwest::Client,
pub(crate) client: GrpcClient,
}
impl Client {
pub fn new(settings: ClientSettings) -> crate::Result<Self> {
Client::with_runtime_handle(tokio::runtime::Handle::current(), settings)
}
pub fn with_runtime_handle(
handle: tokio::runtime::Handle,
settings: ClientSettings,
) -> crate::Result<Self> {
let client = GrpcClient::create(handle, settings.clone());
let http_client = reqwest::Client::builder()
.danger_accept_invalid_certs(!settings.is_tls_certificate_verification_enabled())
.https_only(settings.is_secure_mode_enabled())
.build()
.map_err(|e| crate::Error::InitializationError(e.to_string()))?;
Ok(Client {
http_client,
client,
})
}
pub fn settings(&self) -> &ClientSettings {
self.client.connection_settings()
}
pub async fn server_info(&self) -> crate::Result<ServerInfo> {
let handle = self.client.current_selected_node().await?;
Ok(handle.server_info())
}
pub async fn append_to_stream<Events>(
&self,
stream_name: impl StreamName,
options: &AppendToStreamOptions,
events: Events,
) -> crate::Result<WriteResult>
where
Events: ToEvents,
{
commands::append_to_stream(&self.client, stream_name, options, events.into_events()).await
}
pub async fn set_stream_metadata(
&self,
name: impl MetadataStreamName,
options: &AppendToStreamOptions,
metadata: &StreamMetadata,
) -> crate::Result<WriteResult> {
let event = EventData::json("$metadata", metadata)
.map_err(|e| crate::Error::InternalParsingError(e.to_string()))?;
self.append_to_stream(name.into_metadata_stream_name(), options, event)
.await
}
pub async fn batch_append(
&self,
options: &BatchAppendOptions,
) -> crate::Result<BatchAppendClient> {
commands::batch_append(&self.client, options).await
}
pub async fn read_stream(
&self,
stream_name: impl StreamName,
options: &ReadStreamOptions,
) -> crate::Result<ReadStream> {
commands::read_stream(
self.client.clone(),
options,
stream_name,
options.max_count as u64,
)
.await
}
pub async fn read_all(&self, options: &ReadAllOptions) -> crate::Result<ReadStream> {
commands::read_all(self.client.clone(), options, options.max_count as u64).await
}
pub async fn get_stream_metadata(
&self,
name: impl MetadataStreamName,
options: &ReadStreamOptions,
) -> crate::Result<StreamMetadataResult> {
let mut stream = self
.read_stream(name.into_metadata_stream_name(), options)
.await?;
match stream.next().await {
Ok(event) => {
let event = event.expect("to be defined");
let metadata = event
.get_original_event()
.as_json::<StreamMetadata>()
.map_err(|e| crate::Error::InternalParsingError(e.to_string()))?;
let metadata = VersionedMetadata {
stream: event.get_original_stream_id().to_string(),
version: event.get_original_event().revision,
metadata,
};
Ok(StreamMetadataResult::Success(Box::new(metadata)))
}
Err(e) => match e {
crate::Error::ResourceNotFound => Ok(StreamMetadataResult::NotFound),
crate::Error::ResourceDeleted => Ok(StreamMetadataResult::Deleted),
other => Err(other),
},
}
}
pub async fn delete_stream(
&self,
stream_name: impl StreamName,
options: &DeleteStreamOptions,
) -> crate::Result<Option<Position>> {
commands::delete_stream(&self.client, stream_name, options).await
}
pub async fn tombstone_stream(
&self,
stream_name: impl StreamName,
options: &TombstoneStreamOptions,
) -> crate::Result<Option<Position>> {
commands::tombstone_stream(&self.client, stream_name, options).await
}
pub async fn subscribe_to_stream(
&self,
stream_name: impl StreamName,
options: &SubscribeToStreamOptions,
) -> Subscription {
commands::subscribe_to_stream(self.client.clone(), stream_name, options)
}
pub async fn subscribe_to_all(&self, options: &SubscribeToAllOptions) -> Subscription {
commands::subscribe_to_all(self.client.clone(), options)
}
pub async fn create_persistent_subscription(
&self,
stream_name: impl StreamName,
group_name: impl AsRef<str>,
options: &PersistentSubscriptionOptions,
) -> crate::Result<()> {
commands::create_persistent_subscription(
&self.client,
stream_name,
group_name.as_ref(),
options,
)
.await
}
pub async fn create_persistent_subscription_to_all(
&self,
group_name: impl AsRef<str>,
options: &PersistentSubscriptionToAllOptions,
) -> crate::Result<()> {
commands::create_persistent_subscription(&self.client, "", group_name.as_ref(), options)
.await
}
pub async fn update_persistent_subscription(
&self,
stream_name: impl StreamName,
group_name: impl AsRef<str>,
options: &PersistentSubscriptionOptions,
) -> crate::Result<()> {
commands::update_persistent_subscription(
&self.client,
stream_name,
group_name.as_ref(),
options,
)
.await
}
pub async fn update_persistent_subscription_to_all(
&self,
group_name: impl AsRef<str>,
options: &PersistentSubscriptionToAllOptions,
) -> crate::Result<()> {
commands::update_persistent_subscription(&self.client, "", group_name.as_ref(), options)
.await
}
pub async fn delete_persistent_subscription(
&self,
stream_name: impl StreamName,
group_name: impl AsRef<str>,
options: &DeletePersistentSubscriptionOptions,
) -> crate::Result<()> {
commands::delete_persistent_subscription(
&self.client,
stream_name,
group_name.as_ref(),
options,
false,
)
.await
}
pub async fn delete_persistent_subscription_to_all(
&self,
group_name: impl AsRef<str>,
options: &DeletePersistentSubscriptionOptions,
) -> crate::Result<()> {
commands::delete_persistent_subscription(
&self.client,
"",
group_name.as_ref(),
options,
true,
)
.await
}
pub async fn subscribe_to_persistent_subscription(
&self,
stream_name: impl StreamName,
group_name: impl AsRef<str>,
options: &SubscribeToPersistentSubscriptionOptions,
) -> crate::Result<PersistentSubscription> {
commands::subscribe_to_persistent_subscription(
&self.client,
stream_name,
group_name.as_ref(),
options,
false,
)
.await
}
pub async fn subscribe_to_persistent_subscription_to_all(
&self,
group_name: impl AsRef<str>,
options: &SubscribeToPersistentSubscriptionOptions,
) -> crate::Result<PersistentSubscription> {
commands::subscribe_to_persistent_subscription(
&self.client,
"",
group_name.as_ref(),
options,
true,
)
.await
}
pub async fn replay_parked_messages(
&self,
stream_name: impl AsRef<str>,
group_name: impl AsRef<str>,
options: &ReplayParkedMessagesOptions,
) -> crate::Result<()> {
commands::replay_parked_messages(
&self.client,
&self.http_client,
commands::RegularStream(stream_name.as_ref().to_string()),
group_name,
options,
)
.await
}
pub async fn replay_parked_messages_to_all(
&self,
group_name: impl AsRef<str>,
options: &ReplayParkedMessagesOptions,
) -> crate::Result<()> {
commands::replay_parked_messages(
&self.client,
&self.http_client,
commands::AllStream,
group_name,
options,
)
.await
}
pub async fn list_all_persistent_subscriptions(
&self,
options: &ListPersistentSubscriptionsOptions,
) -> crate::Result<Vec<PersistentSubscriptionInfo<RevisionOrPosition>>> {
commands::list_all_persistent_subscriptions(&self.client, &self.http_client, options).await
}
pub async fn list_persistent_subscriptions_for_stream(
&self,
stream_name: impl AsRef<str>,
options: &ListPersistentSubscriptionsOptions,
) -> crate::Result<Vec<PersistentSubscriptionInfo<u64>>> {
commands::list_persistent_subscriptions_for_stream(
&self.client,
&self.http_client,
commands::RegularStream(stream_name.as_ref().to_string()),
options,
)
.await
}
pub async fn list_persistent_subscriptions_to_all(
&self,
options: &ListPersistentSubscriptionsOptions,
) -> crate::Result<Vec<PersistentSubscriptionInfo<Position>>> {
commands::list_persistent_subscriptions_for_stream(
&self.client,
&self.http_client,
commands::AllStream,
options,
)
.await
}
pub async fn get_persistent_subscription_info(
&self,
stream_name: impl AsRef<str>,
group_name: impl AsRef<str>,
options: &GetPersistentSubscriptionInfoOptions,
) -> crate::Result<PersistentSubscriptionInfo<u64>> {
commands::get_persistent_subscription_info(
&self.client,
&self.http_client,
commands::RegularStream(stream_name.as_ref().to_string()),
group_name,
options,
)
.await
}
pub async fn get_persistent_subscription_info_to_all(
&self,
group_name: impl AsRef<str>,
options: &GetPersistentSubscriptionInfoOptions,
) -> crate::Result<PersistentSubscriptionInfo<Position>> {
commands::get_persistent_subscription_info(
&self.client,
&self.http_client,
commands::AllStream,
group_name,
options,
)
.await
}
pub async fn restart_persistent_subscription_subsystem(
&self,
options: &RestartPersistentSubscriptionSubsystem,
) -> crate::Result<()> {
commands::restart_persistent_subscription_subsystem(
&self.client,
&self.http_client,
options,
)
.await
}
}