use std::convert::TryFrom;
use std::convert::TryInto;
use std::fmt::Display;
use std::sync::Arc;
use fluvio_sc_schema::TryEncodableFrom;
use tracing::{debug, instrument};
use anyhow::Result;
use fluvio_protocol::Decoder;
use fluvio_protocol::Encoder;
use fluvio_sc_schema::AdminSpec;
use fluvio_sc_schema::objects::Metadata;
use fluvio_sc_schema::objects::ObjectApiWatchRequest;
use fluvio_socket::AsyncResponse;
use fluvio_socket::SharedMultiplexerSocket;
use crate::metadata::topic::TopicSpec;
use crate::metadata::spu::SpuSpec;
use crate::metadata::partition::PartitionSpec;
use super::CacheMetadataStoreObject;
use super::controller::{MetadataSyncController, SimpleEvent};
use super::StoreContext;
/// Client-side metadata stores for SPUs, partitions and topics.
///
/// Built by `MetadataStores::start`, which opens one watch stream per
/// metadata kind over `socket` and hands each stream, together with the
/// matching store context, to a `MetadataSyncController`.
#[derive(Clone)]
pub struct MetadataStores {
/// Event handed (as a clone of the `Arc`) to every sync controller started
/// by this store; notified by `shutdown()`.
shutdown: Arc<SimpleEvent>,
/// Store context for SPU metadata.
spus: StoreContext<SpuSpec>,
/// Store context for partition metadata.
partitions: StoreContext<PartitionSpec>,
/// Store context for topic metadata.
topics: StoreContext<TopicSpec>,
/// Multiplexed socket on which the watch streams are created.
socket: SharedMultiplexerSocket,
/// API version used to encode watch requests and set in their request header.
watch_version: i16,
}
impl MetadataStores {
    /// Creates the metadata stores and starts a watch for SPUs, partitions and
    /// topics, in that order.
    ///
    /// `socket` is the multiplexed socket the watch streams are created on and
    /// `watch_version` is the API version used for every watch request.
    ///
    /// # Errors
    ///
    /// Returns the first error hit while encoding a watch request or creating
    /// its stream. Before returning that error the shared shutdown event is
    /// notified, because watches started earlier in the sequence have already
    /// handed a clone of that event to their controller and the caller never
    /// receives a store it could call `shutdown()` on.
    #[instrument(skip(socket))]
    pub(crate) async fn start(socket: SharedMultiplexerSocket, watch_version: i16) -> Result<Self> {
        debug!(watch_version, "starting metadata store");
        let store = Self {
            shutdown: SimpleEvent::shared(),
            spus: StoreContext::new(),
            partitions: StoreContext::new(),
            topics: StoreContext::new(),
            socket,
            watch_version,
        };

        if let Err(err) = store.start_all_watches().await {
            // NOTE(review): assumes a controller stops once the shutdown event
            // is notified — confirm against `MetadataSyncController`.
            store.shutdown.notify();
            return Err(err);
        }

        Ok(store)
    }

    /// Starts the SPU, partition and topic watches one after another,
    /// stopping at the first failure.
    async fn start_all_watches(&self) -> Result<()> {
        self.start_watch_for_spu().await?;
        self.start_watch_for_partition().await?;
        self.start_watch_for_topic().await?;
        Ok(())
    }

    /// Returns the store context holding SPU metadata.
    pub(crate) fn spus(&self) -> &StoreContext<SpuSpec> {
        &self.spus
    }

    /// Returns the store context holding partition metadata.
    pub(crate) fn partitions(&self) -> &StoreContext<PartitionSpec> {
        &self.partitions
    }

    /// Returns the store context holding topic metadata.
    pub(crate) fn topics(&self) -> &StoreContext<TopicSpec> {
        &self.topics
    }

    /// Notifies the shutdown event shared with the sync controllers started by
    /// this store.
    ///
    /// The body only needs shared access; `&mut self` is kept so the signature
    /// seen by existing callers does not change.
    pub(crate) fn shutdown(&mut self) {
        self.shutdown.notify();
    }

    /// Starts a watch that feeds the SPU store.
    ///
    /// # Errors
    ///
    /// Propagates any error from encoding the request or creating the stream.
    #[instrument(skip(self))]
    pub(crate) async fn start_watch_for_spu(&self) -> Result<()> {
        self.start_watch::<SpuSpec>(self.spus.clone()).await
    }

    /// Starts a watch that feeds the partition store.
    ///
    /// # Errors
    ///
    /// Propagates any error from encoding the request or creating the stream.
    #[instrument(skip(self))]
    pub(crate) async fn start_watch_for_partition(&self) -> Result<()> {
        self.start_watch::<PartitionSpec>(self.partitions.clone())
            .await
    }

    /// Starts a watch that feeds the topic store.
    ///
    /// # Errors
    ///
    /// Propagates any error from encoding the request or creating the stream.
    #[instrument(skip(self))]
    pub(crate) async fn start_watch_for_topic(&self) -> Result<()> {
        self.start_watch::<TopicSpec>(self.topics.clone()).await
    }

    /// Sends a watch request for spec type `S` and hands the resulting
    /// response stream, together with `store` and a clone of the shutdown
    /// event, to a `MetadataSyncController`.
    ///
    /// # Errors
    ///
    /// Fails if the watch request cannot be encoded for `watch_version` or if
    /// the socket cannot create the stream.
    #[instrument(skip(self, store))]
    async fn start_watch<S>(&self, store: StoreContext<S>) -> Result<()>
    where
        S: AdminSpec + Encoder + Decoder + Send + Sync + 'static,
        AsyncResponse<ObjectApiWatchRequest>: Send,
        S::Status: Encoder + Decoder + Send + Sync,
        S::IndexKey: Display + Send + Sync,
        CacheMetadataStoreObject<S>: TryFrom<Metadata<S>>,
        <Metadata<S> as TryInto<CacheMetadataStoreObject<S>>>::Error: Display,
    {
        use fluvio_protocol::api::RequestMessage;
        use fluvio_sc_schema::objects::WatchRequest;

        let watch_request: WatchRequest<S> = WatchRequest::default();
        let watch_req = ObjectApiWatchRequest::try_encode_from(watch_request, self.watch_version)?;

        // The header carries the same version the request body was encoded with.
        let mut req_msg = RequestMessage::new_request(watch_req);
        req_msg.get_mut_header().set_api_version(self.watch_version);

        debug!(watch_version = self.watch_version, obj = %S::LABEL, "create metadata stream");
        // NOTE(review): the second argument is presumably the stream's queue
        // length — confirm against `create_stream`.
        let async_response = self.socket.create_stream(req_msg, 10).await?;

        MetadataSyncController::<S>::start(store, async_response, Arc::clone(&self.shutdown));
        Ok(())
    }
}