use std::sync::Arc;
use std::task::{Context, Poll};
use arc_swap::ArcSwap;
use futures::future::BoxFuture;
use hyper_util::rt::TokioIo;
use tokio::net::TcpStream;
use tonic::body::Body;
use tonic::transport::{Channel, Endpoint};
use tower::util::BoxCloneSyncService;
use tower::{Service, ServiceExt as _};
use crate::tonic::connector::TargetConnector;
use crate::tonic::naming::BoxError;
/// Boxed, cloneable connector whose concrete type has been erased: a service
/// that takes an `http::Uri` and yields a `TokioIo<TcpStream>`, failing with
/// `BoxError`.
///
/// Erasing the type lets `Inner` store whatever connector was handed to
/// `SwapChannel::with_connector` without carrying a generic parameter.
type ErasedConnector = BoxCloneSyncService<http::Uri, TokioIo<TcpStream>, BoxError>;
/// Handle to a tonic `Channel` that can be replaced while the handle is in use.
///
/// Cloning is cheap: every clone points at the same `Inner` through an `Arc`,
/// so a `rebuild` issued through one clone is observed by all of them.
#[derive(Clone)]
pub struct SwapChannel {
/// State shared by all clones of this handle.
inner: Arc<Inner>,
}
/// Shared state behind a `SwapChannel`.
struct Inner {
/// The channel currently in use. Loaded on every request and replaced
/// wholesale by `SwapChannel::rebuild`.
channel: ArcSwap<Channel>,
/// Connector handed (as a clone) to every channel built from the template.
connector: ErasedConnector,
/// Endpoint configuration from which each channel is created.
endpoint_template: Endpoint,
}
impl SwapChannel {
    /// Creates a `SwapChannel` whose channels dial through `connector`.
    ///
    /// Convenience wrapper that forwards directly to
    /// [`SwapChannel::with_connector`].
    pub fn new(endpoint_template: Endpoint, connector: TargetConnector) -> Self {
        Self::with_connector(endpoint_template, connector)
    }

    /// Creates a `SwapChannel` from an endpoint template and any compatible
    /// connector service.
    ///
    /// The connector is type-erased once and kept alongside the template so
    /// that [`SwapChannel::rebuild`] can create further channels later. The
    /// initial channel is created with `connect_with_connector_lazy`, so this
    /// constructor does not await anything.
    pub fn with_connector<S>(endpoint_template: Endpoint, connector: S) -> Self
    where
        S: Service<http::Uri, Response = TokioIo<TcpStream>, Error = BoxError>
            + Clone
            + Send
            + Sync
            + 'static,
        S::Future: Send + 'static,
    {
        // Erase the concrete connector type so `Inner` needs no generic parameter.
        let connector: ErasedConnector = BoxCloneSyncService::new(connector);
        // The first channel gets its own clone; the original stays in `Inner`
        // for later rebuilds.
        let channel = ArcSwap::from_pointee(
            endpoint_template.connect_with_connector_lazy(connector.clone()),
        );
        let inner = Inner {
            channel,
            connector,
            endpoint_template,
        };
        Self {
            inner: Arc::new(inner),
        }
    }

    /// Builds a fresh channel from the stored template and connector and
    /// stores it in place of the current one.
    ///
    /// Requests that already cloned the previous channel keep using that
    /// clone; requests issued afterwards load the new channel.
    pub fn rebuild(&self) {
        let Inner {
            channel,
            connector,
            endpoint_template,
        } = &*self.inner;
        let fresh = endpoint_template.connect_with_connector_lazy(connector.clone());
        channel.store(Arc::new(fresh));
    }
}
impl Service<http::Request<Body>> for SwapChannel {
type Response = http::Response<Body>;
type Error = tonic::transport::Error;
type Future = BoxFuture<'static, Result<Self::Response, Self::Error>>;
fn poll_ready(&mut self, _cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}
fn call(&mut self, req: http::Request<Body>) -> Self::Future {
let mut ch = (**self.inner.channel.load()).clone();
Box::pin(async move {
let svc = ch.ready().await?;
svc.call(req).await
})
}
}