use std::convert::TryFrom;
use std::fmt::Debug;
use std::io::Error as IoError;
use futures_util::{Stream, StreamExt};
use tracing::{debug, trace, instrument};
use anyhow::{Result, anyhow};
use fluvio_sc_schema::objects::ObjectApiUpdateRequest;
use fluvio_sc_schema::objects::UpdateRequest;
use fluvio_sc_schema::UpdatableAdminSpec;
use fluvio_protocol::{Decoder, Encoder};
use fluvio_protocol::api::{Request, RequestMessage};
use fluvio_future::net::DomainConnector;
use fluvio_sc_schema::objects::{
DeleteRequest, ObjectApiCreateRequest, ObjectApiDeleteRequest, ObjectApiListRequest,
ObjectApiWatchRequest, Metadata, ListFilter, WatchRequest, WatchResponse, CreateRequest,
CommonCreateRequest,
};
use fluvio_sc_schema::{AdminSpec, DeletableAdminSpec, CreatableAdminSpec, TryEncodableFrom};
use fluvio_socket::{ClientConfig, VersionedSerialSocket, SerialFrame, MultiplexerSocket};
use crate::FluvioClusterConfig;
use crate::config::ConfigFile;
use crate::error::anyhow_version_error;
use crate::metadata::objects::{ListResponse, ListRequest};
use crate::sync::MetadataStores;
pub struct FluvioAdmin {
socket: VersionedSerialSocket,
#[allow(dead_code)]
metadata: MetadataStores,
}
impl FluvioAdmin {
pub(crate) fn new(socket: VersionedSerialSocket, metadata: MetadataStores) -> Self {
Self { socket, metadata }
}
#[instrument]
pub async fn connect() -> Result<Self> {
let config_file = ConfigFile::load_default_or_new()?;
let cluster_config = config_file.config().current_cluster()?;
Self::connect_with_config(cluster_config).await
}
#[instrument(skip(config))]
pub async fn connect_with_config(config: &FluvioClusterConfig) -> Result<Self> {
let connector = DomainConnector::try_from(config.tls.clone())?;
let client_config =
ClientConfig::new(&config.endpoint, connector, config.use_spu_local_address);
let inner_client = client_config.connect().await?;
debug!(addr = %inner_client.config().addr(), "connected to cluster");
let (socket, config, versions) = inner_client.split();
if let Some(watch_version) = versions.lookup_version::<ObjectApiWatchRequest>() {
let socket = MultiplexerSocket::shared(socket);
let metadata = MetadataStores::start(socket.clone(), watch_version).await?;
let versioned_socket = VersionedSerialSocket::new(socket, config, versions);
Ok(Self {
socket: versioned_socket,
metadata,
})
} else {
let platform_version = versions.platform_version().to_string();
Err(anyhow_version_error(&platform_version))
}
}
#[instrument(skip(self, request))]
async fn send_receive_admin<R, I>(&self, request: I) -> Result<R::Response>
where
R: Request + Send + Sync,
R: TryEncodableFrom<I>,
{
let version = self
.socket
.lookup_version::<R>()
.ok_or(anyhow!("no version found for: {}", R::API_KEY))?;
let request = R::try_encode_from(request, version)?;
let req_msg = self.socket.new_request(request, Some(version));
self.socket
.send_and_receive(req_msg)
.await
.map_err(|err| err.into())
}
#[instrument(skip(self, name, dry_run, spec))]
pub async fn create<S>(&self, name: String, dry_run: bool, spec: S) -> Result<()>
where
S: CreatableAdminSpec + Sync + Send,
{
let common_request = CommonCreateRequest {
name,
dry_run,
..Default::default()
};
self.create_with_config(common_request, spec).await
}
#[instrument(skip(self, config, spec))]
pub async fn create_with_config<S>(&self, config: CommonCreateRequest, spec: S) -> Result<()>
where
S: CreatableAdminSpec + Sync + Send,
{
let create_request = CreateRequest::new(config, spec);
debug!("sending create request: {:#?}", create_request);
self.send_receive_admin::<ObjectApiCreateRequest, _>(create_request)
.await?
.as_result()?;
Ok(())
}
#[instrument(skip(self, key))]
pub async fn delete<S>(&self, key: impl Into<S::DeleteKey>) -> Result<()>
where
S: DeletableAdminSpec + Sync + Send,
{
let delete_request: DeleteRequest<S> = DeleteRequest::new(key.into());
debug!("sending delete request: {:#?}", delete_request);
self.send_receive_admin::<ObjectApiDeleteRequest, _>(delete_request)
.await?
.as_result()?;
Ok(())
}
#[instrument(skip(self, key))]
pub async fn force_delete<S>(&self, key: impl Into<S::DeleteKey>) -> Result<()>
where
S: DeletableAdminSpec + Sync + Send,
{
let delete_request: DeleteRequest<S> = DeleteRequest::with(key.into(), true);
debug!("sending force delete request: {:#?}", delete_request);
self.send_receive_admin::<ObjectApiDeleteRequest, _>(delete_request)
.await?
.as_result()?;
Ok(())
}
#[instrument(skip(self, key))]
pub async fn update<S>(
&self,
key: impl Into<S::UpdateKey>,
action: S::UpdateAction,
) -> Result<()>
where
S: UpdatableAdminSpec + Sync + Send,
{
let update_request: UpdateRequest<S> = UpdateRequest::new(key.into(), action);
debug!("sending update request: {:#?}", update_request);
self.send_receive_admin::<ObjectApiUpdateRequest, _>(update_request)
.await?
.as_result()?;
Ok(())
}
#[instrument(skip(self))]
pub async fn all<S>(&self) -> Result<Vec<Metadata<S>>>
where
S: AdminSpec,
S::Status: Encoder + Decoder + Debug,
{
self.list_with_params::<S, String>(vec![], false).await
}
#[instrument(skip(self, filters))]
pub async fn list<S, F>(&self, filters: Vec<F>) -> Result<Vec<Metadata<S>>>
where
S: AdminSpec,
ListFilter: From<F>,
S::Status: Encoder + Decoder + Debug,
{
self.list_with_params(filters, false).await
}
#[instrument(skip(self, filters))]
pub async fn list_with_params<S, F>(
&self,
filters: Vec<F>,
summary: bool,
) -> Result<Vec<Metadata<S>>>
where
S: AdminSpec,
ListFilter: From<F>,
S::Status: Encoder + Decoder + Debug,
{
let filter_list: Vec<ListFilter> = filters.into_iter().map(Into::into).collect();
let list_request: ListRequest<S> = ListRequest::new(filter_list, summary);
self.list_with_config(list_request).await
}
#[instrument(skip(self, config))]
pub async fn list_with_config<S, F>(&self, config: ListRequest<S>) -> Result<Vec<Metadata<S>>>
where
S: AdminSpec,
ListFilter: From<F>,
S::Status: Encoder + Decoder + Debug,
{
let response = self
.send_receive_admin::<ObjectApiListRequest, _>(config)
.await?;
trace!("list response: {:#?}", response);
response
.downcast()?
.ok_or(anyhow!("downcast error: {s}", s = S::LABEL))
.map(|out: ListResponse<S>| out.inner())
}
#[instrument(skip(self))]
pub async fn watch<S>(
&self,
) -> Result<impl Stream<Item = Result<WatchResponse<S>, IoError>> + Unpin>
where
S: AdminSpec,
S::Status: Encoder + Decoder,
{
let watch_request: WatchRequest<S> = WatchRequest::summary();
let version = self
.socket
.lookup_version::<ObjectApiWatchRequest>()
.ok_or(anyhow!(
"no version found watch request {}",
ObjectApiWatchRequest::API_KEY
))?;
let watch_req = ObjectApiWatchRequest::try_encode_from(watch_request, version)?;
let req_msg = RequestMessage::new_request(watch_req);
debug!(api_version = req_msg.header.api_version(), obj = %S::LABEL, "create watch stream");
let inner_socket = self.socket.new_socket();
let stream = inner_socket.create_stream(req_msg, 10).await?;
let mapped_stream = stream.map(|respons_result| match respons_result {
Ok(response) => {
let watch_response = response
.downcast()
.map_err(|err| IoError::other(format!("downcast error: {err:#?}")))?;
watch_response.ok_or(IoError::other(format!(
"cannot decoded as {s}",
s = S::LABEL
)))
}
Err(err) => Err(IoError::other(format!("socket error {err}"))),
});
Ok(Box::pin(mapped_stream))
}
}
#[cfg(feature = "unstable")]
mod unstable {
use super::*;
use fluvio_stream_dispatcher::metadata::local::LocalMetadataItem;
use futures_util::Stream;
use crate::metadata::topic::TopicSpec;
use crate::metadata::partition::PartitionSpec;
use crate::metadata::spu::SpuSpec;
use crate::metadata::store::MetadataChanges;
impl FluvioAdmin {
pub fn watch_topics(
&self,
) -> impl Stream<Item = MetadataChanges<TopicSpec, LocalMetadataItem>> {
self.metadata.topics().watch()
}
pub fn watch_partitions(
&self,
) -> impl Stream<Item = MetadataChanges<PartitionSpec, LocalMetadataItem>> {
self.metadata.partitions().watch()
}
pub fn watch_spus(
&self,
) -> impl Stream<Item = MetadataChanges<SpuSpec, LocalMetadataItem>> {
self.metadata.spus().watch()
}
}
}