Skip to main content

crabka_operator/
context.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3
4use crabka_client_admin::{AdminClient, AdminClientLike};
5use kube::Client;
6use tokio::sync::Mutex;
7
8use crate::config::OperatorConfig;
9use crate::rebalancer_client::{ConnectRebalancerClient, RebalancerClientLike};
10use crate::telemetry::SharedRegistry;
11
/// Shared, type-erased admin client handle.
///
/// The trait object sits behind an `Arc` plus an async (`tokio`) `Mutex`,
/// so the handle can be cloned cheaply and callers lock it to use the
/// client. Production code wraps a real `AdminClient`; tests substitute a
/// fake implementing `AdminClientLike` without opening a TCP connection.
pub type AdminClientHandle = Arc<Mutex<dyn AdminClientLike + Send>>;
16
/// Shared, type-erased rebalancer client handle.
///
/// Production wraps a [`ConnectRebalancerClient`]; reconcile tests
/// substitute a fake implementing `RebalancerClientLike`. Unlike
/// `AdminClientHandle` there is no `Mutex`: the client's methods take
/// `&self` and the inner HTTP client is a shareable connection pool, so a
/// plain `Arc` is enough.
pub type RebalancerClientHandle = Arc<dyn RebalancerClientLike>;
22
/// Shared per-reconciler context.
///
/// Intended to be cheap to clone: the configuration and both client caches
/// live behind `Arc`s, so clones share the same caches rather than copying
/// them.
#[derive(Clone)]
pub struct Context {
    /// Kubernetes API client.
    pub client: Client,
    /// Operator configuration, shared between clones of the context.
    pub config: Arc<OperatorConfig>,
    /// Telemetry registry handle.
    pub registry: SharedRegistry,
    /// Per-cluster admin-client cache, keyed by `Kafka` resource name.
    /// Broken connections are not repaired in place: the entry is dropped
    /// and a new client is opened lazily on next use.
    pub admin_clients: Arc<Mutex<HashMap<String, AdminClientHandle>>>,
    /// Per-endpoint rebalancer-client cache, keyed by the resolved Connect
    /// base URL. Entries are dropped and re-created lazily after a
    /// transport failure.
    pub rebalancer_clients: Arc<Mutex<HashMap<String, RebalancerClientHandle>>>,
}
38
39impl Context {
40    #[must_use]
41    pub fn new(client: Client, config: OperatorConfig, registry: SharedRegistry) -> Self {
42        Self {
43            client,
44            config: Arc::new(config),
45            registry,
46            admin_clients: Arc::new(Mutex::new(HashMap::new())),
47            rebalancer_clients: Arc::new(Mutex::new(HashMap::new())),
48        }
49    }
50
51    /// Look up or open an `AdminClient` for the named cluster.
52    ///
53    /// `bootstrap` is the inter-broker listener's `bootstrap_servers`
54    /// string, e.g. `demo-broker-headless.default.svc.cluster.local:9092`.
55    pub async fn admin_client_for(
56        &self,
57        cluster: &str,
58        bootstrap: &str,
59    ) -> Result<AdminClientHandle, crabka_client_admin::AdminError> {
60        let mut map = self.admin_clients.lock().await;
61        if let Some(client) = map.get(cluster) {
62            return Ok(client.clone());
63        }
64        let admin = AdminClient::connect(&[bootstrap.to_string()]).await?;
65        let entry: AdminClientHandle = Arc::new(Mutex::new(admin));
66        map.insert(cluster.to_string(), entry.clone());
67        Ok(entry)
68    }
69
70    /// Drop the cached admin client for `cluster` (used by reconcile when
71    /// a Transport error indicates the connection died — next call will
72    /// reopen).
73    pub async fn drop_admin_client(&self, cluster: &str) {
74        self.admin_clients.lock().await.remove(cluster);
75    }
76
77    /// Test-only: pre-populate the admin-client cache with a caller-supplied
78    /// handle. The `AdminClientLike` trait abstracts over the real client
79    /// and per-test fakes, so reconcile tests can drive the trait methods
80    /// without opening a TCP connection.
81    ///
82    /// Not cfg-gated: the function exists in the public API but is harmless
83    /// (and unused) in production — keeping it un-gated avoids a parallel
84    /// test-only build profile.
85    pub async fn insert_admin_client_for_test(&self, cluster: &str, admin: AdminClientHandle) {
86        self.admin_clients
87            .lock()
88            .await
89            .insert(cluster.to_string(), admin);
90    }
91
92    /// Look up or build a rebalancer client for `endpoint` (a Connect base
93    /// URL like `http://host:9300`). Construction is infallible (no
94    /// connection is opened until the first RPC), so this returns the
95    /// handle directly.
96    pub async fn rebalancer_client_for(&self, endpoint: &str) -> RebalancerClientHandle {
97        let mut map = self.rebalancer_clients.lock().await;
98        if let Some(client) = map.get(endpoint) {
99            return client.clone();
100        }
101        let client: RebalancerClientHandle = Arc::new(ConnectRebalancerClient::new(endpoint));
102        map.insert(endpoint.to_string(), client.clone());
103        client
104    }
105
106    /// Drop the cached rebalancer client for `endpoint` (used by reconcile
107    /// after a transport error — the next call rebuilds it).
108    pub async fn drop_rebalancer_client(&self, endpoint: &str) {
109        self.rebalancer_clients.lock().await.remove(endpoint);
110    }
111
112    /// Test-only: pre-populate the rebalancer-client cache with a fake.
113    /// Mirrors [`Self::insert_admin_client_for_test`].
114    pub async fn insert_rebalancer_client_for_test(
115        &self,
116        endpoint: &str,
117        client: RebalancerClientHandle,
118    ) {
119        self.rebalancer_clients
120            .lock()
121            .await
122            .insert(endpoint.to_string(), client);
123    }
124}