couchbase_core/
agent.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::address::Address;
20use crate::auth_mechanism::AuthMechanism;
21use crate::authenticator::Authenticator;
22use crate::cbconfig::TerseConfig;
23use crate::collection_resolver_cached::{
24    CollectionResolverCached, CollectionResolverCachedOptions,
25};
26use crate::collection_resolver_memd::{CollectionResolverMemd, CollectionResolverMemdOptions};
27use crate::compressionmanager::{CompressionManager, StdCompressor};
28use crate::configmanager::{
29    ConfigManager, ConfigManagerMemd, ConfigManagerMemdConfig, ConfigManagerMemdOptions,
30};
31use crate::configparser::ConfigParser;
32use crate::crudcomponent::CrudComponent;
33use crate::diagnosticscomponent::{DiagnosticsComponent, DiagnosticsComponentConfig};
34use crate::errmapcomponent::ErrMapComponent;
35use crate::error::{Error, ErrorKind, Result};
36use crate::features::BucketFeature;
37use crate::httpcomponent::HttpComponent;
38use crate::httpx::client::{ClientConfig, ReqwestClient};
39use crate::kvclient::{KvClient, KvClientConfig, KvClientOptions, StdKvClient};
40use crate::kvclient_ops::KvClientOps;
41use crate::kvclientmanager::{
42    KvClientManager, KvClientManagerConfig, KvClientManagerOptions, StdKvClientManager,
43};
44use crate::kvclientpool::{
45    KvClientPool, KvClientPoolConfig, KvClientPoolOptions, NaiveKvClientPool,
46};
47use crate::memdx::client::Client;
48use crate::memdx::opcode::OpCode;
49use crate::memdx::packet::ResponsePacket;
50use crate::memdx::request::GetClusterConfigRequest;
51use crate::mgmtcomponent::{MgmtComponent, MgmtComponentConfig, MgmtComponentOptions};
52use crate::mgmtx::options::{GetTerseBucketConfigOptions, GetTerseClusterConfigOptions};
53use crate::networktypeheuristic::NetworkTypeHeuristic;
54use crate::nmvbhandler::{ConfigUpdater, StdNotMyVbucketConfigHandler};
55use crate::options::agent::AgentOptions;
56use crate::parsedconfig::{ParsedConfig, ParsedConfigBucketFeature, ParsedConfigFeature};
57use crate::querycomponent::{QueryComponent, QueryComponentConfig, QueryComponentOptions};
58use crate::retry::RetryManager;
59use crate::searchcomponent::{SearchComponent, SearchComponentConfig, SearchComponentOptions};
60use crate::service_type::ServiceType;
61use crate::tls_config::TlsConfig;
62use crate::util::{get_host_port_from_uri, get_hostname_from_host_port};
63use crate::vbucketrouter::{
64    StdVbucketRouter, VbucketRouter, VbucketRouterOptions, VbucketRoutingInfo,
65};
66use crate::{httpx, mgmtx};
67
68use byteorder::BigEndian;
69use futures::executor::block_on;
70use log::{debug, error, info, warn};
71use uuid::Uuid;
72
73use std::cmp::Ordering;
74use std::collections::HashMap;
75use std::error::Error as StdError;
76use std::fmt::format;
77use std::io::Cursor;
78use std::net::ToSocketAddrs;
79use std::ops::Add;
80use std::sync::{Arc, Weak};
81use std::time::Duration;
82
83use crate::orphan_reporter::OrphanReporter;
84use tokio::io::AsyncReadExt;
85use tokio::net;
86use tokio::runtime::{Handle, Runtime};
87use tokio::sync::broadcast::{Receiver, Sender};
88use tokio::sync::{broadcast, mpsc, Mutex};
89use tokio::task::JoinHandle;
90use tokio::time::{sleep, timeout, timeout_at, Instant};
91
/// Settings and latest cluster config from which every agent component's configuration is
/// derived.
///
/// An instance lives behind the mutex in [`AgentInner`]; `apply_config` stores each new
/// cluster config in `latest_config` and `gen_agent_component_configs` reads the rest.
#[derive(Clone)]
struct AgentState {
    // Bucket name taken from the agent options; forwarded to KV clients as `selected_bucket`
    // and to the diagnostics component. `None` when no bucket was given.
    bucket: Option<String>,
    // When `Some`, the TLS ports and `https://` endpoints of each node are used.
    tls_config: Option<TlsConfig>,
    // Shared with the KV clients and the query, search and mgmt components.
    authenticator: Arc<Authenticator>,
    // Authentication mechanisms handed to every KV client config.
    auth_mechanisms: Vec<AuthMechanism>,
    // Passed to the KV client manager as `num_pool_connections`.
    num_pool_connections: usize,
    // http_transport:
    // KV client configs that `update_state` merges with the newly generated ones for its
    // first connection-manager reconfigure.
    last_clients: HashMap<String, KvClientConfig>,
    // Most recently applied cluster config.
    latest_config: ParsedConfig,
    // Network type used to pick which address group of the config to use; set from
    // `NetworkTypeHeuristic::identify` when the agent is created.
    network_type: String,

    // The following values are copied from the agent options and forwarded unchanged to the
    // KV client and HTTP client configs.
    disable_mutation_tokens: bool,
    disable_server_durations: bool,
    kv_connect_timeout: Duration,
    kv_connect_throttle_timeout: Duration,
    http_idle_connection_timeout: Duration,
    http_max_idle_connections_per_host: Option<usize>,
    tcp_keep_alive_time: Duration,

    // Name reported by KV clients; built as "couchbase-rs-core <crate version>".
    client_name: String,
}
114
/// Concrete KV client manager type used by the agent: a `StdKvClientManager` over
/// `NaiveKvClientPool`s of `StdKvClient<Client>`.
type AgentClientManager = StdKvClientManager<NaiveKvClientPool<StdKvClient<Client>>>;
/// Concrete collection resolver type used by the agent: a `CollectionResolverCached` wrapping
/// a `CollectionResolverMemd` that goes through the agent's client manager.
type AgentCollectionResolver = CollectionResolverCached<CollectionResolverMemd<AgentClientManager>>;
117
/// Shared internals of an [`Agent`]: the mutex-guarded state plus every component that is
/// built from it and reconfigured whenever a new cluster config is applied.
pub(crate) struct AgentInner {
    // Settings and latest config; locked for the whole duration of a config application.
    state: Arc<Mutex<AgentState>>,

    // Source of config updates; also consulted for out-of-band configs and versions.
    cfg_manager: Arc<ConfigManagerMemd<AgentClientManager>>,
    // KV connection manager, reconfigured with the client set of each new config.
    conn_mgr: Arc<AgentClientManager>,
    // Receives the vbucket routing info of each new config.
    vb_router: Arc<StdVbucketRouter>,
    collections: Arc<AgentCollectionResolver>,
    retry_manager: Arc<RetryManager>,
    // HTTP client shared by the query, search and mgmt components.
    http_client: Arc<ReqwestClient>,
    err_map_component: Arc<ErrMapComponent>,

    pub(crate) crud: CrudComponent<
        AgentClientManager,
        StdVbucketRouter,
        StdNotMyVbucketConfigHandler<AgentInner>,
        AgentCollectionResolver,
        StdCompressor,
    >,

    pub(crate) query: Arc<QueryComponent<ReqwestClient>>,
    pub(crate) search: Arc<SearchComponent<ReqwestClient>>,
    pub(crate) mgmt: MgmtComponent<ReqwestClient>,
    pub(crate) diagnostics: DiagnosticsComponent<ReqwestClient, AgentClientManager>,
}
142
/// Handle to a running agent.
///
/// Holds a strong reference to the shared [`AgentInner`]; the background tasks started by
/// [`Agent::new`] are only given weak references to it.
pub struct Agent {
    pub(crate) inner: Arc<AgentInner>,
}
146
/// One configuration per agent component, all generated together from a single
/// [`AgentState`] by `AgentInner::gen_agent_component_configs`.
struct AgentComponentConfigs {
    // Endpoint ids the memd config manager may use.
    pub config_manager_memd_config: ConfigManagerMemdConfig,
    // KV client config per endpoint id, for the KV client manager.
    pub kv_client_manager_client_configs: HashMap<String, KvClientConfig>,
    pub vbucket_routing_info: VbucketRoutingInfo,
    pub query_config: QueryComponentConfig,
    pub search_config: SearchComponentConfig,
    pub mgmt_config: MgmtComponentConfig,
    pub diagnostics_config: DiagnosticsComponentConfig,
    pub http_client_config: ClientConfig,
}
157
158impl AgentInner {
159    pub async fn unsolicited_packet_handler(&self, packet: ResponsePacket) {
160        if packet.op_code == OpCode::Set {
161            if let Some(ref extras) = packet.extras {
162                if extras.len() < 16 {
163                    warn!("Received Set packet with too short extras: {packet:?}");
164                    return;
165                }
166
167                let mut cursor = Cursor::new(extras);
168                let server_rev_epoch = cursor.read_i64().await.unwrap();
169                let server_rev_id = cursor.read_i64().await.unwrap();
170
171                if let Some(config) = self
172                    .cfg_manager
173                    .out_of_band_version(server_rev_id, server_rev_epoch)
174                    .await
175                {
176                    self.apply_config(config).await;
177                }
178            } else {
179                warn!("Received Set packet with no extras: {packet:?}");
180            }
181        }
182    }
183
184    pub async fn apply_config(&self, config: ParsedConfig) {
185        let mut state = self.state.lock().await;
186
187        info!(
188            "Agent applying updated config: rev_id={rev_id}, rev_epoch={rev_epoch}",
189            rev_id = config.rev_id,
190            rev_epoch = config.rev_epoch
191        );
192        state.latest_config = config;
193
194        self.update_state(&mut state).await;
195    }
196
197    async fn update_state(&self, state: &mut AgentState) {
198        debug!("Agent updating state");
199
200        let agent_component_configs = Self::gen_agent_component_configs(state);
201
202        // In order to avoid race conditions between operations selecting the
203        // endpoint they need to send the request to, and fetching an actual
204        // client which can send to that endpoint.  We must first ensure that
205        // all the new endpoints are available in the manager.  Then update
206        // the routing table.  Then go back and remove the old entries from
207        // the connection manager list.
208
209        let mut old_clients = HashMap::new();
210        for (client_name, client) in &state.last_clients {
211            old_clients.insert(client_name.clone(), client.clone());
212        }
213
214        for (client_name, client) in &agent_component_configs.kv_client_manager_client_configs {
215            old_clients
216                .entry(client_name.clone())
217                .or_insert(client.clone());
218        }
219
220        if let Err(e) = self
221            .conn_mgr
222            .reconfigure(KvClientManagerConfig {
223                num_pool_connections: state.num_pool_connections,
224                clients: old_clients,
225            })
226            .await
227        {
228            error!("Failed to reconfigure connection manager (old clients); {e}");
229        };
230
231        self.vb_router
232            .update_vbucket_info(agent_component_configs.vbucket_routing_info);
233
234        if let Err(e) = self
235            .cfg_manager
236            .reconfigure(agent_component_configs.config_manager_memd_config)
237        {
238            error!("Failed to reconfigure memd config watcher component; {e}");
239        }
240
241        if let Err(e) = self
242            .conn_mgr
243            .reconfigure(KvClientManagerConfig {
244                num_pool_connections: state.num_pool_connections,
245                clients: agent_component_configs.kv_client_manager_client_configs,
246            })
247            .await
248        {
249            error!("Failed to reconfigure connection manager (updated clients); {e}");
250        }
251
252        if let Err(e) = self
253            .http_client
254            .reconfigure(agent_component_configs.http_client_config)
255        {
256            error!("Failed to reconfigure http client: {e}");
257        }
258
259        self.query.reconfigure(agent_component_configs.query_config);
260        self.search
261            .reconfigure(agent_component_configs.search_config);
262        self.mgmt.reconfigure(agent_component_configs.mgmt_config);
263        self.diagnostics
264            .reconfigure(agent_component_configs.diagnostics_config);
265    }
266
267    fn gen_agent_component_configs(state: &mut AgentState) -> AgentComponentConfigs {
268        let config = &state.latest_config;
269        let rev_id = config.rev_id;
270        let network_info = config.addresses_group_for_network_type(&state.network_type);
271
272        let mut gcccp_node_ids = Vec::new();
273        let mut kv_data_node_ids = Vec::new();
274        let mut kv_data_hosts: HashMap<String, Address> = HashMap::new();
275        let mut mgmt_endpoints: HashMap<String, String> = HashMap::new();
276        let mut query_endpoints: HashMap<String, String> = HashMap::new();
277        let mut search_endpoints: HashMap<String, String> = HashMap::new();
278
279        for node in network_info.nodes {
280            let kv_ep_id = format!("kv{}", node.node_id);
281            let mgmt_ep_id = format!("mgmt{}", node.node_id);
282            let query_ep_id = format!("query{}", node.node_id);
283            let search_ep_id = format!("search{}", node.node_id);
284
285            gcccp_node_ids.push(kv_ep_id.clone());
286
287            if node.has_data {
288                kv_data_node_ids.push(kv_ep_id.clone());
289            }
290
291            if state.tls_config.is_some() {
292                if let Some(p) = node.ssl_ports.kv {
293                    kv_data_hosts.insert(
294                        kv_ep_id,
295                        Address {
296                            host: node.hostname.clone(),
297                            port: p,
298                        },
299                    );
300                }
301                if let Some(p) = node.ssl_ports.mgmt {
302                    mgmt_endpoints.insert(mgmt_ep_id, format!("https://{}:{}", node.hostname, p));
303                }
304                if let Some(p) = node.ssl_ports.query {
305                    query_endpoints.insert(query_ep_id, format!("https://{}:{}", node.hostname, p));
306                }
307                if let Some(p) = node.ssl_ports.search {
308                    search_endpoints
309                        .insert(search_ep_id, format!("https://{}:{}", node.hostname, p));
310                }
311            } else {
312                if let Some(p) = node.non_ssl_ports.kv {
313                    kv_data_hosts.insert(
314                        kv_ep_id,
315                        Address {
316                            host: node.hostname.clone(),
317                            port: p,
318                        },
319                    );
320                }
321                if let Some(p) = node.non_ssl_ports.mgmt {
322                    mgmt_endpoints.insert(mgmt_ep_id, format!("http://{}:{}", node.hostname, p));
323                }
324                if let Some(p) = node.non_ssl_ports.query {
325                    query_endpoints.insert(query_ep_id, format!("http://{}:{}", node.hostname, p));
326                }
327                if let Some(p) = node.non_ssl_ports.search {
328                    search_endpoints
329                        .insert(search_ep_id, format!("http://{}:{}", node.hostname, p));
330                }
331            }
332        }
333
334        let mut clients = HashMap::new();
335        for (node_id, address) in kv_data_hosts {
336            let config = KvClientConfig {
337                address,
338                tls: state.tls_config.clone(),
339                client_name: state.client_name.clone(),
340                authenticator: state.authenticator.clone(),
341                selected_bucket: state.bucket.clone(),
342                disable_error_map: false,
343                auth_mechanisms: state.auth_mechanisms.clone(),
344                disable_mutation_tokens: state.disable_mutation_tokens,
345                disable_server_durations: state.disable_server_durations,
346                connect_timeout: state.kv_connect_timeout,
347                tcp_keep_alive_time: state.tcp_keep_alive_time,
348            };
349            clients.insert(node_id, config);
350        }
351
352        let vbucket_routing_info = if let Some(info) = &state.latest_config.bucket {
353            VbucketRoutingInfo {
354                vbucket_info: info.vbucket_map.clone(),
355                server_list: kv_data_node_ids,
356                bucket_selected: state.bucket.is_some(),
357            }
358        } else {
359            VbucketRoutingInfo {
360                vbucket_info: None,
361                server_list: kv_data_node_ids,
362                bucket_selected: state.bucket.is_some(),
363            }
364        };
365
366        let mut available_services = vec![ServiceType::MEMD];
367        if !query_endpoints.is_empty() {
368            available_services.push(ServiceType::QUERY)
369        }
370        if !search_endpoints.is_empty() {
371            available_services.push(ServiceType::SEARCH)
372        }
373
374        AgentComponentConfigs {
375            config_manager_memd_config: ConfigManagerMemdConfig {
376                endpoints: gcccp_node_ids,
377            },
378            kv_client_manager_client_configs: clients,
379            vbucket_routing_info,
380            query_config: QueryComponentConfig {
381                endpoints: query_endpoints,
382                authenticator: state.authenticator.clone(),
383            },
384            search_config: SearchComponentConfig {
385                endpoints: search_endpoints,
386                authenticator: state.authenticator.clone(),
387                vector_search_enabled: state
388                    .latest_config
389                    .features
390                    .contains(&ParsedConfigFeature::FtsVectorSearch),
391            },
392            http_client_config: ClientConfig {
393                tls_config: state.tls_config.clone(),
394                idle_connection_timeout: state.http_idle_connection_timeout,
395                max_idle_connections_per_host: state.http_max_idle_connections_per_host,
396                tcp_keep_alive_time: state.tcp_keep_alive_time,
397            },
398            mgmt_config: MgmtComponentConfig {
399                endpoints: mgmt_endpoints,
400                authenticator: state.authenticator.clone(),
401            },
402            diagnostics_config: DiagnosticsComponentConfig {
403                bucket: state.bucket.clone(),
404                services: available_services,
405                rev_id,
406            },
407        }
408    }
409
410    // TODO: This really shouldn't be async
411    pub async fn bucket_features(&self) -> Result<Vec<BucketFeature>> {
412        let guard = self.state.lock().await;
413
414        if let Some(bucket) = &guard.latest_config.bucket {
415            let mut features = vec![];
416
417            for feature in &bucket.features {
418                match feature {
419                    ParsedConfigBucketFeature::CreateAsDeleted => {
420                        features.push(BucketFeature::CreateAsDeleted)
421                    }
422                    ParsedConfigBucketFeature::ReplaceBodyWithXattr => {
423                        features.push(BucketFeature::ReplaceBodyWithXattr)
424                    }
425                    ParsedConfigBucketFeature::RangeScan => features.push(BucketFeature::RangeScan),
426                    ParsedConfigBucketFeature::ReplicaRead => {
427                        features.push(BucketFeature::ReplicaRead)
428                    }
429                    ParsedConfigBucketFeature::NonDedupedHistory => {
430                        features.push(BucketFeature::NonDedupedHistory)
431                    }
432                    ParsedConfigBucketFeature::ReviveDocument => {
433                        features.push(BucketFeature::ReviveDocument)
434                    }
435                    _ => {}
436                }
437            }
438
439            return Ok(features);
440        }
441
442        Err(ErrorKind::NoBucket.into())
443    }
444}
445
446impl ConfigUpdater for AgentInner {
447    async fn apply_terse_config(&self, config: TerseConfig, source_hostname: &str) {
448        let parsed_config = match ConfigParser::parse_terse_config(config, source_hostname) {
449            Ok(cfg) => cfg,
450            Err(_e) => {
451                // TODO: log
452                return;
453            }
454        };
455
456        if let Some(config) = self.cfg_manager.out_of_band_config(parsed_config) {
457            self.apply_config(config).await;
458        };
459    }
460}
461
462impl Agent {
    /// Creates an agent from `opts`.
    ///
    /// Fetches a first cluster config using the seed addresses, derives every component's
    /// configuration from it, builds the components, and spawns the background tasks that
    /// handle unsolicited packets and config updates.
    ///
    /// # Errors
    ///
    /// Returns an error if the HTTP client cannot be created, if fetching the first config
    /// fails, or if the KV client manager cannot be created.
    pub async fn new(opts: AgentOptions) -> Result<Self> {
        let build_version = env!("CARGO_PKG_VERSION");
        let client_name = format!("couchbase-rs-core {build_version}");
        info!("Creating new agent {client_name}");

        // With no mechanisms configured: PLAIN when TLS is in use, otherwise the SCRAM variants.
        // With explicit mechanisms: use them as given, but warn about questionable combinations.
        let auth_mechanisms = if opts.auth_mechanisms.is_empty() {
            if opts.tls_config.is_some() {
                vec![AuthMechanism::Plain]
            } else {
                vec![
                    AuthMechanism::ScramSha512,
                    AuthMechanism::ScramSha256,
                    AuthMechanism::ScramSha1,
                ]
            }
        } else {
            if opts.tls_config.is_none() && opts.auth_mechanisms.contains(&AuthMechanism::Plain) {
                warn!("PLAIN sends credentials in plaintext, this will cause credential leakage on the network");
            } else if opts.tls_config.is_some()
                && (opts.auth_mechanisms.contains(&AuthMechanism::ScramSha512)
                    || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha256)
                    || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha1))
            {
                warn!("Consider using PLAIN for TLS connections, as it is more efficient");
            }

            opts.auth_mechanisms
        };

        // `latest_config` and `network_type` are placeholders until the first config is fetched.
        let mut state = AgentState {
            bucket: opts.bucket_name.clone(),
            authenticator: Arc::new(opts.authenticator),
            num_pool_connections: opts.kv_config.num_connections,
            last_clients: Default::default(),
            latest_config: ParsedConfig::default(),
            network_type: "".to_string(),
            client_name: client_name.clone(),
            tls_config: opts.tls_config,
            auth_mechanisms,
            disable_mutation_tokens: !opts.kv_config.enable_mutation_tokens,
            disable_server_durations: !opts.kv_config.enable_server_durations,
            kv_connect_timeout: opts.kv_config.connect_timeout,
            kv_connect_throttle_timeout: opts.kv_config.connect_throttle_timeout,
            http_idle_connection_timeout: opts.http_config.idle_connection_timeout,
            http_max_idle_connections_per_host: opts.http_config.max_idle_connections_per_host,
            // Keep-alive defaults to 60 seconds when not configured.
            tcp_keep_alive_time: opts
                .tcp_keep_alive_time
                .unwrap_or_else(|| Duration::from_secs(60)),
        };

        let http_client = Arc::new(ReqwestClient::new(ClientConfig {
            tls_config: state.tls_config.clone(),
            idle_connection_timeout: state.http_idle_connection_timeout,
            max_idle_connections_per_host: state.http_max_idle_connections_per_host,
            tcp_keep_alive_time: state.tcp_keep_alive_time,
        })?);

        let err_map_component = Arc::new(ErrMapComponent::new());

        let connect_timeout = opts.kv_config.connect_timeout;

        // Bootstrap: obtain the first cluster config from the seed KV / HTTP addresses.
        let first_kv_client_configs =
            Self::gen_first_kv_client_configs(&opts.seed_config.memd_addrs, &state);
        let first_http_client_configs =
            Self::gen_first_http_endpoints(&opts.seed_config.http_addrs, &state);
        let first_config = Self::get_first_config(
            first_kv_client_configs,
            first_http_client_configs,
            http_client.clone(),
            err_map_component.clone(),
            connect_timeout,
        )
        .await?;

        state.latest_config = first_config.clone();

        // The network type must be known before component configs are generated, because it
        // selects which address group of the config is used.
        let network_type = NetworkTypeHeuristic::identify(&state.latest_config);
        state.network_type = network_type;

        let agent_component_configs = AgentInner::gen_agent_component_configs(&mut state);

        let err_map_component_conn_mgr = err_map_component.clone();

        // Copied out before `state` is moved into the mutex.
        let num_pool_connections = state.num_pool_connections;
        let state = Arc::new(Mutex::new(state));

        // Channel over which the KV client manager delivers unsolicited packets to the task
        // spawned further down.
        let (unsolicited_packet_tx, mut unsolicited_packet_rx) = mpsc::unbounded_channel();

        let conn_mgr = Arc::new(
            StdKvClientManager::new(
                KvClientManagerConfig {
                    num_pool_connections,
                    clients: agent_component_configs.kv_client_manager_client_configs,
                },
                KvClientManagerOptions {
                    connect_timeout,
                    connect_throttle_period: opts.kv_config.connect_throttle_timeout,
                    unsolicited_packet_tx: Some(unsolicited_packet_tx),
                    orphan_handler: opts.orphan_response_handler,
                    on_err_map_fetched_handler: Some(Arc::new(move |err_map| {
                        err_map_component_conn_mgr.on_err_map(err_map);
                    })),
                    disable_decompression: opts.compression_config.disable_decompression,
                },
            )
            .await?,
        );

        let cfg_manager = Arc::new(ConfigManagerMemd::new(
            agent_component_configs.config_manager_memd_config,
            ConfigManagerMemdOptions {
                polling_period: opts.config_poller_config.poll_interval,
                kv_client_manager: conn_mgr.clone(),
                first_config,
            },
        ));
        let vb_router = Arc::new(StdVbucketRouter::new(
            agent_component_configs.vbucket_routing_info,
            VbucketRouterOptions {},
        ));

        // Created without a watcher; the agent is registered on it once `inner` exists.
        let nmvb_handler = Arc::new(StdNotMyVbucketConfigHandler::new());

        let memd_resolver = CollectionResolverMemd::new(CollectionResolverMemdOptions {
            conn_mgr: conn_mgr.clone(),
        });

        let collections = Arc::new(CollectionResolverCached::new(
            CollectionResolverCachedOptions {
                resolver: memd_resolver,
            },
        ));

        let retry_manager = Arc::new(RetryManager::new(err_map_component.clone()));
        let compression_manager = Arc::new(CompressionManager::new(opts.compression_config));

        let crud = CrudComponent::new(
            nmvb_handler.clone(),
            vb_router.clone(),
            conn_mgr.clone(),
            collections.clone(),
            retry_manager.clone(),
            compression_manager,
        );

        let mgmt = MgmtComponent::new(
            retry_manager.clone(),
            http_client.clone(),
            agent_component_configs.mgmt_config,
            MgmtComponentOptions {
                user_agent: client_name.clone(),
            },
        );

        let query = Arc::new(QueryComponent::new(
            retry_manager.clone(),
            http_client.clone(),
            agent_component_configs.query_config,
            QueryComponentOptions {
                user_agent: client_name.clone(),
            },
        ));

        let search = Arc::new(SearchComponent::new(
            retry_manager.clone(),
            http_client.clone(),
            agent_component_configs.search_config,
            SearchComponentOptions {
                user_agent: client_name.clone(),
            },
        ));

        let diagnostics = DiagnosticsComponent::new(
            conn_mgr.clone(),
            query.clone(),
            search.clone(),
            retry_manager.clone(),
            agent_component_configs.diagnostics_config,
        );

        let inner = Arc::new(AgentInner {
            state,
            cfg_manager: cfg_manager.clone(),
            conn_mgr,
            vb_router,
            crud,
            collections,
            retry_manager,
            http_client,
            err_map_component,

            mgmt,
            query,
            search,
            diagnostics,
        });

        // The task only holds a weak reference, so it does not keep the agent alive; it stops
        // when the channel closes or, on the next packet, once the agent has been dropped.
        let inner_clone = Arc::downgrade(&inner);
        tokio::spawn(async move {
            while let Some(packet) = unsolicited_packet_rx.recv().await {
                if let Some(inner_clone) = inner_clone.upgrade() {
                    inner_clone.unsolicited_packet_handler(packet).await;
                } else {
                    break;
                }
            }
            debug!("Unsolicited packet handler exited");
        });

        nmvb_handler.set_watcher(Arc::downgrade(&inner)).await;

        // The returned join handle is not kept; the watcher task runs detached.
        Self::start_config_watcher(Arc::downgrade(&inner), cfg_manager);

        let agent = Agent { inner };

        debug!(
            "Agent created, {} strong references",
            Arc::strong_count(&agent.inner)
        );

        Ok(agent)
    }
685
686    fn start_config_watcher(
687        inner: Weak<AgentInner>,
688        config_watcher: Arc<impl ConfigManager>,
689    ) -> JoinHandle<()> {
690        let mut watch_rx = config_watcher.watch();
691
692        let inner = inner.clone();
693        tokio::spawn(async move {
694            loop {
695                match watch_rx.changed().await {
696                    Ok(_) => {
697                        let pc = {
698                            // apply_config requires an owned ParsedConfig, as it takes ownership of it.
699                            // Doing the clone within a block also means we can release the lock that
700                            // borrow_and_update() takes as soon as possible.
701                            watch_rx.borrow_and_update().clone()
702                        };
703                        if let Some(i) = inner.upgrade() {
704                            i.apply_config(pc).await;
705                        } else {
706                            debug!("Config watcher inner dropped, exiting");
707                            return;
708                        }
709                    }
710                    Err(_e) => {
711                        debug!("Config watcher channel closed");
712                        return;
713                    }
714                }
715            }
716        })
717    }
718
719    async fn get_first_config<C: httpx::client::Client>(
720        kv_client_manager_client_configs: HashMap<String, KvClientConfig>,
721        http_configs: HashMap<String, FirstHttpConfig>,
722        http_client: Arc<C>,
723        err_map_component: Arc<ErrMapComponent>,
724        connect_timeout: Duration,
725    ) -> Result<ParsedConfig> {
726        loop {
727            for endpoint_config in kv_client_manager_client_configs.values() {
728                let host = &endpoint_config.address;
729                let err_map_component_clone = err_map_component.clone();
730                let timeout_result = timeout(
731                    connect_timeout,
732                    StdKvClient::new(
733                        endpoint_config.clone(),
734                        KvClientOptions {
735                            unsolicited_packet_tx: None,
736                            orphan_handler: None,
737                            on_close: Arc::new(|id| {
738                                Box::pin(async move {
739                                    debug!("Bootstrap client {id} closed");
740                                })
741                            }),
742                            on_err_map_fetched: Some(Arc::new(move |err_map| {
743                                err_map_component_clone.on_err_map(err_map);
744                            })),
745                            disable_decompression: false,
746                            id: Uuid::new_v4().to_string(),
747                        },
748                    ),
749                )
750                .await;
751
752                let client: StdKvClient<Client> = match timeout_result {
753                    Ok(client_result) => match client_result {
754                        Ok(client) => client,
755                        Err(e) => {
756                            let mut msg = format!("Failed to connect to endpoint: {e}");
757                            if let Some(source) = e.source() {
758                                msg = format!("{msg} - {source}");
759                            }
760                            warn!("{msg}");
761                            continue;
762                        }
763                    },
764                    Err(_e) => continue,
765                };
766
767                let raw_config = match client
768                    .get_cluster_config(GetClusterConfigRequest {
769                        known_version: None,
770                    })
771                    .await
772                {
773                    Ok(resp) => resp.config,
774                    Err(_e) => continue,
775                };
776
777                client.close().await?;
778
779                let config: TerseConfig =
780                    serde_json::from_slice(raw_config.as_slice()).map_err(|e| {
781                        Error::new_message_error(format!("failed to deserialize config: {e}"))
782                    })?;
783
784                match ConfigParser::parse_terse_config(config, host.host.as_str()) {
785                    Ok(c) => {
786                        return Ok(c);
787                    }
788                    Err(_e) => continue,
789                };
790            }
791
792            info!("Failed to fetch config over kv, attempting http");
793            for endpoint_config in http_configs.values() {
794                let endpoint = endpoint_config.endpoint.clone();
795                let host = get_host_port_from_uri(&endpoint)?;
796                let user_pass = match endpoint_config.authenticator.as_ref() {
797                    Authenticator::PasswordAuthenticator(authenticator) => {
798                        authenticator.get_credentials(&ServiceType::MGMT, host)?
799                    }
800                    Authenticator::CertificateAuthenticator(authenticator) => {
801                        authenticator.get_credentials(&ServiceType::MGMT, host)?
802                    }
803                };
804
805                match Self::fetch_http_config(
806                    http_client.clone(),
807                    endpoint,
808                    endpoint_config.user_agent.clone(),
809                    user_pass.username,
810                    user_pass.password,
811                    endpoint_config.bucket_name.clone(),
812                )
813                .await
814                {
815                    Ok(c) => {
816                        return Ok(c);
817                    }
818                    Err(_e) => {}
819                };
820            }
821
822            info!("Failed to fetch config from any source");
823
824            // TODO: Make configurable?
825            sleep(Duration::from_secs(1)).await;
826        }
827    }
828
829    pub(crate) async fn fetch_http_config<C: httpx::client::Client>(
830        http_client: Arc<C>,
831        endpoint: String,
832        user_agent: String,
833        username: String,
834        password: String,
835        bucket_name: Option<String>,
836    ) -> Result<ParsedConfig> {
837        debug!("Polling config from {}", &endpoint);
838
839        let host_port = get_host_port_from_uri(&endpoint)?;
840        let hostname = get_hostname_from_host_port(&host_port)?;
841
842        let parsed = if let Some(bucket_name) = bucket_name {
843            let config = mgmtx::mgmt::Management {
844                http_client,
845                user_agent,
846                endpoint,
847                username,
848                password,
849            }
850            .get_terse_bucket_config(&GetTerseBucketConfigOptions {
851                bucket_name: &bucket_name,
852                on_behalf_of_info: None,
853            })
854            .await
855            .map_err(Error::from)?;
856
857            ConfigParser::parse_terse_config(config, &hostname)?
858        } else {
859            let config = mgmtx::mgmt::Management {
860                http_client,
861                user_agent,
862                endpoint,
863                username,
864                password,
865            }
866            .get_terse_cluster_config(&GetTerseClusterConfigOptions {
867                on_behalf_of_info: None,
868            })
869            .await
870            .map_err(Error::from)?;
871
872            ConfigParser::parse_terse_config(config, &hostname)?
873        };
874
875        Ok(parsed)
876    }
877
878    fn gen_first_kv_client_configs(
879        memd_addrs: &Vec<Address>,
880        state: &AgentState,
881    ) -> HashMap<String, KvClientConfig> {
882        let mut clients = HashMap::new();
883        for addr in memd_addrs {
884            let node_id = format!("kv{addr}");
885            let config = KvClientConfig {
886                address: addr.clone(),
887                tls: state.tls_config.clone(),
888                client_name: state.client_name.clone(),
889                authenticator: state.authenticator.clone(),
890                selected_bucket: state.bucket.clone(),
891                disable_error_map: false,
892                auth_mechanisms: state.auth_mechanisms.clone(),
893                disable_mutation_tokens: state.disable_mutation_tokens,
894                disable_server_durations: state.disable_server_durations,
895                connect_timeout: state.kv_connect_timeout,
896                tcp_keep_alive_time: state.tcp_keep_alive_time,
897            };
898            clients.insert(node_id, config);
899        }
900
901        clients
902    }
903
904    fn gen_first_http_endpoints(
905        mgmt_addrs: &Vec<Address>,
906        state: &AgentState,
907    ) -> HashMap<String, FirstHttpConfig> {
908        let mut clients = HashMap::new();
909        for addr in mgmt_addrs {
910            let node_id = format!("mgmt{addr}");
911            let base = if state.tls_config.is_some() {
912                "https"
913            } else {
914                "http"
915            };
916            let config = FirstHttpConfig {
917                endpoint: format!("{base}://{addr}"),
918                tls: state.tls_config.clone(),
919                user_agent: state.client_name.clone(),
920                authenticator: state.authenticator.clone(),
921                bucket_name: state.bucket.clone(),
922            };
923            clients.insert(node_id, config);
924        }
925
926        clients
927    }
928
929    pub(crate) async fn run_with_bucket_feature_check<T, Fut>(
930        &self,
931        feature: BucketFeature,
932        operation: impl FnOnce() -> Fut,
933        message: impl Into<String>,
934    ) -> Result<T>
935    where
936        Fut: std::future::Future<Output = Result<T>>,
937    {
938        let features = self.bucket_features().await?;
939
940        if !features.contains(&feature) {
941            return Err(Error::new_feature_not_available_error(
942                format!("{feature:?}"),
943                message.into(),
944            ));
945        }
946
947        operation().await
948    }
949}
950
/// Settings for one management endpoint used when fetching the first config
/// over HTTP. Instances are produced by `gen_first_http_endpoints`.
struct FirstHttpConfig {
    /// Endpoint URI of the form `{scheme}://{addr}`; the scheme is `https` when
    /// a TLS config is present and `http` otherwise.
    pub endpoint: String,
    /// Copy of the agent state's TLS config, if any.
    pub tls: Option<TlsConfig>,
    /// Value passed as the user agent to `fetch_http_config`; populated from
    /// the agent state's client name.
    pub user_agent: String,
    /// Authenticator queried for the management-service credentials of this
    /// endpoint's host.
    pub authenticator: Arc<Authenticator>,
    /// Bucket whose terse config is requested; when `None`, the terse cluster
    /// config is requested instead.
    pub bucket_name: Option<String>,
}