crabka_operator/
context.rs1use std::collections::HashMap;
2use std::sync::Arc;
3
4use crabka_client_admin::{AdminClient, AdminClientLike};
5use kube::Client;
6use tokio::sync::Mutex;
7
8use crate::config::OperatorConfig;
9use crate::rebalancer_client::{ConnectRebalancerClient, RebalancerClientLike};
10use crate::telemetry::SharedRegistry;
11
12pub type AdminClientHandle = Arc<Mutex<dyn AdminClientLike + Send>>;
16
17pub type RebalancerClientHandle = Arc<dyn RebalancerClientLike>;
22
23#[derive(Clone)]
26pub struct Context {
27 pub client: Client,
28 pub config: Arc<OperatorConfig>,
29 pub registry: SharedRegistry,
30 pub admin_clients: Arc<Mutex<HashMap<String, AdminClientHandle>>>,
33 pub rebalancer_clients: Arc<Mutex<HashMap<String, RebalancerClientHandle>>>,
37}
38
39impl Context {
40 #[must_use]
41 pub fn new(client: Client, config: OperatorConfig, registry: SharedRegistry) -> Self {
42 Self {
43 client,
44 config: Arc::new(config),
45 registry,
46 admin_clients: Arc::new(Mutex::new(HashMap::new())),
47 rebalancer_clients: Arc::new(Mutex::new(HashMap::new())),
48 }
49 }
50
51 pub async fn admin_client_for(
56 &self,
57 cluster: &str,
58 bootstrap: &str,
59 ) -> Result<AdminClientHandle, crabka_client_admin::AdminError> {
60 let mut map = self.admin_clients.lock().await;
61 if let Some(client) = map.get(cluster) {
62 return Ok(client.clone());
63 }
64 let admin = AdminClient::connect(&[bootstrap.to_string()]).await?;
65 let entry: AdminClientHandle = Arc::new(Mutex::new(admin));
66 map.insert(cluster.to_string(), entry.clone());
67 Ok(entry)
68 }
69
70 pub async fn drop_admin_client(&self, cluster: &str) {
74 self.admin_clients.lock().await.remove(cluster);
75 }
76
77 pub async fn insert_admin_client_for_test(&self, cluster: &str, admin: AdminClientHandle) {
86 self.admin_clients
87 .lock()
88 .await
89 .insert(cluster.to_string(), admin);
90 }
91
92 pub async fn rebalancer_client_for(&self, endpoint: &str) -> RebalancerClientHandle {
97 let mut map = self.rebalancer_clients.lock().await;
98 if let Some(client) = map.get(endpoint) {
99 return client.clone();
100 }
101 let client: RebalancerClientHandle = Arc::new(ConnectRebalancerClient::new(endpoint));
102 map.insert(endpoint.to_string(), client.clone());
103 client
104 }
105
106 pub async fn drop_rebalancer_client(&self, endpoint: &str) {
109 self.rebalancer_clients.lock().await.remove(endpoint);
110 }
111
112 pub async fn insert_rebalancer_client_for_test(
115 &self,
116 endpoint: &str,
117 client: RebalancerClientHandle,
118 ) {
119 self.rebalancer_clients
120 .lock()
121 .await
122 .insert(endpoint.to_string(), client);
123 }
124}