Skip to main content

couchbase_core/
agent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::address::Address;
20use crate::auth_mechanism::AuthMechanism;
21use crate::authenticator::Authenticator;
22use crate::cbconfig::TerseConfig;
23use crate::collection_resolver_cached::{
24    CollectionResolverCached, CollectionResolverCachedOptions,
25};
26use crate::collection_resolver_memd::{CollectionResolverMemd, CollectionResolverMemdOptions};
27use crate::compressionmanager::{CompressionManager, StdCompressor};
28use crate::configmanager::{
29    ConfigManager, ConfigManagerMemd, ConfigManagerMemdConfig, ConfigManagerMemdOptions,
30};
31use crate::configparser::ConfigParser;
32use crate::crudcomponent::CrudComponent;
33use crate::diagnosticscomponent::{DiagnosticsComponent, DiagnosticsComponentConfig};
34use crate::errmapcomponent::ErrMapComponent;
35use crate::error::{Error, ErrorKind, Result};
36use crate::features::BucketFeature;
37use crate::httpcomponent::HttpComponent;
38use crate::httpx::client::{ClientConfig, ReqwestClient};
39use crate::kvclient::{
40    KvClient, KvClientBootstrapOptions, KvClientOptions, StdKvClient, UnsolicitedPacket,
41};
42use crate::kvclient_ops::KvClientOps;
43use crate::kvclientpool::{KvClientPool, KvClientPoolOptions, StdKvClientPool};
44use crate::memdx::client::Client;
45use crate::memdx::opcode::OpCode;
46use crate::memdx::packet::ResponsePacket;
47use crate::memdx::request::GetClusterConfigRequest;
48use crate::mgmtcomponent::{MgmtComponent, MgmtComponentConfig, MgmtComponentOptions};
49use crate::mgmtx::options::{GetTerseBucketConfigOptions, GetTerseClusterConfigOptions};
50use crate::networktypeheuristic::NetworkTypeHeuristic;
51use crate::nmvbhandler::{ConfigUpdater, StdNotMyVbucketConfigHandler};
52use crate::options::agent::{AgentOptions, ReconfigureAgentOptions};
53use crate::parsedconfig::{ParsedConfig, ParsedConfigBucketFeature, ParsedConfigFeature};
54use crate::querycomponent::{QueryComponent, QueryComponentConfig, QueryComponentOptions};
55use crate::retry::RetryManager;
56use crate::searchcomponent::{SearchComponent, SearchComponentConfig, SearchComponentOptions};
57use crate::service_type::ServiceType;
58use crate::tls_config::TlsConfig;
59use crate::util::{get_host_port_from_uri, get_hostname_from_host_port};
60use crate::vbucketrouter::{
61    StdVbucketRouter, VbucketRouter, VbucketRouterOptions, VbucketRoutingInfo,
62};
63use crate::{httpx, mgmtx};
64
65use byteorder::BigEndian;
66use futures::executor::block_on;
67use tracing::{debug, error, info, warn};
68use uuid::Uuid;
69
70use crate::analyticscomponent::{AnalyticsComponent, AnalyticsComponentOptions};
71use crate::componentconfigs::{AgentComponentConfigs, HttpClientConfig};
72use crate::httpx::request::{Auth, BasicAuth, BearerAuth};
73use crate::kvclient_babysitter::{KvTarget, StdKvClientBabysitter};
74use crate::kvendpointclientmanager::{
75    KvEndpointClientManager, KvEndpointClientManagerOptions, StdKvEndpointClientManager,
76};
77use crate::orphan_reporter::OrphanReporter;
78use crate::tracingcomponent::{TracingComponent, TracingComponentConfig};
79use arc_swap::ArcSwap;
80use std::cmp::Ordering;
81use std::collections::HashMap;
82use std::error::Error as StdError;
83use std::fmt::{format, Display};
84use std::io::Cursor;
85use std::net::ToSocketAddrs;
86use std::ops::{Add, Deref};
87use std::sync::{Arc, Weak};
88use std::time::Duration;
89use tokio::io::AsyncReadExt;
90use tokio::net;
91use tokio::runtime::{Handle, Runtime};
92use tokio::sync::broadcast::{Receiver, Sender};
93use tokio::sync::{broadcast, mpsc, Mutex};
94use tokio::task::JoinHandle;
95use tokio::time::{sleep, timeout, timeout_at, Instant};
96
/// Mutable snapshot of the agent's configuration, stored behind the mutex in
/// `AgentInner::state`.
///
/// `AgentInner::gen_agent_component_configs_locked` derives the per-component
/// configuration from `latest_config`, `network_type`, `tls_config`, `bucket`
/// and `authenticator`.
#[derive(Clone)]
struct AgentState {
    /// Bucket name taken from the agent options, if one was supplied.
    bucket: Option<String>,
    /// TLS configuration; replaced by `AgentInner::reconfigure`.
    tls_config: Option<TlsConfig>,
    /// Authenticator; replaced by `AgentInner::reconfigure`.
    authenticator: Authenticator,
    /// Auth mechanisms taken from the agent options (may be empty).
    auth_mechanisms: Vec<AuthMechanism>,
    /// Copied from `kv_config.num_connections` in the agent options.
    num_pool_connections: usize,
    // http_transport:
    /// Most recently applied cluster config; overwritten by `AgentInner::apply_config`.
    latest_config: ParsedConfig,
    /// Network type selected during `Agent::new` (user supplied, or identified
    /// via `NetworkTypeHeuristic`).
    network_type: String,

    /// Inverse of `kv_config.enable_error_map` from the agent options.
    disable_error_map: bool,
    /// Inverse of `kv_config.enable_mutation_tokens` from the agent options.
    disable_mutation_tokens: bool,
    /// Inverse of `kv_config.enable_server_durations` from the agent options.
    disable_server_durations: bool,
    /// Copied from `kv_config.connect_timeout` in the agent options.
    kv_connect_timeout: Duration,
    /// Copied from `kv_config.connect_throttle_timeout` in the agent options.
    kv_connect_throttle_timeout: Duration,
    /// Copied from `http_config.idle_connection_timeout` in the agent options.
    http_idle_connection_timeout: Duration,
    /// Copied from `http_config.max_idle_connections_per_host` in the agent options.
    http_max_idle_connections_per_host: Option<usize>,
    /// TCP keep-alive time; `Agent::new` falls back to 60 seconds when the
    /// option is unset.
    tcp_keep_alive_time: Duration,
}
117
118impl Display for AgentState {
119    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120        write!(
121            f,
122            "{{ bucket: {:?}, network_type: {}, num_pool_connections: {}, latest_config_rev_id: {}, latest_config_rev_epoch: {}, authenticator: {} }}",
123            self.bucket,
124            self.network_type,
125            self.num_pool_connections,
126            self.latest_config.rev_id,
127            self.latest_config.rev_epoch,
128            self.authenticator,
129
130        )
131    }
132}
133
/// Concrete KV endpoint client manager type used throughout the agent: a
/// `StdKvEndpointClientManager` over `StdKvClientPool`s whose clients are
/// `StdKvClient<Client>` instances managed by a `StdKvClientBabysitter`.
type AgentClientManager = StdKvEndpointClientManager<
    StdKvClientPool<StdKvClientBabysitter<StdKvClient<Client>>, StdKvClient<Client>>,
    StdKvClient<Client>,
>;
/// Collection resolver type used by the agent: a `CollectionResolverCached`
/// wrapping a `CollectionResolverMemd` backed by the `AgentClientManager`.
type AgentCollectionResolver = CollectionResolverCached<CollectionResolverMemd<AgentClientManager>>;
139
/// Shared core of an `Agent`: owns the mutable `AgentState` and every
/// component that is reconfigured whenever a new cluster config is applied.
pub(crate) struct AgentInner {
    /// Current agent state; locked by `apply_config`, `reconfigure` and
    /// `bucket_features`.
    state: Arc<Mutex<AgentState>>,
    /// Bucket name captured at construction; returned by `get_bucket_name`.
    bucket: Option<String>,

    /// Config manager; consulted for out-of-band configs and reconfigured on
    /// every state update.
    cfg_manager: Arc<ConfigManagerMemd<AgentClientManager>>,
    /// KV connection manager; its endpoints and authenticator are updated on
    /// state changes.
    conn_mgr: Arc<AgentClientManager>,
    /// Vbucket router; receives new routing info on every state update.
    vb_router: Arc<StdVbucketRouter>,
    /// Cached collection resolver shared with the CRUD component.
    collections: Arc<AgentCollectionResolver>,
    /// Retry manager shared by the components created in `Agent::new`.
    retry_manager: Arc<RetryManager>,
    /// HTTP client shared by the HTTP-based components; its TLS settings are
    /// updated in `reconfigure`.
    http_client: Arc<ReqwestClient>,
    /// Error map component; fed by the `on_err_map_fetched` callbacks set up
    /// in `Agent::new`.
    err_map_component: Arc<ErrMapComponent>,

    /// Key-value CRUD component.
    pub(crate) crud: CrudComponent<
        AgentClientManager,
        StdVbucketRouter,
        StdNotMyVbucketConfigHandler<AgentInner>,
        AgentCollectionResolver,
        StdCompressor,
    >,

    /// Analytics service component.
    pub(crate) analytics: Arc<AnalyticsComponent<ReqwestClient>>,
    /// Query service component.
    pub(crate) query: Arc<QueryComponent<ReqwestClient>>,
    /// Search service component.
    pub(crate) search: Arc<SearchComponent<ReqwestClient>>,
    /// Management component.
    pub(crate) mgmt: MgmtComponent<ReqwestClient>,
    /// Diagnostics component, built from the connection manager and the
    /// query/search components.
    pub(crate) diagnostics: DiagnosticsComponent<ReqwestClient, AgentClientManager>,
    /// Tracing component shared with the other components.
    pub(crate) tracing: Arc<TracingComponent>,
}
167
/// Public handle to the core agent; the actual work is delegated to the
/// shared `AgentInner`.
pub struct Agent {
    /// Shared inner state and components.
    pub(crate) inner: Arc<AgentInner>,
    /// User agent string of the form `cb-rust/<crate version>`, built in `Agent::new`.
    user_agent: String,
    /// Random (UUID v4) identifier generated in `Agent::new` and used in log messages.
    id: String,
}
173
174impl AgentInner {
175    fn gen_agent_component_configs_locked(state: &AgentState) -> AgentComponentConfigs {
176        AgentComponentConfigs::gen_from_config(
177            &state.latest_config,
178            &state.network_type,
179            state.tls_config.clone(),
180            state.bucket.clone(),
181            state.authenticator.clone(),
182        )
183    }
184
185    pub async fn unsolicited_packet_handler(&self, up: UnsolicitedPacket) {
186        let packet = up.packet;
187        if packet.op_code == OpCode::Set {
188            if let Some(ref extras) = packet.extras {
189                if extras.len() < 16 {
190                    warn!("Received Set packet with too short extras: {packet:?}");
191                    return;
192                }
193
194                let mut cursor = Cursor::new(extras);
195                let server_rev_epoch = cursor.read_i64().await.unwrap();
196                let server_rev_id = cursor.read_i64().await.unwrap();
197
198                if let Some(config) = self
199                    .cfg_manager
200                    .out_of_band_version(server_rev_id, server_rev_epoch, up.endpoint_id)
201                    .await
202                {
203                    self.apply_config(config).await;
204                }
205            } else {
206                warn!("Received Set packet with no extras: {packet:?}");
207            }
208        }
209    }
210
211    pub async fn apply_config(&self, config: ParsedConfig) {
212        let mut state = self.state.lock().await;
213
214        info!(
215            "Agent applying updated config: rev_id={rev_id}, rev_epoch={rev_epoch}",
216            rev_id = config.rev_id,
217            rev_epoch = config.rev_epoch
218        );
219        state.latest_config = config;
220
221        self.update_state_locked(&mut state).await;
222    }
223
224    async fn update_state_locked(&self, state: &mut AgentState) {
225        debug!("Agent updating state {}", state);
226
227        let agent_component_configs = Self::gen_agent_component_configs_locked(state);
228
229        // In order to avoid race conditions between operations selecting the
230        // endpoint they need to send the request to, and fetching an actual
231        // client which can send to that endpoint.  We must first ensure that
232        // all the new endpoints are available in the manager.  Then update
233        // the routing table.  Then go back and remove the old entries from
234        // the connection manager list.
235
236        if let Err(e) = self
237            .conn_mgr
238            .update_endpoints(agent_component_configs.kv_targets.clone(), true)
239            .await
240        {
241            error!("Failed to reconfigure connection manager (add-only); {e}");
242        };
243
244        self.vb_router
245            .update_vbucket_info(agent_component_configs.vbucket_routing_info);
246
247        if let Err(e) = self
248            .cfg_manager
249            .reconfigure(agent_component_configs.config_manager_memd_config)
250        {
251            error!("Failed to reconfigure memd config watcher component; {e}");
252        }
253
254        if let Err(e) = self
255            .conn_mgr
256            .update_endpoints(agent_component_configs.kv_targets, false)
257            .await
258        {
259            error!("Failed to reconfigure connection manager; {e}");
260        }
261
262        self.analytics
263            .reconfigure(agent_component_configs.analytics_config);
264        self.query.reconfigure(agent_component_configs.query_config);
265        self.search
266            .reconfigure(agent_component_configs.search_config);
267        self.mgmt.reconfigure(agent_component_configs.mgmt_config);
268        self.diagnostics
269            .reconfigure(agent_component_configs.diagnostics_config);
270        self.tracing
271            .reconfigure(agent_component_configs.tracing_config);
272    }
273
274    pub async fn bucket_features(&self) -> Result<Vec<BucketFeature>> {
275        let guard = self.state.lock().await;
276
277        if let Some(bucket) = &guard.latest_config.bucket {
278            let mut features = vec![];
279
280            for feature in &bucket.features {
281                match feature {
282                    ParsedConfigBucketFeature::CreateAsDeleted => {
283                        features.push(BucketFeature::CreateAsDeleted)
284                    }
285                    ParsedConfigBucketFeature::ReplaceBodyWithXattr => {
286                        features.push(BucketFeature::ReplaceBodyWithXattr)
287                    }
288                    ParsedConfigBucketFeature::RangeScan => features.push(BucketFeature::RangeScan),
289                    ParsedConfigBucketFeature::ReplicaRead => {
290                        features.push(BucketFeature::ReplicaRead)
291                    }
292                    ParsedConfigBucketFeature::NonDedupedHistory => {
293                        features.push(BucketFeature::NonDedupedHistory)
294                    }
295                    ParsedConfigBucketFeature::ReviveDocument => {
296                        features.push(BucketFeature::ReviveDocument)
297                    }
298                    _ => {}
299                }
300            }
301
302            return Ok(features);
303        }
304
305        Err(ErrorKind::NoBucket.into())
306    }
307
308    pub(crate) fn get_bucket_name(&self) -> Option<String> {
309        self.bucket.clone()
310    }
311
312    pub async fn reconfigure(&self, opts: ReconfigureAgentOptions) {
313        let mut state = self.state.lock().await;
314        state.tls_config = opts.tls_config.clone();
315        state.authenticator = opts.authenticator.clone();
316
317        // We manually update tls for http as it requires rebuilding the client.
318        match self
319            .http_client
320            .update_tls(httpx::client::UpdateTlsOptions {
321                tls_config: opts.tls_config,
322            }) {
323            Ok(_) => {}
324            Err(e) => {
325                warn!("Failed to update TLS for HTTP client: {}", e);
326            }
327        };
328
329        self.conn_mgr.update_auth(opts.authenticator).await;
330
331        self.update_state_locked(&mut state).await;
332    }
333}
334
335impl ConfigUpdater for AgentInner {
336    async fn apply_terse_config(&self, config: TerseConfig, source_hostname: &str) {
337        let parsed_config = match ConfigParser::parse_terse_config(config, source_hostname) {
338            Ok(cfg) => cfg,
339            Err(_e) => {
340                // TODO: log
341                return;
342            }
343        };
344
345        if let Some(config) = self.cfg_manager.out_of_band_config(parsed_config) {
346            self.apply_config(config).await;
347        };
348    }
349}
350
351impl Agent {
    /// Creates a new agent from `opts`.
    ///
    /// The steps are, in order:
    /// 1. build the user agent string and a random agent id,
    /// 2. build the initial `AgentState` and the shared HTTP client,
    /// 3. fetch the first cluster config via `get_first_config`,
    /// 4. determine the network type,
    /// 5. construct all components from the generated component configs,
    /// 6. spawn the background tasks (unsolicited packet handler and config
    ///    watcher), which hold only weak references to the inner agent.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot be built, if
    /// `get_first_config` fails, or if the KV endpoint client manager cannot
    /// be created.
    pub async fn new(opts: AgentOptions) -> Result<Self> {
        let build_version = env!("CARGO_PKG_VERSION");
        let user_agent = format!("cb-rust/{build_version}");
        let agent_id = Uuid::new_v4().to_string();
        info!(
            "Core SDK Version: {} - Agent ID: {}",
            &user_agent, &agent_id
        );
        info!("Agent Options {opts}");

        // Only emits warnings about the chosen mechanisms; the resulting list
        // is the user-supplied one (or empty when none were supplied).
        let auth_mechanisms = if !opts.auth_mechanisms.is_empty() {
            if opts.tls_config.is_none() && opts.auth_mechanisms.contains(&AuthMechanism::Plain) {
                warn!("PLAIN sends credentials in plaintext, this will cause credential leakage on the network");
            } else if opts.tls_config.is_some()
                && (opts.auth_mechanisms.contains(&AuthMechanism::ScramSha512)
                    || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha256)
                    || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha1))
            {
                warn!("Consider using PLAIN for TLS connections, as it is more efficient");
            }

            opts.auth_mechanisms
        } else {
            vec![]
        };

        // Initial state; `latest_config` and `network_type` are filled in
        // below once the first config has been fetched.
        let mut state = AgentState {
            bucket: opts.bucket_name.clone(),
            authenticator: opts.authenticator.clone(),
            num_pool_connections: opts.kv_config.num_connections,
            latest_config: ParsedConfig::default(),
            network_type: "".to_string(),
            tls_config: opts.tls_config,
            auth_mechanisms: auth_mechanisms.clone(),
            disable_error_map: !opts.kv_config.enable_error_map,
            disable_mutation_tokens: !opts.kv_config.enable_mutation_tokens,
            disable_server_durations: !opts.kv_config.enable_server_durations,
            kv_connect_timeout: opts.kv_config.connect_timeout,
            kv_connect_throttle_timeout: opts.kv_config.connect_throttle_timeout,
            http_idle_connection_timeout: opts.http_config.idle_connection_timeout,
            http_max_idle_connections_per_host: opts.http_config.max_idle_connections_per_host,
            tcp_keep_alive_time: opts
                .tcp_keep_alive_time
                .unwrap_or_else(|| Duration::from_secs(60)),
        };

        let bucket_name = opts.bucket_name.clone();

        let http_client = Arc::new(ReqwestClient::new(ClientConfig {
            tls_config: state.tls_config.clone(),
            idle_connection_timeout: state.http_idle_connection_timeout,
            max_idle_connections_per_host: state.http_max_idle_connections_per_host,
            tcp_keep_alive_time: state.tcp_keep_alive_time,
        })?);

        let err_map_component = Arc::new(ErrMapComponent::new());

        let connect_timeout = opts.kv_config.connect_timeout;

        // Bootstrap: fetch the first config from the seed addresses.
        let first_kv_client_configs =
            Self::gen_first_kv_client_configs(&opts.seed_config.memd_addrs, &state);
        let first_http_client_configs = Self::gen_first_http_endpoints(
            user_agent.clone(),
            &opts.seed_config.http_addrs,
            &state,
        );
        let (first_config, cfg_source_host_port) = Self::get_first_config(
            user_agent.clone(),
            first_kv_client_configs,
            &state,
            first_http_client_configs,
            http_client.clone(),
            err_map_component.clone(),
            connect_timeout,
        )
        .await?;

        state.latest_config = first_config.clone();

        // An unset, empty or "auto" network option means the network type is
        // identified heuristically from the first config and its source.
        let network_type = if let Some(network) = opts.network {
            if network == "auto" || network.is_empty() {
                NetworkTypeHeuristic::identify(&state.latest_config, &cfg_source_host_port)
            } else {
                network
            }
        } else {
            NetworkTypeHeuristic::identify(&state.latest_config, &cfg_source_host_port)
        };
        info!(
            "Agent {} identified network type: {network_type}",
            &agent_id
        );
        state.network_type = network_type;

        let agent_component_configs = AgentInner::gen_agent_component_configs_locked(&state);

        let tracing = Arc::new(TracingComponent::new(
            agent_component_configs.tracing_config,
        ));

        let err_map_component_conn_mgr = err_map_component.clone();

        let num_pool_connections = state.num_pool_connections;

        // Channel over which KV clients forward unsolicited packets; the
        // receiving end is drained by the task spawned further below.
        let (unsolicited_packet_tx, mut unsolicited_packet_rx) = mpsc::unbounded_channel();

        let conn_mgr_id = Uuid::new_v4().to_string();
        info!(
            "Agent {} creating kv endpoint client manager {}",
            &agent_id, &conn_mgr_id
        );
        let conn_mgr = Arc::new(
            StdKvEndpointClientManager::new(KvEndpointClientManagerOptions {
                id: conn_mgr_id,
                on_close_handler: Arc::new(|_manager_id| {}),
                on_demand_connect: opts.kv_config.on_demand_connect,
                num_pool_connections,
                connect_throttle_period: opts.kv_config.connect_throttle_timeout,
                bootstrap_options: KvClientBootstrapOptions {
                    client_name: user_agent.clone(),
                    disable_error_map: state.disable_error_map,
                    disable_mutation_tokens: state.disable_mutation_tokens,
                    disable_server_durations: state.disable_server_durations,
                    on_err_map_fetched: Some(Arc::new(move |err_map| {
                        err_map_component_conn_mgr.on_err_map(err_map);
                    })),
                    tcp_keep_alive_time: state.tcp_keep_alive_time,
                    auth_mechanisms,
                    connect_timeout,
                },
                unsolicited_packet_tx: Some(unsolicited_packet_tx),
                orphan_handler: opts.orphan_response_handler,
                endpoints: agent_component_configs.kv_targets,
                authenticator: opts.authenticator,
                disable_decompression: opts.compression_config.disable_decompression,
                selected_bucket: opts.bucket_name,
                tracing: tracing.clone(),
            })
            .await?,
        );

        let cfg_manager = Arc::new(ConfigManagerMemd::new(
            agent_component_configs.config_manager_memd_config,
            ConfigManagerMemdOptions {
                polling_period: opts.config_poller_config.poll_interval,
                kv_client_manager: conn_mgr.clone(),
                first_config,
                fetch_timeout: opts.config_poller_config.fetch_timeout,
            },
        ));
        let vb_router = Arc::new(StdVbucketRouter::new(
            agent_component_configs.vbucket_routing_info,
            VbucketRouterOptions {},
        ));

        let nmvb_handler = Arc::new(StdNotMyVbucketConfigHandler::new());

        let memd_resolver = CollectionResolverMemd::new(CollectionResolverMemdOptions {
            conn_mgr: conn_mgr.clone(),
        });

        let collections = Arc::new(CollectionResolverCached::new(
            CollectionResolverCachedOptions {
                resolver: memd_resolver,
            },
        ));

        let retry_manager = Arc::new(RetryManager::new(err_map_component.clone()));
        let compression_manager = Arc::new(CompressionManager::new(opts.compression_config));

        let crud = CrudComponent::new(
            nmvb_handler.clone(),
            vb_router.clone(),
            conn_mgr.clone(),
            collections.clone(),
            retry_manager.clone(),
            compression_manager,
        );

        let mgmt_id = Uuid::new_v4().to_string();
        info!("Agent {} creating mgmt component {}", &agent_id, &mgmt_id);
        let mgmt = MgmtComponent::new(
            retry_manager.clone(),
            http_client.clone(),
            tracing.clone(),
            agent_component_configs.mgmt_config,
            MgmtComponentOptions {
                id: mgmt_id,
                user_agent: user_agent.clone(),
            },
        );

        let analytics_id = Uuid::new_v4().to_string();
        info!(
            "Agent {} creating analytics component {}",
            &agent_id, &analytics_id
        );
        let analytics = Arc::new(AnalyticsComponent::new(
            retry_manager.clone(),
            http_client.clone(),
            agent_component_configs.analytics_config,
            AnalyticsComponentOptions {
                id: analytics_id,
                user_agent: user_agent.clone(),
            },
        ));

        let query_id = Uuid::new_v4().to_string();
        info!("Agent {} creating query component {}", &agent_id, &query_id);
        let query = Arc::new(QueryComponent::new(
            retry_manager.clone(),
            http_client.clone(),
            tracing.clone(),
            agent_component_configs.query_config,
            QueryComponentOptions {
                id: query_id,
                user_agent: user_agent.clone(),
            },
        ));

        let search_id = Uuid::new_v4().to_string();
        info!(
            "Agent {} creating search component {}",
            &agent_id, &search_id
        );
        let search = Arc::new(SearchComponent::new(
            retry_manager.clone(),
            http_client.clone(),
            tracing.clone(),
            agent_component_configs.search_config,
            SearchComponentOptions {
                id: search_id,
                user_agent: user_agent.clone(),
            },
        ));

        let diagnostics = DiagnosticsComponent::new(
            conn_mgr.clone(),
            query.clone(),
            search.clone(),
            retry_manager.clone(),
            agent_component_configs.diagnostics_config,
        );

        let state = Arc::new(Mutex::new(state));

        let inner = Arc::new(AgentInner {
            state,
            bucket: bucket_name,
            cfg_manager: cfg_manager.clone(),
            conn_mgr,
            vb_router,
            crud,
            collections,
            retry_manager,
            http_client,
            err_map_component,

            mgmt,
            analytics,
            query,
            search,
            diagnostics,
            tracing,
        });

        // Background task draining unsolicited packets. It holds only a weak
        // reference, and exits when the channel closes or the inner agent has
        // been dropped.
        let inner_clone = Arc::downgrade(&inner);
        tokio::spawn(async move {
            while let Some(packet) = unsolicited_packet_rx.recv().await {
                if let Some(inner_clone) = inner_clone.upgrade() {
                    inner_clone.unsolicited_packet_handler(packet).await;
                } else {
                    break;
                }
            }
            debug!("Unsolicited packet handler exited");
        });

        nmvb_handler.set_watcher(Arc::downgrade(&inner)).await;

        // The returned JoinHandle is not kept, so the watcher task is detached.
        Self::start_config_watcher(Arc::downgrade(&inner), cfg_manager);

        let agent = Agent {
            inner,
            user_agent,
            id: agent_id.clone(),
        };

        info!("Agent {agent_id} created");

        Ok(agent)
    }
644
645    // reconfigure allows updating certain aspects of the agent at runtime.
646    // Note: toggling TLS on and off is not supported and will result in internal errors.
647    pub async fn reconfigure(&self, opts: ReconfigureAgentOptions) {
648        self.inner.reconfigure(opts).await
649    }
650
651    fn start_config_watcher(
652        inner: Weak<AgentInner>,
653        config_watcher: Arc<impl ConfigManager>,
654    ) -> JoinHandle<()> {
655        let mut watch_rx = config_watcher.watch();
656
657        let inner = inner.clone();
658        tokio::spawn(async move {
659            loop {
660                match watch_rx.changed().await {
661                    Ok(_) => {
662                        let pc = {
663                            // apply_config requires an owned ParsedConfig, as it takes ownership of it.
664                            // Doing the clone within a block also means we can release the lock that
665                            // borrow_and_update() takes as soon as possible.
666                            watch_rx.borrow_and_update().clone()
667                        };
668                        if let Some(i) = inner.upgrade() {
669                            i.apply_config(pc).await;
670                        } else {
671                            debug!("Config watcher inner dropped, exiting");
672                            return;
673                        }
674                    }
675                    Err(_e) => {
676                        debug!("Config watcher channel closed");
677                        return;
678                    }
679                }
680            }
681        })
682    }
683
684    async fn get_first_config<C: httpx::client::Client>(
685        client_name: String,
686        kv_targets: HashMap<String, KvTarget>,
687        state: &AgentState,
688        http_configs: HashMap<String, FirstHttpConfig>,
689        http_client: Arc<C>,
690        err_map_component: Arc<ErrMapComponent>,
691        connect_timeout: Duration,
692    ) -> Result<(ParsedConfig, String)> {
693        loop {
694            for target in kv_targets.values() {
695                let host = &target.address;
696                let err_map_component_clone = err_map_component.clone();
697                let timeout_result = timeout(
698                    connect_timeout,
699                    StdKvClient::new(KvClientOptions {
700                        address: target.clone(),
701                        authenticator: state.authenticator.clone(),
702                        selected_bucket: state.bucket.clone(),
703                        bootstrap_options: KvClientBootstrapOptions {
704                            client_name: client_name.clone(),
705                            disable_error_map: state.disable_error_map,
706                            disable_mutation_tokens: true,
707                            disable_server_durations: true,
708                            on_err_map_fetched: Some(Arc::new(move |err_map| {
709                                err_map_component_clone.on_err_map(err_map);
710                            })),
711                            tcp_keep_alive_time: state.tcp_keep_alive_time,
712                            auth_mechanisms: state.auth_mechanisms.clone(),
713                            connect_timeout,
714                        },
715                        endpoint_id: "".to_string(),
716                        unsolicited_packet_tx: None,
717                        orphan_handler: None,
718                        on_close_tx: None,
719                        disable_decompression: false,
720                        id: Uuid::new_v4().to_string(),
721                        tracing: Default::default(),
722                    }),
723                )
724                .await;
725
726                let client: StdKvClient<Client> = match timeout_result {
727                    Ok(client_result) => match client_result {
728                        Ok(client) => client,
729                        Err(e) => {
730                            let mut msg = format!("Failed to connect to endpoint: {e}");
731                            if let Some(source) = e.source() {
732                                msg = format!("{msg} - {source}");
733                            }
734                            warn!("{msg}");
735                            continue;
736                        }
737                    },
738                    Err(_e) => continue,
739                };
740
741                let raw_config = match client
742                    .get_cluster_config(GetClusterConfigRequest {
743                        known_version: None,
744                    })
745                    .await
746                {
747                    Ok(resp) => resp.config,
748                    Err(_e) => continue,
749                };
750
751                client.close().await?;
752
753                let config: TerseConfig = serde_json::from_slice(&raw_config).map_err(|e| {
754                    Error::new_message_error(format!("failed to deserialize config: {e}"))
755                })?;
756
757                match ConfigParser::parse_terse_config(config, host.host.as_str()) {
758                    Ok(c) => {
759                        return Ok((c, format!("{}:{}", host.host, host.port)));
760                    }
761                    Err(_e) => continue,
762                };
763            }
764
765            info!("Failed to fetch config over kv, attempting http");
766            for endpoint_config in http_configs.values() {
767                let endpoint = endpoint_config.endpoint.clone();
768                let host_port = get_host_port_from_uri(&endpoint)?;
769                let auth = match &endpoint_config.authenticator {
770                    Authenticator::PasswordAuthenticator(authenticator) => {
771                        let user_pass =
772                            authenticator.get_credentials(&ServiceType::MGMT, host_port.clone())?;
773                        Auth::BasicAuth(BasicAuth::new(user_pass.username, user_pass.password))
774                    }
775                    Authenticator::CertificateAuthenticator(_authenticator) => {
776                        Auth::BasicAuth(BasicAuth::new("".to_string(), "".to_string()))
777                    }
778                    Authenticator::JwtAuthenticator(authenticator) => {
779                        Auth::BearerAuth(BearerAuth::new(authenticator.get_token()))
780                    }
781                };
782
783                match Self::fetch_http_config(
784                    http_client.clone(),
785                    endpoint,
786                    endpoint_config.user_agent.clone(),
787                    auth,
788                    endpoint_config.bucket_name.clone(),
789                )
790                .await
791                {
792                    Ok(c) => {
793                        return Ok((c, host_port));
794                    }
795                    Err(_e) => {}
796                };
797            }
798
799            info!("Failed to fetch config from any source");
800
801            // TODO: Make configurable?
802            sleep(Duration::from_secs(1)).await;
803        }
804    }
805
806    pub(crate) async fn fetch_http_config<C: httpx::client::Client>(
807        http_client: Arc<C>,
808        endpoint: String,
809        user_agent: String,
810        auth: Auth,
811        bucket_name: Option<String>,
812    ) -> Result<ParsedConfig> {
813        debug!("Polling config from {}", &endpoint);
814
815        let host_port = get_host_port_from_uri(&endpoint)?;
816        let hostname = get_hostname_from_host_port(&host_port)?;
817
818        let parsed = if let Some(bucket_name) = bucket_name {
819            let config = mgmtx::mgmt::Management {
820                http_client,
821                user_agent,
822                endpoint: endpoint.clone(),
823                canonical_endpoint: endpoint.clone(),
824                auth,
825                tracing: Default::default(),
826            }
827            .get_terse_bucket_config(&GetTerseBucketConfigOptions {
828                bucket_name: &bucket_name,
829                on_behalf_of_info: None,
830            })
831            .await
832            .map_err(Error::from)?;
833
834            ConfigParser::parse_terse_config(config, &hostname)?
835        } else {
836            let config = mgmtx::mgmt::Management {
837                http_client,
838                user_agent,
839                endpoint: endpoint.clone(),
840                canonical_endpoint: endpoint.clone(),
841                auth,
842                tracing: Default::default(),
843            }
844            .get_terse_cluster_config(&GetTerseClusterConfigOptions {
845                on_behalf_of_info: None,
846            })
847            .await
848            .map_err(Error::from)?;
849
850            ConfigParser::parse_terse_config(config, &hostname)?
851        };
852
853        Ok(parsed)
854    }
855
856    fn gen_first_kv_client_configs(
857        memd_addrs: &Vec<Address>,
858        state: &AgentState,
859    ) -> HashMap<String, KvTarget> {
860        let mut clients = HashMap::new();
861        for addr in memd_addrs {
862            let node_id = format!("kv-{addr}");
863            let target = KvTarget {
864                address: addr.clone(),
865                tls_config: state.tls_config.clone(),
866                canonical_address: addr.clone(),
867            };
868            clients.insert(node_id, target);
869        }
870
871        clients
872    }
873
874    fn gen_first_http_endpoints(
875        client_name: String,
876        mgmt_addrs: &Vec<Address>,
877        state: &AgentState,
878    ) -> HashMap<String, FirstHttpConfig> {
879        let mut clients = HashMap::new();
880        for addr in mgmt_addrs {
881            let node_id = format!("mgmt{addr}");
882            let base = if state.tls_config.is_some() {
883                "https"
884            } else {
885                "http"
886            };
887            let config = FirstHttpConfig {
888                endpoint: format!("{base}://{addr}"),
889                tls: state.tls_config.clone(),
890                user_agent: client_name.clone(),
891                authenticator: state.authenticator.clone(),
892                bucket_name: state.bucket.clone(),
893            };
894            clients.insert(node_id, config);
895        }
896
897        clients
898    }
899
900    pub(crate) async fn run_with_bucket_feature_check<T, Fut>(
901        &self,
902        feature: BucketFeature,
903        operation: impl FnOnce() -> Fut,
904        message: impl Into<String>,
905    ) -> Result<T>
906    where
907        Fut: std::future::Future<Output = Result<T>>,
908    {
909        let features = self.bucket_features().await?;
910
911        if !features.contains(&feature) {
912            return Err(Error::new_feature_not_available_error(
913                format!("{feature:?}"),
914                message.into(),
915            ));
916        }
917
918        operation().await
919    }
920
921    pub(crate) fn get_bucket_name(&self) -> Option<String> {
922        self.inner.get_bucket_name()
923    }
924}
925
/// Settings for one seed management endpoint, used when fetching the first config over
/// HTTP.
struct FirstHttpConfig {
    /// Base URI of the endpoint, built as `{scheme}://{addr}`.
    pub endpoint: String,
    /// TLS configuration copied from the agent state; `None` when TLS is not configured.
    pub tls: Option<TlsConfig>,
    /// User agent string sent with the config request (the client name).
    pub user_agent: String,
    /// Authenticator from which the request's credentials or token are derived.
    pub authenticator: Authenticator,
    /// Bucket whose terse config is requested; `None` requests the cluster config.
    pub bucket_name: Option<String>,
}
933
934impl Drop for Agent {
935    fn drop(&mut self) {
936        debug!(
937            "Dropping agent {}, {} strong references remain",
938            self.id,
939            Arc::strong_count(&self.inner)
940        );
941    }
942}