use std::{
collections::HashMap,
sync::{Arc, Mutex},
time::Duration,
};
use engula_api::server::v1::root_client::RootClient;
use tonic::transport::{Channel, Endpoint};
use crate::{Error, NodeClient, Result};
/// A cache of tonic `Channel`s keyed by peer address.
///
/// The state lives behind an `Arc<Mutex<_>>`, so cloning a `ConnManager` yields another
/// handle onto the same channel table rather than an independent cache.
#[derive(Clone, Debug)]
pub struct ConnManager {
// Shared with every clone of this handle and with the task spawned in `Default::default`.
core: Arc<Mutex<Core>>,
}
/// The mutable state guarded by the mutex inside `ConnManager`.
#[derive(Debug)]
struct Core {
/// Cached channels, keyed by the address string passed to `ConnManager::get`.
channels: HashMap<String, ChannelInfo>,
}
/// A cached channel together with its recent-usage counter.
#[derive(Debug)]
struct ChannelInfo {
/// The cached channel; `ConnManager::get` hands out clones of it.
channel: Channel,
/// Usage counter: set to 1 when the entry is created, incremented on every cache hit in
/// `ConnManager::get`, and reset to 0 by `recycle_conn_main`, which removes entries whose
/// counter is still 0 on its next pass.
access: usize,
}
impl ConnManager {
pub fn new() -> Self {
ConnManager::default()
}
pub async fn get(&self, addr: String) -> Result<Channel> {
let mut core = self.core.lock().unwrap();
if let Some(info) = core.channels.get_mut(&addr) {
info.access += 1;
return Ok(info.channel.clone());
}
let channel = match Endpoint::new(format!("http://{}", addr)) {
Ok(endpoint) => endpoint.connect_lazy(),
Err(e) => return Err(Error::Internal(Box::new(e))),
};
let info = ChannelInfo {
channel: channel.clone(),
access: 1,
};
core.channels.insert(addr, info);
Ok(channel)
}
#[inline]
pub async fn get_node_client(&self, addr: String) -> Result<NodeClient> {
let channel = self.get(addr).await?;
Ok(NodeClient::new(channel))
}
#[inline]
pub async fn get_root_client(&self, addr: String) -> Result<RootClient<Channel>> {
let channel = self.get(addr).await?;
Ok(RootClient::new(channel))
}
}
impl Default for ConnManager {
    /// Builds a manager with an empty channel table and spawns `recycle_conn_main` on the
    /// current Tokio runtime, handing it a second handle to the same table.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` panics when called outside of a Tokio runtime, so this must be
    /// invoked from within one.
    fn default() -> Self {
        let manager = ConnManager {
            core: Arc::new(Mutex::new(Core {
                channels: HashMap::new(),
            })),
        };
        // The recycler shares the table with the returned handle.
        tokio::spawn(recycle_conn_main(Arc::clone(&manager.core)));
        manager
    }
}
/// Background loop that evicts channels which were not used during the last period.
///
/// Every 60 seconds each cached entry is inspected: entries whose `access` counter is 0
/// are removed, and all surviving entries have their counter reset to 0 so that they are
/// dropped on the following pass unless `ConnManager::get` touches them again.
///
/// The loop ends once this task holds the only remaining strong reference to `core`, i.e.
/// after every `ConnManager` handle sharing it has been dropped. Without that check the
/// task — and every channel still cached — would stay alive for the lifetime of the runtime.
///
/// # Panics
///
/// Panics if the mutex is poisoned.
async fn recycle_conn_main(core: Arc<Mutex<Core>>) {
    let mut interval = tokio::time::interval(Duration::from_secs(60));
    loop {
        interval.tick().await;

        // A count of 1 means only this task still owns the state. No new handle can be
        // created from here on, so stop and let the cached channels drop.
        if Arc::strong_count(&core) == 1 {
            return;
        }

        // The guard is scoped to the rest of this iteration and released before the next
        // `.await`.
        let mut guard = core.lock().unwrap();
        guard.channels.retain(|_, info| {
            let used = info.access != 0;
            info.access = 0;
            used
        });
    }
}