Skip to main content

couchbase_core/
agent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::address::Address;
20use crate::auth_mechanism::AuthMechanism;
21use crate::authenticator::Authenticator;
22use crate::cbconfig::TerseConfig;
23use crate::collection_resolver_cached::{
24    CollectionResolverCached, CollectionResolverCachedOptions,
25};
26use crate::collection_resolver_memd::{CollectionResolverMemd, CollectionResolverMemdOptions};
27use crate::compressionmanager::{CompressionManager, StdCompressor};
28use crate::configmanager::{
29    ConfigManager, ConfigManagerMemd, ConfigManagerMemdConfig, ConfigManagerMemdOptions,
30};
31use crate::configparser::ConfigParser;
32use crate::crudcomponent::CrudComponent;
33use crate::diagnosticscomponent::{DiagnosticsComponent, DiagnosticsComponentConfig};
34use crate::errmapcomponent::ErrMapComponent;
35use crate::error::{Error, ErrorKind, Result};
36use crate::features::BucketFeature;
37use crate::httpcomponent::HttpComponent;
38use crate::httpx::client::{ClientConfig, ReqwestClient};
39use crate::kvclient::{
40    KvClient, KvClientBootstrapOptions, KvClientOptions, StdKvClient, UnsolicitedPacket,
41};
42use crate::kvclient_ops::KvClientOps;
43use crate::kvclientpool::{KvClientPool, KvClientPoolOptions, StdKvClientPool};
44use crate::memdx::client::Client;
45use crate::memdx::opcode::OpCode;
46use crate::memdx::packet::ResponsePacket;
47use crate::memdx::request::GetClusterConfigRequest;
48use crate::mgmtcomponent::{MgmtComponent, MgmtComponentConfig, MgmtComponentOptions};
49use crate::mgmtx::options::{GetTerseBucketConfigOptions, GetTerseClusterConfigOptions};
50use crate::networktypeheuristic::NetworkTypeHeuristic;
51use crate::nmvbhandler::{ConfigUpdater, StdNotMyVbucketConfigHandler};
52use crate::options::agent::{AgentOptions, ReconfigureAgentOptions};
53use crate::parsedconfig::{ParsedConfig, ParsedConfigBucketFeature, ParsedConfigFeature};
54use crate::querycomponent::{QueryComponent, QueryComponentConfig, QueryComponentOptions};
55use crate::retry::RetryManager;
56use crate::searchcomponent::{SearchComponent, SearchComponentConfig, SearchComponentOptions};
57use crate::service_type::ServiceType;
58use crate::tls_config::TlsConfig;
59use crate::util::{get_host_port_from_uri, get_hostname_from_host_port};
60use crate::vbucketrouter::{
61    StdVbucketRouter, VbucketRouter, VbucketRouterOptions, VbucketRoutingInfo,
62};
63use crate::{httpx, mgmtx};
64
65use byteorder::BigEndian;
66use futures::executor::block_on;
67use tracing::{debug, error, info, warn};
68use uuid::Uuid;
69
70use crate::analyticscomponent::{AnalyticsComponent, AnalyticsComponentOptions};
71use crate::componentconfigs::{AgentComponentConfigs, HttpClientConfig};
72use crate::httpx::request::{Auth, BasicAuth, BearerAuth};
73use crate::kvclient_babysitter::{KvTarget, StdKvClientBabysitter};
74use crate::kvendpointclientmanager::{
75    KvEndpointClientManager, KvEndpointClientManagerOptions, StdKvEndpointClientManager,
76};
77use crate::orphan_reporter::OrphanReporter;
78use crate::tracingcomponent::{TracingComponent, TracingComponentConfig};
79use arc_swap::ArcSwap;
80use std::cmp::Ordering;
81use std::collections::HashMap;
82use std::error::Error as StdError;
83use std::fmt::{format, Display};
84use std::io::Cursor;
85use std::net::ToSocketAddrs;
86use std::ops::{Add, Deref};
87use std::sync::{Arc, Weak};
88use std::time::Duration;
89use tokio::io::AsyncReadExt;
90use tokio::net;
91use tokio::runtime::{Handle, Runtime};
92use tokio::sync::broadcast::{Receiver, Sender};
93use tokio::sync::{broadcast, mpsc, Mutex};
94use tokio::task::JoinHandle;
95use tokio::time::{sleep, timeout, timeout_at, Instant};
96
/// Configuration snapshot that the agent keeps behind the `AgentInner::state` mutex.
///
/// The component configs handed to the KV, HTTP and routing components are derived
/// from this struct (see `AgentInner::gen_agent_component_configs_locked`).
#[derive(Clone)]
struct AgentState {
    // Bucket name taken from the agent options, if one was given.
    bucket: Option<String>,
    // TLS settings; replaced wholesale by `AgentInner::reconfigure`.
    tls_config: Option<TlsConfig>,
    // Credential source; replaced wholesale by `AgentInner::reconfigure`.
    authenticator: Authenticator,
    // Auth mechanisms taken from the agent options (may be empty).
    auth_mechanisms: Vec<AuthMechanism>,
    // Taken from `kv_config.num_connections` in the agent options.
    num_pool_connections: usize,
    // http_transport:
    // Most recently applied cluster config; overwritten by `AgentInner::apply_config`.
    latest_config: ParsedConfig,
    // Network type chosen at construction: the user-supplied value, or the result of
    // `NetworkTypeHeuristic::identify` when unset, empty or "auto".
    network_type: String,

    // The three `disable_*` flags are the negation of the matching `enable_*` KV options.
    disable_error_map: bool,
    disable_mutation_tokens: bool,
    disable_server_durations: bool,
    // Copied from `kv_config.connect_timeout`.
    kv_connect_timeout: Duration,
    // Copied from `kv_config.connect_throttle_timeout`.
    kv_connect_throttle_timeout: Duration,
    // Copied from `http_config.idle_connection_timeout`.
    http_idle_connection_timeout: Duration,
    // Copied from `http_config.max_idle_connections_per_host`.
    http_max_idle_connections_per_host: Option<usize>,
    // Taken from the agent options; `Agent::new` falls back to 60 seconds when unset.
    tcp_keep_alive_time: Duration,
}
117
118impl Display for AgentState {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        write!(
121            f,
122            "{{ bucket: {:?}, network_type: {}, num_pool_connections: {}, latest_config_rev_id: {}, latest_config_rev_epoch: {}, authenticator: {} }}",
123            self.bucket,
124            self.network_type,
125            self.num_pool_connections,
126            self.latest_config.rev_id,
127            self.latest_config.rev_epoch,
128            self.authenticator,
129
130        )
131    }
132}
133
// Concrete KV endpoint client manager used by the agent: a manager of client pools,
// each pool built from babysitter-supervised `StdKvClient<Client>` connections.
type AgentClientManager = StdKvEndpointClientManager<
    StdKvClientPool<StdKvClientBabysitter<StdKvClient<Client>>, StdKvClient<Client>>,
    StdKvClient<Client>,
>;
// Concrete collection resolver used by the agent: a caching layer wrapped around the
// memcached-protocol resolver, which in turn talks through `AgentClientManager`.
type AgentCollectionResolver = CollectionResolverCached<CollectionResolverMemd<AgentClientManager>>;
139
/// Shared core of an [`Agent`]: owns the mutable state and every service component.
///
/// Background tasks spawned by `Agent::new` hold only `Weak` references to this
/// struct and exit once it has been dropped.
pub(crate) struct AgentInner {
    // Mutable configuration snapshot; locked by `apply_config`, `reconfigure`
    // and `bucket_features`.
    state: Arc<Mutex<AgentState>>,
    // Bucket name captured at construction; returned by `get_bucket_name` without
    // taking the state lock.
    bucket: Option<String>,

    // Config manager; consulted for out-of-band configs and reconfigured on state updates.
    cfg_manager: Arc<ConfigManagerMemd<AgentClientManager>>,
    // KV endpoint client manager; its endpoint set and authenticator are updated at runtime.
    conn_mgr: Arc<AgentClientManager>,
    // Vbucket routing table; refreshed whenever the state is updated.
    vb_router: Arc<StdVbucketRouter>,
    // Collection resolver shared with the CRUD component.
    collections: Arc<AgentCollectionResolver>,
    // Retry manager shared by the CRUD and HTTP-based components.
    retry_manager: Arc<RetryManager>,
    // HTTP client shared by the HTTP-based components; TLS is updated on `reconfigure`.
    http_client: Arc<ReqwestClient>,
    // Receives error maps fetched during KV client bootstrap.
    err_map_component: Arc<ErrMapComponent>,

    // Key-value operations component.
    pub(crate) crud: CrudComponent<
        AgentClientManager,
        StdVbucketRouter,
        StdNotMyVbucketConfigHandler<AgentInner>,
        AgentCollectionResolver,
        StdCompressor,
    >,

    // Service components; each is reconfigured by `update_state_locked`.
    pub(crate) analytics: Arc<AnalyticsComponent<ReqwestClient>>,
    pub(crate) query: Arc<QueryComponent<ReqwestClient>>,
    pub(crate) search: Arc<SearchComponent<ReqwestClient>>,
    pub(crate) mgmt: MgmtComponent<ReqwestClient>,
    pub(crate) diagnostics: DiagnosticsComponent<ReqwestClient, AgentClientManager>,
    pub(crate) tracing: Arc<TracingComponent>,
}
167
/// Public handle to a bootstrapped agent; created with `Agent::new`.
pub struct Agent {
    // Shared core holding the state and all components.
    pub(crate) inner: Arc<AgentInner>,
    // Client identification string of the form `cb-rust/<crate version>`.
    user_agent: String,
    // Random UUID (v4) generated when the agent is created.
    id: String,
}
173
174impl AgentInner {
175    fn gen_agent_component_configs_locked(state: &AgentState) -> AgentComponentConfigs {
176        AgentComponentConfigs::gen_from_config(
177            &state.latest_config,
178            &state.network_type,
179            state.tls_config.clone(),
180            state.bucket.clone(),
181            state.authenticator.clone(),
182        )
183    }
184
185    pub async fn unsolicited_packet_handler(&self, up: UnsolicitedPacket) {
186        let packet = up.packet;
187        if packet.op_code == OpCode::Set {
188            if let Some(ref extras) = packet.extras {
189                if extras.len() < 16 {
190                    warn!("Received Set packet with too short extras: {packet:?}");
191                    return;
192                }
193
194                let mut cursor = Cursor::new(extras);
195                let server_rev_epoch = cursor.read_i64().await.unwrap();
196                let server_rev_id = cursor.read_i64().await.unwrap();
197
198                if let Some(config) = self
199                    .cfg_manager
200                    .out_of_band_version(server_rev_id, server_rev_epoch, up.endpoint_id)
201                    .await
202                {
203                    self.apply_config(config).await;
204                }
205            } else {
206                warn!("Received Set packet with no extras: {packet:?}");
207            }
208        }
209    }
210
211    pub async fn apply_config(&self, config: ParsedConfig) {
212        let mut state = self.state.lock().await;
213
214        info!(
215            "Agent applying updated config: rev_id={rev_id}, rev_epoch={rev_epoch}",
216            rev_id = config.rev_id,
217            rev_epoch = config.rev_epoch
218        );
219        state.latest_config = config;
220
221        self.update_state_locked(&mut state).await;
222    }
223
    /// Regenerates the component configs from `state` and pushes them to every component.
    ///
    /// The caller passes the state it already has exclusive access to. Failures reported
    /// by the connection manager or config manager are logged and otherwise ignored, so
    /// the remaining components are still reconfigured.
    async fn update_state_locked(&self, state: &mut AgentState) {
        debug!("Agent updating state {}", state);

        let agent_component_configs = Self::gen_agent_component_configs_locked(state);

        // In order to avoid race conditions between operations selecting the
        // endpoint they need to send the request to, and fetching an actual
        // client which can send to that endpoint.  We must first ensure that
        // all the new endpoints are available in the manager.  Then update
        // the routing table.  Then go back and remove the old entries from
        // the connection manager list.

        // First pass: the `true` flag corresponds to the "add-only" update named in the
        // error message below.
        if let Err(e) = self
            .conn_mgr
            .update_endpoints(agent_component_configs.kv_targets.clone(), true)
            .await
        {
            error!("Failed to reconfigure connection manager (add-only); {e}");
        };

        self.vb_router
            .update_vbucket_info(agent_component_configs.vbucket_routing_info);

        if let Err(e) = self
            .cfg_manager
            .reconfigure(agent_component_configs.config_manager_memd_config)
        {
            error!("Failed to reconfigure memd config watcher component; {e}");
        }

        // Second pass over the same targets, this time with the flag cleared.
        if let Err(e) = self
            .conn_mgr
            .update_endpoints(agent_component_configs.kv_targets, false)
            .await
        {
            error!("Failed to reconfigure connection manager; {e}");
        }

        // The remaining components take their new config directly.
        self.analytics
            .reconfigure(agent_component_configs.analytics_config);
        self.query.reconfigure(agent_component_configs.query_config);
        self.search
            .reconfigure(agent_component_configs.search_config);
        self.mgmt.reconfigure(agent_component_configs.mgmt_config);
        self.diagnostics
            .reconfigure(agent_component_configs.diagnostics_config);
        self.tracing
            .reconfigure(agent_component_configs.tracing_config);
    }
273
274    pub async fn bucket_features(&self) -> Result<Vec<BucketFeature>> {
275        let guard = self.state.lock().await;
276
277        if let Some(bucket) = &guard.latest_config.bucket {
278            let mut features = vec![];
279
280            for feature in &bucket.features {
281                match feature {
282                    ParsedConfigBucketFeature::CreateAsDeleted => {
283                        features.push(BucketFeature::CreateAsDeleted)
284                    }
285                    ParsedConfigBucketFeature::ReplaceBodyWithXattr => {
286                        features.push(BucketFeature::ReplaceBodyWithXattr)
287                    }
288                    ParsedConfigBucketFeature::RangeScan => features.push(BucketFeature::RangeScan),
289                    ParsedConfigBucketFeature::ReplicaRead => {
290                        features.push(BucketFeature::ReplicaRead)
291                    }
292                    ParsedConfigBucketFeature::NonDedupedHistory => {
293                        features.push(BucketFeature::NonDedupedHistory)
294                    }
295                    ParsedConfigBucketFeature::ReviveDocument => {
296                        features.push(BucketFeature::ReviveDocument)
297                    }
298                    _ => {}
299                }
300            }
301
302            return Ok(features);
303        }
304
305        Err(ErrorKind::NoBucket.into())
306    }
307
308    pub(crate) fn get_bucket_name(&self) -> Option<String> {
309        self.bucket.clone()
310    }
311
312    pub async fn reconfigure(&self, opts: ReconfigureAgentOptions) {
313        let mut state = self.state.lock().await;
314        state.tls_config = opts.tls_config.clone();
315        state.authenticator = opts.authenticator.clone();
316
317        // We manually update tls for http as it requires rebuilding the client.
318        match self
319            .http_client
320            .update_tls(httpx::client::UpdateTlsOptions {
321                tls_config: opts.tls_config,
322            }) {
323            Ok(_) => {}
324            Err(e) => {
325                warn!("Failed to update TLS for HTTP client: {}", e);
326            }
327        };
328
329        self.conn_mgr.update_auth(opts.authenticator).await;
330
331        self.update_state_locked(&mut state).await;
332    }
333}
334
335impl ConfigUpdater for AgentInner {
336    async fn apply_terse_config(&self, config: TerseConfig, source_hostname: &str) {
337        let parsed_config = match ConfigParser::parse_terse_config(config, source_hostname) {
338            Ok(cfg) => cfg,
339            Err(_e) => {
340                // TODO: log
341                return;
342            }
343        };
344
345        if let Some(config) = self.cfg_manager.out_of_band_config(parsed_config) {
346            self.apply_config(config).await;
347        };
348    }
349}
350
351impl Agent {
    /// Creates a new agent from `opts` and bootstraps it against the cluster.
    ///
    /// The steps are: build the initial state from the options, fetch a first config
    /// through `get_first_config`, settle the network type, construct every component
    /// from the derived configs, and spawn the background tasks that handle unsolicited
    /// packets and config updates.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot be built, if `get_first_config`
    /// fails, or if the KV endpoint client manager cannot be created.
    pub async fn new(opts: AgentOptions) -> Result<Self> {
        let build_version = env!("CARGO_PKG_VERSION");
        let user_agent = format!("cb-rust/{build_version}");
        let agent_id = Uuid::new_v4().to_string();
        info!(
            "Core SDK Version: {} - Agent ID: {}",
            &user_agent, &agent_id
        );
        info!("Agent Options {opts}");

        // The configured mechanisms are used as given; this block only warns about
        // combinations that leak credentials (PLAIN without TLS) or add needless
        // overhead (SCRAM over TLS).
        let auth_mechanisms = if !opts.auth_mechanisms.is_empty() {
            if opts.tls_config.is_none() && opts.auth_mechanisms.contains(&AuthMechanism::Plain) {
                warn!("PLAIN sends credentials in plaintext, this will cause credential leakage on the network");
            } else if opts.tls_config.is_some()
                && (opts.auth_mechanisms.contains(&AuthMechanism::ScramSha512)
                    || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha256)
                    || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha1))
            {
                warn!("Consider using PLAIN for TLS connections, as it is more efficient");
            }

            opts.auth_mechanisms
        } else {
            vec![]
        };

        // `latest_config` and `network_type` are placeholders here; both are filled in
        // once the first config has been fetched.
        let mut state = AgentState {
            bucket: opts.bucket_name.clone(),
            authenticator: opts.authenticator.clone(),
            num_pool_connections: opts.kv_config.num_connections,
            latest_config: ParsedConfig::default(),
            network_type: "".to_string(),
            tls_config: opts.tls_config,
            auth_mechanisms: auth_mechanisms.clone(),
            disable_error_map: !opts.kv_config.enable_error_map,
            disable_mutation_tokens: !opts.kv_config.enable_mutation_tokens,
            disable_server_durations: !opts.kv_config.enable_server_durations,
            kv_connect_timeout: opts.kv_config.connect_timeout,
            kv_connect_throttle_timeout: opts.kv_config.connect_throttle_timeout,
            http_idle_connection_timeout: opts.http_config.idle_connection_timeout,
            http_max_idle_connections_per_host: opts.http_config.max_idle_connections_per_host,
            tcp_keep_alive_time: opts
                .tcp_keep_alive_time
                .unwrap_or_else(|| Duration::from_secs(60)),
        };

        let bucket_name = opts.bucket_name.clone();

        let http_client = Arc::new(ReqwestClient::new(ClientConfig {
            tls_config: state.tls_config.clone(),
            idle_connection_timeout: state.http_idle_connection_timeout,
            max_idle_connections_per_host: state.http_max_idle_connections_per_host,
            tcp_keep_alive_time: state.tcp_keep_alive_time,
        })?);

        let err_map_component = Arc::new(ErrMapComponent::new());

        let connect_timeout = opts.kv_config.connect_timeout;

        // Bootstrap: build seed targets for both transports and fetch the first config.
        let first_kv_client_configs =
            Self::gen_first_kv_client_configs(&opts.seed_config.memd_addrs, &state);
        let first_http_client_configs = Self::gen_first_http_endpoints(
            user_agent.clone(),
            &opts.seed_config.http_addrs,
            &state,
        );
        let (first_config, cfg_source_host_port) = Self::get_first_config(
            user_agent.clone(),
            first_kv_client_configs,
            &state,
            first_http_client_configs,
            http_client.clone(),
            err_map_component.clone(),
            connect_timeout,
        )
        .await?;

        state.latest_config = first_config.clone();

        // An unset, empty or "auto" network option defers to the heuristic; any other
        // value is used verbatim.
        let network_type = if let Some(network) = opts.network {
            if network == "auto" || network.is_empty() {
                NetworkTypeHeuristic::identify(&state.latest_config, &cfg_source_host_port)
            } else {
                network
            }
        } else {
            NetworkTypeHeuristic::identify(&state.latest_config, &cfg_source_host_port)
        };
        info!("Identified network type: {network_type}");
        state.network_type = network_type;

        let agent_component_configs = AgentInner::gen_agent_component_configs_locked(&state);

        let tracing = Arc::new(TracingComponent::new(
            agent_component_configs.tracing_config,
        ));

        let err_map_component_conn_mgr = err_map_component.clone();

        let num_pool_connections = state.num_pool_connections;

        // Channel over which KV clients forward packets that match no pending request;
        // the receiving end is drained by the task spawned further down.
        let (unsolicited_packet_tx, mut unsolicited_packet_rx) = mpsc::unbounded_channel();

        let conn_mgr = Arc::new(
            StdKvEndpointClientManager::new(KvEndpointClientManagerOptions {
                on_close_handler: Arc::new(|_manager_id| {}),
                on_demand_connect: opts.kv_config.on_demand_connect,
                num_pool_connections,
                connect_throttle_period: opts.kv_config.connect_throttle_timeout,
                bootstrap_options: KvClientBootstrapOptions {
                    client_name: user_agent.clone(),
                    disable_error_map: state.disable_error_map,
                    disable_mutation_tokens: state.disable_mutation_tokens,
                    disable_server_durations: state.disable_server_durations,
                    on_err_map_fetched: Some(Arc::new(move |err_map| {
                        err_map_component_conn_mgr.on_err_map(err_map);
                    })),
                    tcp_keep_alive_time: state.tcp_keep_alive_time,
                    auth_mechanisms,
                    connect_timeout,
                },
                unsolicited_packet_tx: Some(unsolicited_packet_tx),
                orphan_handler: opts.orphan_response_handler,
                endpoints: agent_component_configs.kv_targets,
                authenticator: opts.authenticator,
                disable_decompression: opts.compression_config.disable_decompression,
                selected_bucket: opts.bucket_name,
                tracing: tracing.clone(),
            })
            .await?,
        );

        let cfg_manager = Arc::new(ConfigManagerMemd::new(
            agent_component_configs.config_manager_memd_config,
            ConfigManagerMemdOptions {
                polling_period: opts.config_poller_config.poll_interval,
                kv_client_manager: conn_mgr.clone(),
                first_config,
                fetch_timeout: opts.config_poller_config.fetch_timeout,
            },
        ));
        let vb_router = Arc::new(StdVbucketRouter::new(
            agent_component_configs.vbucket_routing_info,
            VbucketRouterOptions {},
        ));

        let nmvb_handler = Arc::new(StdNotMyVbucketConfigHandler::new());

        let memd_resolver = CollectionResolverMemd::new(CollectionResolverMemdOptions {
            conn_mgr: conn_mgr.clone(),
        });

        let collections = Arc::new(CollectionResolverCached::new(
            CollectionResolverCachedOptions {
                resolver: memd_resolver,
            },
        ));

        let retry_manager = Arc::new(RetryManager::new(err_map_component.clone()));
        let compression_manager = Arc::new(CompressionManager::new(opts.compression_config));

        let crud = CrudComponent::new(
            nmvb_handler.clone(),
            vb_router.clone(),
            conn_mgr.clone(),
            collections.clone(),
            retry_manager.clone(),
            compression_manager,
        );

        let mgmt = MgmtComponent::new(
            retry_manager.clone(),
            http_client.clone(),
            tracing.clone(),
            agent_component_configs.mgmt_config,
            MgmtComponentOptions {
                user_agent: user_agent.clone(),
            },
        );

        let analytics = Arc::new(AnalyticsComponent::new(
            retry_manager.clone(),
            http_client.clone(),
            agent_component_configs.analytics_config,
            AnalyticsComponentOptions {
                user_agent: user_agent.clone(),
            },
        ));

        let query = Arc::new(QueryComponent::new(
            retry_manager.clone(),
            http_client.clone(),
            tracing.clone(),
            agent_component_configs.query_config,
            QueryComponentOptions {
                user_agent: user_agent.clone(),
            },
        ));

        let search = Arc::new(SearchComponent::new(
            retry_manager.clone(),
            http_client.clone(),
            tracing.clone(),
            agent_component_configs.search_config,
            SearchComponentOptions {
                user_agent: user_agent.clone(),
            },
        ));

        let diagnostics = DiagnosticsComponent::new(
            conn_mgr.clone(),
            query.clone(),
            search.clone(),
            retry_manager.clone(),
            agent_component_configs.diagnostics_config,
        );

        let state = Arc::new(Mutex::new(state));

        let inner = Arc::new(AgentInner {
            state,
            bucket: bucket_name,
            cfg_manager: cfg_manager.clone(),
            conn_mgr,
            vb_router,
            crud,
            collections,
            retry_manager,
            http_client,
            err_map_component,

            mgmt,
            analytics,
            query,
            search,
            diagnostics,
            tracing,
        });

        // The task holds only a weak reference, so it does not keep the agent alive; it
        // exits when the channel closes or the agent core has been dropped.
        let inner_clone = Arc::downgrade(&inner);
        tokio::spawn(async move {
            while let Some(packet) = unsolicited_packet_rx.recv().await {
                if let Some(inner_clone) = inner_clone.upgrade() {
                    inner_clone.unsolicited_packet_handler(packet).await;
                } else {
                    break;
                }
            }
            debug!("Unsolicited packet handler exited");
        });

        nmvb_handler.set_watcher(Arc::downgrade(&inner)).await;

        // The returned join handle is dropped, leaving the watcher task detached.
        Self::start_config_watcher(Arc::downgrade(&inner), cfg_manager);

        let agent = Agent {
            inner,
            user_agent,
            id: agent_id.clone(),
        };

        info!("Agent {agent_id} created");

        Ok(agent)
    }
617
618    // reconfigure allows updating certain aspects of the agent at runtime.
619    // Note: toggling TLS on and off is not supported and will result in internal errors.
620    pub async fn reconfigure(&self, opts: ReconfigureAgentOptions) {
621        self.inner.reconfigure(opts).await
622    }
623
624    fn start_config_watcher(
625        inner: Weak<AgentInner>,
626        config_watcher: Arc<impl ConfigManager>,
627    ) -> JoinHandle<()> {
628        let mut watch_rx = config_watcher.watch();
629
630        let inner = inner.clone();
631        tokio::spawn(async move {
632            loop {
633                match watch_rx.changed().await {
634                    Ok(_) => {
635                        let pc = {
636                            // apply_config requires an owned ParsedConfig, as it takes ownership of it.
637                            // Doing the clone within a block also means we can release the lock that
638                            // borrow_and_update() takes as soon as possible.
639                            watch_rx.borrow_and_update().clone()
640                        };
641                        if let Some(i) = inner.upgrade() {
642                            i.apply_config(pc).await;
643                        } else {
644                            debug!("Config watcher inner dropped, exiting");
645                            return;
646                        }
647                    }
648                    Err(_e) => {
649                        debug!("Config watcher channel closed");
650                        return;
651                    }
652                }
653            }
654        })
655    }
656
657    async fn get_first_config<C: httpx::client::Client>(
658        client_name: String,
659        kv_targets: HashMap<String, KvTarget>,
660        state: &AgentState,
661        http_configs: HashMap<String, FirstHttpConfig>,
662        http_client: Arc<C>,
663        err_map_component: Arc<ErrMapComponent>,
664        connect_timeout: Duration,
665    ) -> Result<(ParsedConfig, String)> {
666        loop {
667            for target in kv_targets.values() {
668                let host = &target.address;
669                let err_map_component_clone = err_map_component.clone();
670                let timeout_result = timeout(
671                    connect_timeout,
672                    StdKvClient::new(KvClientOptions {
673                        address: target.clone(),
674                        authenticator: state.authenticator.clone(),
675                        selected_bucket: state.bucket.clone(),
676                        bootstrap_options: KvClientBootstrapOptions {
677                            client_name: client_name.clone(),
678                            disable_error_map: state.disable_error_map,
679                            disable_mutation_tokens: true,
680                            disable_server_durations: true,
681                            on_err_map_fetched: Some(Arc::new(move |err_map| {
682                                err_map_component_clone.on_err_map(err_map);
683                            })),
684                            tcp_keep_alive_time: state.tcp_keep_alive_time,
685                            auth_mechanisms: state.auth_mechanisms.clone(),
686                            connect_timeout,
687                        },
688                        endpoint_id: "".to_string(),
689                        unsolicited_packet_tx: None,
690                        orphan_handler: None,
691                        on_close_tx: None,
692                        disable_decompression: false,
693                        id: Uuid::new_v4().to_string(),
694                        tracing: Default::default(),
695                    }),
696                )
697                .await;
698
699                let client: StdKvClient<Client> = match timeout_result {
700                    Ok(client_result) => match client_result {
701                        Ok(client) => client,
702                        Err(e) => {
703                            let mut msg = format!("Failed to connect to endpoint: {e}");
704                            if let Some(source) = e.source() {
705                                msg = format!("{msg} - {source}");
706                            }
707                            warn!("{msg}");
708                            continue;
709                        }
710                    },
711                    Err(_e) => continue,
712                };
713
714                let raw_config = match client
715                    .get_cluster_config(GetClusterConfigRequest {
716                        known_version: None,
717                    })
718                    .await
719                {
720                    Ok(resp) => resp.config,
721                    Err(_e) => continue,
722                };
723
724                client.close().await?;
725
726                let config: TerseConfig = serde_json::from_slice(&raw_config).map_err(|e| {
727                    Error::new_message_error(format!("failed to deserialize config: {e}"))
728                })?;
729
730                match ConfigParser::parse_terse_config(config, host.host.as_str()) {
731                    Ok(c) => {
732                        return Ok((c, format!("{}:{}", host.host, host.port)));
733                    }
734                    Err(_e) => continue,
735                };
736            }
737
738            info!("Failed to fetch config over kv, attempting http");
739            for endpoint_config in http_configs.values() {
740                let endpoint = endpoint_config.endpoint.clone();
741                let host_port = get_host_port_from_uri(&endpoint)?;
742                let auth = match &endpoint_config.authenticator {
743                    Authenticator::PasswordAuthenticator(authenticator) => {
744                        let user_pass =
745                            authenticator.get_credentials(&ServiceType::MGMT, host_port.clone())?;
746                        Auth::BasicAuth(BasicAuth::new(user_pass.username, user_pass.password))
747                    }
748                    Authenticator::CertificateAuthenticator(_authenticator) => {
749                        Auth::BasicAuth(BasicAuth::new("".to_string(), "".to_string()))
750                    }
751                    Authenticator::JwtAuthenticator(authenticator) => {
752                        Auth::BearerAuth(BearerAuth::new(authenticator.get_token()))
753                    }
754                };
755
756                match Self::fetch_http_config(
757                    http_client.clone(),
758                    endpoint,
759                    endpoint_config.user_agent.clone(),
760                    auth,
761                    endpoint_config.bucket_name.clone(),
762                )
763                .await
764                {
765                    Ok(c) => {
766                        return Ok((c, host_port));
767                    }
768                    Err(_e) => {}
769                };
770            }
771
772            info!("Failed to fetch config from any source");
773
774            // TODO: Make configurable?
775            sleep(Duration::from_secs(1)).await;
776        }
777    }
778
779    pub(crate) async fn fetch_http_config<C: httpx::client::Client>(
780        http_client: Arc<C>,
781        endpoint: String,
782        user_agent: String,
783        auth: Auth,
784        bucket_name: Option<String>,
785    ) -> Result<ParsedConfig> {
786        debug!("Polling config from {}", &endpoint);
787
788        let host_port = get_host_port_from_uri(&endpoint)?;
789        let hostname = get_hostname_from_host_port(&host_port)?;
790
791        let parsed = if let Some(bucket_name) = bucket_name {
792            let config = mgmtx::mgmt::Management {
793                http_client,
794                user_agent,
795                endpoint: endpoint.clone(),
796                canonical_endpoint: endpoint.clone(),
797                auth,
798                tracing: Default::default(),
799            }
800            .get_terse_bucket_config(&GetTerseBucketConfigOptions {
801                bucket_name: &bucket_name,
802                on_behalf_of_info: None,
803            })
804            .await
805            .map_err(Error::from)?;
806
807            ConfigParser::parse_terse_config(config, &hostname)?
808        } else {
809            let config = mgmtx::mgmt::Management {
810                http_client,
811                user_agent,
812                endpoint: endpoint.clone(),
813                canonical_endpoint: endpoint.clone(),
814                auth,
815                tracing: Default::default(),
816            }
817            .get_terse_cluster_config(&GetTerseClusterConfigOptions {
818                on_behalf_of_info: None,
819            })
820            .await
821            .map_err(Error::from)?;
822
823            ConfigParser::parse_terse_config(config, &hostname)?
824        };
825
826        Ok(parsed)
827    }
828
829    fn gen_first_kv_client_configs(
830        memd_addrs: &Vec<Address>,
831        state: &AgentState,
832    ) -> HashMap<String, KvTarget> {
833        let mut clients = HashMap::new();
834        for addr in memd_addrs {
835            let node_id = format!("kv-{addr}");
836            let target = KvTarget {
837                address: addr.clone(),
838                tls_config: state.tls_config.clone(),
839                canonical_address: addr.clone(),
840            };
841            clients.insert(node_id, target);
842        }
843
844        clients
845    }
846
847    fn gen_first_http_endpoints(
848        client_name: String,
849        mgmt_addrs: &Vec<Address>,
850        state: &AgentState,
851    ) -> HashMap<String, FirstHttpConfig> {
852        let mut clients = HashMap::new();
853        for addr in mgmt_addrs {
854            let node_id = format!("mgmt{addr}");
855            let base = if state.tls_config.is_some() {
856                "https"
857            } else {
858                "http"
859            };
860            let config = FirstHttpConfig {
861                endpoint: format!("{base}://{addr}"),
862                tls: state.tls_config.clone(),
863                user_agent: client_name.clone(),
864                authenticator: state.authenticator.clone(),
865                bucket_name: state.bucket.clone(),
866            };
867            clients.insert(node_id, config);
868        }
869
870        clients
871    }
872
873    pub(crate) async fn run_with_bucket_feature_check<T, Fut>(
874        &self,
875        feature: BucketFeature,
876        operation: impl FnOnce() -> Fut,
877        message: impl Into<String>,
878    ) -> Result<T>
879    where
880        Fut: std::future::Future<Output = Result<T>>,
881    {
882        let features = self.bucket_features().await?;
883
884        if !features.contains(&feature) {
885            return Err(Error::new_feature_not_available_error(
886                format!("{feature:?}"),
887                message.into(),
888            ));
889        }
890
891        operation().await
892    }
893
894    pub(crate) fn get_bucket_name(&self) -> Option<String> {
895        self.inner.get_bucket_name()
896    }
897}
898
899struct FirstHttpConfig {
900    pub endpoint: String,
901    pub tls: Option<TlsConfig>,
902    pub user_agent: String,
903    pub authenticator: Authenticator,
904    pub bucket_name: Option<String>,
905}
906
907impl Drop for Agent {
908    fn drop(&mut self) {
909        debug!(
910            "Dropping agent {}, {} strong references remain",
911            self.id,
912            Arc::strong_count(&self.inner)
913        );
914    }
915}