use std::collections::HashMap;
use std::sync::Arc;
use crabka_client_admin::{AdminClient, AdminClientLike};
use kube::Client;
use tokio::sync::Mutex;
use crate::config::OperatorConfig;
use crate::rebalancer_client::{ConnectRebalancerClient, RebalancerClientLike};
use crate::telemetry::SharedRegistry;
pub type AdminClientHandle = Arc<Mutex<dyn AdminClientLike + Send>>;
pub type RebalancerClientHandle = Arc<dyn RebalancerClientLike>;
#[derive(Clone)]
pub struct Context {
pub client: Client,
pub config: Arc<OperatorConfig>,
pub registry: SharedRegistry,
pub admin_clients: Arc<Mutex<HashMap<String, AdminClientHandle>>>,
pub rebalancer_clients: Arc<Mutex<HashMap<String, RebalancerClientHandle>>>,
}
impl Context {
#[must_use]
pub fn new(client: Client, config: OperatorConfig, registry: SharedRegistry) -> Self {
Self {
client,
config: Arc::new(config),
registry,
admin_clients: Arc::new(Mutex::new(HashMap::new())),
rebalancer_clients: Arc::new(Mutex::new(HashMap::new())),
}
}
pub async fn admin_client_for(
&self,
cluster: &str,
bootstrap: &str,
) -> Result<AdminClientHandle, crabka_client_admin::AdminError> {
let mut map = self.admin_clients.lock().await;
if let Some(client) = map.get(cluster) {
return Ok(client.clone());
}
let admin = AdminClient::connect(&[bootstrap.to_string()]).await?;
let entry: AdminClientHandle = Arc::new(Mutex::new(admin));
map.insert(cluster.to_string(), entry.clone());
Ok(entry)
}
pub async fn drop_admin_client(&self, cluster: &str) {
self.admin_clients.lock().await.remove(cluster);
}
pub async fn insert_admin_client_for_test(&self, cluster: &str, admin: AdminClientHandle) {
self.admin_clients
.lock()
.await
.insert(cluster.to_string(), admin);
}
pub async fn rebalancer_client_for(&self, endpoint: &str) -> RebalancerClientHandle {
let mut map = self.rebalancer_clients.lock().await;
if let Some(client) = map.get(endpoint) {
return client.clone();
}
let client: RebalancerClientHandle = Arc::new(ConnectRebalancerClient::new(endpoint));
map.insert(endpoint.to_string(), client.clone());
client
}
pub async fn drop_rebalancer_client(&self, endpoint: &str) {
self.rebalancer_clients.lock().await.remove(endpoint);
}
pub async fn insert_rebalancer_client_for_test(
&self,
endpoint: &str,
client: RebalancerClientHandle,
) {
self.rebalancer_clients
.lock()
.await
.insert(endpoint.to_string(), client);
}
}