use std::convert::{TryFrom, TryInto};
use std::fmt::Display;
use tracing::debug;
use dataplane::core::Encoder;
use dataplane::core::Decoder;
use fluvio_sc_schema::objects::{Metadata, AllCreatableSpec};
use fluvio_sc_schema::AdminRequest;
use fluvio_socket::FlvSocketError;
use fluvio_socket::AllMultiplexerSocket;
use fluvio_future::native_tls::AllDomainConnector;
use crate::client::{ClientConfig, VersionedSerialSocket, SerialFrame};
use crate::{FluvioError, FluvioConfig};
use crate::metadata::objects::{ListResponse, ListSpec, DeleteSpec, CreateRequest};
use crate::config::ConfigFile;
pub struct FluvioAdmin(VersionedSerialSocket);
impl FluvioAdmin {
pub(crate) fn new(client: VersionedSerialSocket) -> Self {
Self(client)
}
pub async fn connect() -> Result<Self, FluvioError> {
let config_file = ConfigFile::load_default_or_new()?;
let cluster_config = config_file.config().current_cluster()?;
Self::connect_with_config(cluster_config).await
}
pub async fn connect_with_config(config: &FluvioConfig) -> Result<Self, FluvioError> {
let connector = AllDomainConnector::try_from(config.tls.clone())?;
let config = ClientConfig::new(&config.endpoint, connector);
let inner_client = config.connect().await?;
debug!("connected to cluster at: {}", inner_client.config().addr());
let (socket, config, versions) = inner_client.split();
let socket = AllMultiplexerSocket::shared(socket);
let versioned_socket = VersionedSerialSocket::new(socket, config, versions);
Ok(Self(versioned_socket))
}
async fn send_receive<R>(&mut self, request: R) -> Result<R::Response, FlvSocketError>
where
R: AdminRequest + Send + Sync,
{
self.0.send_receive(request).await
}
pub async fn create<S>(
&mut self,
name: String,
dry_run: bool,
spec: S,
) -> Result<(), FluvioError>
where
S: Into<AllCreatableSpec>,
{
let create_request = CreateRequest {
name,
dry_run,
spec: spec.into(),
};
self.send_receive(create_request).await?.as_result()?;
Ok(())
}
pub async fn delete<S, K>(&mut self, key: K) -> Result<(), FluvioError>
where
S: DeleteSpec,
K: Into<S::DeleteKey>,
{
let delete_request = S::into_request(key);
self.send_receive(delete_request).await?.as_result()?;
Ok(())
}
pub async fn list<S, F>(&mut self, filters: F) -> Result<Vec<Metadata<S>>, FluvioError>
where
S: ListSpec + Encoder + Decoder,
S::Status: Encoder + Decoder,
F: Into<Vec<S::Filter>>,
ListResponse: TryInto<Vec<Metadata<S>>>,
<ListResponse as TryInto<Vec<Metadata<S>>>>::Error: Display,
{
use std::io::Error;
use std::io::ErrorKind;
let list_request = S::into_list_request(filters.into());
let response = self.send_receive(list_request).await?;
response
.try_into()
.map_err(|err| Error::new(ErrorKind::Other, format!("can't convert: {}", err)).into())
}
}