use std::{collections::HashMap, future::Future, marker::PhantomData, pin::Pin};
use async_trait::async_trait;
use thiserror::Error;
use tokio::sync::mpsc::Receiver;
use tower::discover::Change;
use crate::{
params::{extension_param::ExtensionName, registry_param::RegistryUrl},
url::UrlParam,
StdError, Url,
};
use proxy::RegistryProxy;
use crate::extension::{
Extension, ExtensionFactories, ExtensionMetaInfo, ExtensionType, LoadExtensionPromise,
};
pub fn to_extension_url(registry_url: Url) -> Url {
let mut registry_extension_loader_url: Url = "extension://0.0.0.0".parse().unwrap();
let protocol = registry_url.protocol();
registry_extension_loader_url.add_query_param(ExtensionType::Registry);
registry_extension_loader_url.add_query_param(ExtensionName::new(protocol.to_string()));
registry_extension_loader_url.add_query_param(RegistryUrl::new(registry_url));
registry_extension_loader_url
}
pub type ServiceChange = Change<String, ()>;
pub type DiscoverStream = Receiver<Result<ServiceChange, StdError>>;
#[async_trait]
pub trait Registry {
async fn register(&self, url: Url) -> Result<(), StdError>;
async fn unregister(&self, url: Url) -> Result<(), StdError>;
async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError>;
async fn unsubscribe(&self, url: Url) -> Result<(), StdError>;
fn url(&self) -> &Url;
}
pub struct RegistryExtension<T>(PhantomData<T>)
where
T: Registry + Send + Sync + 'static;
impl<T> ExtensionMetaInfo for RegistryExtension<T>
where
T: Registry + Send + Sync + 'static,
T: Extension<Target = Box<dyn Registry + Send + Sync + 'static>>,
{
fn name() -> String {
T::name()
}
fn extension_type() -> ExtensionType {
ExtensionType::Registry
}
fn extension_factory() -> ExtensionFactories {
ExtensionFactories::RegistryExtensionFactory(RegistryExtensionFactory::new(
<T as Extension>::create,
))
}
}
#[derive(Default)]
pub(super) struct RegistryExtensionLoader {
factories: HashMap<String, RegistryExtensionFactory>,
}
impl RegistryExtensionLoader {
pub(crate) fn register(&mut self, extension_name: String, factory: RegistryExtensionFactory) {
self.factories.insert(extension_name, factory);
}
pub(crate) fn remove(&mut self, extension_name: String) {
self.factories.remove(&extension_name);
}
pub(crate) fn load(
&mut self,
url: Url,
) -> Result<LoadExtensionPromise<RegistryProxy>, StdError> {
let extension_name = url.query::<ExtensionName>().unwrap();
let extension_name = extension_name.value();
let factory = self.factories.get_mut(&extension_name).ok_or_else(|| {
RegistryExtensionLoaderError::new(format!(
"registry extension loader error: extension name {} not found",
extension_name
))
})?;
factory.create(url)
}
}
type RegistryConstructor = fn(
Url,
) -> Pin<
Box<dyn Future<Output = Result<Box<dyn Registry + Send + Sync + 'static>, StdError>> + Send>,
>;
pub(crate) struct RegistryExtensionFactory {
constructor: RegistryConstructor,
instances: HashMap<String, LoadExtensionPromise<RegistryProxy>>,
}
impl RegistryExtensionFactory {
pub(super) fn new(constructor: RegistryConstructor) -> Self {
Self {
constructor,
instances: HashMap::new(),
}
}
}
impl RegistryExtensionFactory {
pub(super) fn create(
&mut self,
url: Url,
) -> Result<LoadExtensionPromise<RegistryProxy>, StdError> {
let registry_url = url.query::<RegistryUrl>().unwrap();
let registry_url = registry_url.value();
let url_str = registry_url.as_str().to_string();
match self.instances.get(&url_str) {
Some(proxy) => {
let proxy = proxy.clone();
Ok(proxy)
}
None => {
let constructor = self.constructor;
let creator = move |url: Url| {
let registry = constructor(url);
Box::pin(async move {
let registry = registry.await?;
let proxy = <RegistryProxy as From<Box<dyn Registry + Send + Sync>>>::from(
registry,
);
Ok(proxy)
})
as Pin<
Box<
dyn Future<Output = Result<RegistryProxy, StdError>>
+ Send
+ 'static,
>,
>
};
let promise = LoadExtensionPromise::new(Box::new(creator), url);
self.instances.insert(url_str, promise.clone());
Ok(promise)
}
}
}
}
#[derive(Error, Debug)]
#[error("{0}")]
pub(crate) struct RegistryExtensionLoaderError(String);
impl RegistryExtensionLoaderError {
pub(crate) fn new(msg: String) -> Self {
RegistryExtensionLoaderError(msg)
}
}
pub mod proxy {
use async_trait::async_trait;
use thiserror::Error;
use tokio::sync::oneshot;
use crate::{logger::tracing::error, StdError, Url};
use crate::extension::registry_extension::{DiscoverStream, Registry};
pub(super) enum RegistryOpt {
Register(Url, oneshot::Sender<Result<(), StdError>>),
Unregister(Url, oneshot::Sender<Result<(), StdError>>),
Subscribe(Url, oneshot::Sender<Result<DiscoverStream, StdError>>),
UnSubscribe(Url, oneshot::Sender<Result<(), StdError>>),
}
#[derive(Clone)]
pub struct RegistryProxy {
sender: tokio::sync::mpsc::Sender<RegistryOpt>,
url: Url,
}
#[async_trait]
impl Registry for RegistryProxy {
async fn register(&self, url: Url) -> Result<(), StdError> {
let (tx, rx) = oneshot::channel();
match self
.sender
.send(RegistryOpt::Register(url.clone(), tx))
.await
{
Ok(_) => match rx.await {
Ok(result) => result,
Err(_) => {
error!(
"registry proxy error: receive register response failed, url: {}",
url
);
return Err(
RegistryProxyError::new("receive register response failed").into()
);
}
},
Err(_) => {
error!(
"registry proxy error: send register request failed, url: {}",
url
);
return Err(RegistryProxyError::new("send register opt failed").into());
}
}
}
async fn unregister(&self, url: Url) -> Result<(), StdError> {
let (tx, rx) = oneshot::channel();
match self
.sender
.send(RegistryOpt::Unregister(url.clone(), tx))
.await
{
Ok(_) => match rx.await {
Ok(result) => result,
Err(_) => {
error!(
"registry proxy error: receive unregister response failed, url: {}",
url
);
return Err(
RegistryProxyError::new("receive unregister response failed").into(),
);
}
},
Err(_) => {
error!(
"registry proxy error: send unregister request failed, url: {}",
url
);
return Err(RegistryProxyError::new("send unregister opt failed").into());
}
}
}
async fn subscribe(&self, url: Url) -> Result<DiscoverStream, StdError> {
let (tx, rx) = oneshot::channel();
match self
.sender
.send(RegistryOpt::Subscribe(url.clone(), tx))
.await
{
Ok(_) => match rx.await {
Ok(result) => result,
Err(_) => {
error!(
"registry proxy error: receive subscribe response failed, url: {}",
url
);
return Err(
RegistryProxyError::new("receive subscribe response failed").into()
);
}
},
Err(_) => {
error!(
"registry proxy error: send subscribe request failed, url: {}",
url
);
return Err(RegistryProxyError::new("send subscribe opt failed").into());
}
}
}
async fn unsubscribe(&self, url: Url) -> Result<(), StdError> {
let (tx, rx) = oneshot::channel();
match self
.sender
.send(RegistryOpt::UnSubscribe(url.clone(), tx))
.await
{
Ok(_) => {
match rx.await {
Ok(result) => result,
Err(_) => {
error!("registry proxy error: receive unsubscribe response failed, url: {}", url);
return Err(RegistryProxyError::new(
"receive unsubscribe response failed",
)
.into());
}
}
}
Err(_) => {
error!(
"registry proxy error: send unsubscribe request failed, url: {}",
url
);
return Err(RegistryProxyError::new("send unsubscribe opt failed").into());
}
}
}
fn url(&self) -> &Url {
&self.url
}
}
impl From<Box<dyn Registry + Send + Sync>> for RegistryProxy {
fn from(registry: Box<dyn Registry + Send + Sync>) -> Self {
let url = registry.url().clone();
let (sender, mut receiver) = tokio::sync::mpsc::channel(1024);
tokio::spawn(async move {
while let Some(opt) = receiver.recv().await {
match opt {
RegistryOpt::Register(url, tx) => {
let register = registry.register(url).await;
if let Err(_) = tx.send(register) {
error!("registry proxy error: send register response failed");
}
}
RegistryOpt::Unregister(url, tx) => {
let unregister = registry.unregister(url).await;
if let Err(_) = tx.send(unregister) {
error!("registry proxy error: send unregister response failed");
}
}
RegistryOpt::Subscribe(url, tx) => {
let subscribe = registry.subscribe(url).await;
if let Err(_) = tx.send(subscribe) {
error!("registry proxy error: send subscribe response failed");
}
}
RegistryOpt::UnSubscribe(url, tx) => {
let unsubscribe = registry.unsubscribe(url).await;
if let Err(_) = tx.send(unsubscribe) {
error!("registry proxy error: send unsubscribe response failed");
}
}
}
}
});
RegistryProxy { sender, url }
}
}
#[derive(Error, Debug)]
#[error("registry proxy error: {0}")]
pub(crate) struct RegistryProxyError(String);
impl RegistryProxyError {
pub(crate) fn new(msg: &str) -> Self {
RegistryProxyError(msg.to_string())
}
}
}