1use crate::address::Address;
20use crate::auth_mechanism::AuthMechanism;
21use crate::authenticator::Authenticator;
22use crate::cbconfig::TerseConfig;
23use crate::collection_resolver_cached::{
24 CollectionResolverCached, CollectionResolverCachedOptions,
25};
26use crate::collection_resolver_memd::{CollectionResolverMemd, CollectionResolverMemdOptions};
27use crate::compressionmanager::{CompressionManager, StdCompressor};
28use crate::configmanager::{
29 ConfigManager, ConfigManagerMemd, ConfigManagerMemdConfig, ConfigManagerMemdOptions,
30};
31use crate::configparser::ConfigParser;
32use crate::crudcomponent::CrudComponent;
33use crate::diagnosticscomponent::{DiagnosticsComponent, DiagnosticsComponentConfig};
34use crate::errmapcomponent::ErrMapComponent;
35use crate::error::{Error, ErrorKind, Result};
36use crate::features::BucketFeature;
37use crate::httpcomponent::HttpComponent;
38use crate::httpx::client::{ClientConfig, ReqwestClient};
39use crate::kvclient::{KvClient, KvClientConfig, KvClientOptions, StdKvClient};
40use crate::kvclient_ops::KvClientOps;
41use crate::kvclientmanager::{
42 KvClientManager, KvClientManagerConfig, KvClientManagerOptions, StdKvClientManager,
43};
44use crate::kvclientpool::{
45 KvClientPool, KvClientPoolConfig, KvClientPoolOptions, NaiveKvClientPool,
46};
47use crate::memdx::client::Client;
48use crate::memdx::opcode::OpCode;
49use crate::memdx::packet::ResponsePacket;
50use crate::memdx::request::GetClusterConfigRequest;
51use crate::mgmtcomponent::{MgmtComponent, MgmtComponentConfig, MgmtComponentOptions};
52use crate::mgmtx::options::{GetTerseBucketConfigOptions, GetTerseClusterConfigOptions};
53use crate::networktypeheuristic::NetworkTypeHeuristic;
54use crate::nmvbhandler::{ConfigUpdater, StdNotMyVbucketConfigHandler};
55use crate::options::agent::AgentOptions;
56use crate::parsedconfig::{ParsedConfig, ParsedConfigBucketFeature, ParsedConfigFeature};
57use crate::querycomponent::{QueryComponent, QueryComponentConfig, QueryComponentOptions};
58use crate::retry::RetryManager;
59use crate::searchcomponent::{SearchComponent, SearchComponentConfig, SearchComponentOptions};
60use crate::service_type::ServiceType;
61use crate::tls_config::TlsConfig;
62use crate::util::{get_host_port_from_uri, get_hostname_from_host_port};
63use crate::vbucketrouter::{
64 StdVbucketRouter, VbucketRouter, VbucketRouterOptions, VbucketRoutingInfo,
65};
66use crate::{httpx, mgmtx};
67
68use byteorder::BigEndian;
69use futures::executor::block_on;
70use log::{debug, error, info, warn};
71use uuid::Uuid;
72
73use std::cmp::Ordering;
74use std::collections::HashMap;
75use std::error::Error as StdError;
76use std::fmt::format;
77use std::io::Cursor;
78use std::net::ToSocketAddrs;
79use std::ops::Add;
80use std::sync::{Arc, Weak};
81use std::time::Duration;
82
83use crate::orphan_reporter::OrphanReporter;
84use tokio::io::AsyncReadExt;
85use tokio::net;
86use tokio::runtime::{Handle, Runtime};
87use tokio::sync::broadcast::{Receiver, Sender};
88use tokio::sync::{broadcast, mpsc, Mutex};
89use tokio::task::JoinHandle;
90use tokio::time::{sleep, timeout, timeout_at, Instant};
91
92#[derive(Clone)]
93struct AgentState {
94 bucket: Option<String>,
95 tls_config: Option<TlsConfig>,
96 authenticator: Arc<Authenticator>,
97 auth_mechanisms: Vec<AuthMechanism>,
98 num_pool_connections: usize,
99 last_clients: HashMap<String, KvClientConfig>,
101 latest_config: ParsedConfig,
102 network_type: String,
103
104 disable_mutation_tokens: bool,
105 disable_server_durations: bool,
106 kv_connect_timeout: Duration,
107 kv_connect_throttle_timeout: Duration,
108 http_idle_connection_timeout: Duration,
109 http_max_idle_connections_per_host: Option<usize>,
110 tcp_keep_alive_time: Duration,
111
112 client_name: String,
113}
114
115type AgentClientManager = StdKvClientManager<NaiveKvClientPool<StdKvClient<Client>>>;
116type AgentCollectionResolver = CollectionResolverCached<CollectionResolverMemd<AgentClientManager>>;
117
118pub(crate) struct AgentInner {
119 state: Arc<Mutex<AgentState>>,
120
121 cfg_manager: Arc<ConfigManagerMemd<AgentClientManager>>,
122 conn_mgr: Arc<AgentClientManager>,
123 vb_router: Arc<StdVbucketRouter>,
124 collections: Arc<AgentCollectionResolver>,
125 retry_manager: Arc<RetryManager>,
126 http_client: Arc<ReqwestClient>,
127 err_map_component: Arc<ErrMapComponent>,
128
129 pub(crate) crud: CrudComponent<
130 AgentClientManager,
131 StdVbucketRouter,
132 StdNotMyVbucketConfigHandler<AgentInner>,
133 AgentCollectionResolver,
134 StdCompressor,
135 >,
136
137 pub(crate) query: Arc<QueryComponent<ReqwestClient>>,
138 pub(crate) search: Arc<SearchComponent<ReqwestClient>>,
139 pub(crate) mgmt: MgmtComponent<ReqwestClient>,
140 pub(crate) diagnostics: DiagnosticsComponent<ReqwestClient, AgentClientManager>,
141}
142
143pub struct Agent {
144 pub(crate) inner: Arc<AgentInner>,
145}
146
147struct AgentComponentConfigs {
148 pub config_manager_memd_config: ConfigManagerMemdConfig,
149 pub kv_client_manager_client_configs: HashMap<String, KvClientConfig>,
150 pub vbucket_routing_info: VbucketRoutingInfo,
151 pub query_config: QueryComponentConfig,
152 pub search_config: SearchComponentConfig,
153 pub mgmt_config: MgmtComponentConfig,
154 pub diagnostics_config: DiagnosticsComponentConfig,
155 pub http_client_config: ClientConfig,
156}
157
158impl AgentInner {
159 pub async fn unsolicited_packet_handler(&self, packet: ResponsePacket) {
160 if packet.op_code == OpCode::Set {
161 if let Some(ref extras) = packet.extras {
162 if extras.len() < 16 {
163 warn!("Received Set packet with too short extras: {packet:?}");
164 return;
165 }
166
167 let mut cursor = Cursor::new(extras);
168 let server_rev_epoch = cursor.read_i64().await.unwrap();
169 let server_rev_id = cursor.read_i64().await.unwrap();
170
171 if let Some(config) = self
172 .cfg_manager
173 .out_of_band_version(server_rev_id, server_rev_epoch)
174 .await
175 {
176 self.apply_config(config).await;
177 }
178 } else {
179 warn!("Received Set packet with no extras: {packet:?}");
180 }
181 }
182 }
183
184 pub async fn apply_config(&self, config: ParsedConfig) {
185 let mut state = self.state.lock().await;
186
187 info!(
188 "Agent applying updated config: rev_id={rev_id}, rev_epoch={rev_epoch}",
189 rev_id = config.rev_id,
190 rev_epoch = config.rev_epoch
191 );
192 state.latest_config = config;
193
194 self.update_state(&mut state).await;
195 }
196
197 async fn update_state(&self, state: &mut AgentState) {
198 debug!("Agent updating state");
199
200 let agent_component_configs = Self::gen_agent_component_configs(state);
201
202 let mut old_clients = HashMap::new();
210 for (client_name, client) in &state.last_clients {
211 old_clients.insert(client_name.clone(), client.clone());
212 }
213
214 for (client_name, client) in &agent_component_configs.kv_client_manager_client_configs {
215 old_clients
216 .entry(client_name.clone())
217 .or_insert(client.clone());
218 }
219
220 if let Err(e) = self
221 .conn_mgr
222 .reconfigure(KvClientManagerConfig {
223 num_pool_connections: state.num_pool_connections,
224 clients: old_clients,
225 })
226 .await
227 {
228 error!("Failed to reconfigure connection manager (old clients); {e}");
229 };
230
231 self.vb_router
232 .update_vbucket_info(agent_component_configs.vbucket_routing_info);
233
234 if let Err(e) = self
235 .cfg_manager
236 .reconfigure(agent_component_configs.config_manager_memd_config)
237 {
238 error!("Failed to reconfigure memd config watcher component; {e}");
239 }
240
241 if let Err(e) = self
242 .conn_mgr
243 .reconfigure(KvClientManagerConfig {
244 num_pool_connections: state.num_pool_connections,
245 clients: agent_component_configs.kv_client_manager_client_configs,
246 })
247 .await
248 {
249 error!("Failed to reconfigure connection manager (updated clients); {e}");
250 }
251
252 if let Err(e) = self
253 .http_client
254 .reconfigure(agent_component_configs.http_client_config)
255 {
256 error!("Failed to reconfigure http client: {e}");
257 }
258
259 self.query.reconfigure(agent_component_configs.query_config);
260 self.search
261 .reconfigure(agent_component_configs.search_config);
262 self.mgmt.reconfigure(agent_component_configs.mgmt_config);
263 self.diagnostics
264 .reconfigure(agent_component_configs.diagnostics_config);
265 }
266
267 fn gen_agent_component_configs(state: &mut AgentState) -> AgentComponentConfigs {
268 let config = &state.latest_config;
269 let rev_id = config.rev_id;
270 let network_info = config.addresses_group_for_network_type(&state.network_type);
271
272 let mut gcccp_node_ids = Vec::new();
273 let mut kv_data_node_ids = Vec::new();
274 let mut kv_data_hosts: HashMap<String, Address> = HashMap::new();
275 let mut mgmt_endpoints: HashMap<String, String> = HashMap::new();
276 let mut query_endpoints: HashMap<String, String> = HashMap::new();
277 let mut search_endpoints: HashMap<String, String> = HashMap::new();
278
279 for node in network_info.nodes {
280 let kv_ep_id = format!("kv{}", node.node_id);
281 let mgmt_ep_id = format!("mgmt{}", node.node_id);
282 let query_ep_id = format!("query{}", node.node_id);
283 let search_ep_id = format!("search{}", node.node_id);
284
285 gcccp_node_ids.push(kv_ep_id.clone());
286
287 if node.has_data {
288 kv_data_node_ids.push(kv_ep_id.clone());
289 }
290
291 if state.tls_config.is_some() {
292 if let Some(p) = node.ssl_ports.kv {
293 kv_data_hosts.insert(
294 kv_ep_id,
295 Address {
296 host: node.hostname.clone(),
297 port: p,
298 },
299 );
300 }
301 if let Some(p) = node.ssl_ports.mgmt {
302 mgmt_endpoints.insert(mgmt_ep_id, format!("https://{}:{}", node.hostname, p));
303 }
304 if let Some(p) = node.ssl_ports.query {
305 query_endpoints.insert(query_ep_id, format!("https://{}:{}", node.hostname, p));
306 }
307 if let Some(p) = node.ssl_ports.search {
308 search_endpoints
309 .insert(search_ep_id, format!("https://{}:{}", node.hostname, p));
310 }
311 } else {
312 if let Some(p) = node.non_ssl_ports.kv {
313 kv_data_hosts.insert(
314 kv_ep_id,
315 Address {
316 host: node.hostname.clone(),
317 port: p,
318 },
319 );
320 }
321 if let Some(p) = node.non_ssl_ports.mgmt {
322 mgmt_endpoints.insert(mgmt_ep_id, format!("http://{}:{}", node.hostname, p));
323 }
324 if let Some(p) = node.non_ssl_ports.query {
325 query_endpoints.insert(query_ep_id, format!("http://{}:{}", node.hostname, p));
326 }
327 if let Some(p) = node.non_ssl_ports.search {
328 search_endpoints
329 .insert(search_ep_id, format!("http://{}:{}", node.hostname, p));
330 }
331 }
332 }
333
334 let mut clients = HashMap::new();
335 for (node_id, address) in kv_data_hosts {
336 let config = KvClientConfig {
337 address,
338 tls: state.tls_config.clone(),
339 client_name: state.client_name.clone(),
340 authenticator: state.authenticator.clone(),
341 selected_bucket: state.bucket.clone(),
342 disable_error_map: false,
343 auth_mechanisms: state.auth_mechanisms.clone(),
344 disable_mutation_tokens: state.disable_mutation_tokens,
345 disable_server_durations: state.disable_server_durations,
346 connect_timeout: state.kv_connect_timeout,
347 tcp_keep_alive_time: state.tcp_keep_alive_time,
348 };
349 clients.insert(node_id, config);
350 }
351
352 let vbucket_routing_info = if let Some(info) = &state.latest_config.bucket {
353 VbucketRoutingInfo {
354 vbucket_info: info.vbucket_map.clone(),
355 server_list: kv_data_node_ids,
356 bucket_selected: state.bucket.is_some(),
357 }
358 } else {
359 VbucketRoutingInfo {
360 vbucket_info: None,
361 server_list: kv_data_node_ids,
362 bucket_selected: state.bucket.is_some(),
363 }
364 };
365
366 let mut available_services = vec![ServiceType::MEMD];
367 if !query_endpoints.is_empty() {
368 available_services.push(ServiceType::QUERY)
369 }
370 if !search_endpoints.is_empty() {
371 available_services.push(ServiceType::SEARCH)
372 }
373
374 AgentComponentConfigs {
375 config_manager_memd_config: ConfigManagerMemdConfig {
376 endpoints: gcccp_node_ids,
377 },
378 kv_client_manager_client_configs: clients,
379 vbucket_routing_info,
380 query_config: QueryComponentConfig {
381 endpoints: query_endpoints,
382 authenticator: state.authenticator.clone(),
383 },
384 search_config: SearchComponentConfig {
385 endpoints: search_endpoints,
386 authenticator: state.authenticator.clone(),
387 vector_search_enabled: state
388 .latest_config
389 .features
390 .contains(&ParsedConfigFeature::FtsVectorSearch),
391 },
392 http_client_config: ClientConfig {
393 tls_config: state.tls_config.clone(),
394 idle_connection_timeout: state.http_idle_connection_timeout,
395 max_idle_connections_per_host: state.http_max_idle_connections_per_host,
396 tcp_keep_alive_time: state.tcp_keep_alive_time,
397 },
398 mgmt_config: MgmtComponentConfig {
399 endpoints: mgmt_endpoints,
400 authenticator: state.authenticator.clone(),
401 },
402 diagnostics_config: DiagnosticsComponentConfig {
403 bucket: state.bucket.clone(),
404 services: available_services,
405 rev_id,
406 },
407 }
408 }
409
410 pub async fn bucket_features(&self) -> Result<Vec<BucketFeature>> {
412 let guard = self.state.lock().await;
413
414 if let Some(bucket) = &guard.latest_config.bucket {
415 let mut features = vec![];
416
417 for feature in &bucket.features {
418 match feature {
419 ParsedConfigBucketFeature::CreateAsDeleted => {
420 features.push(BucketFeature::CreateAsDeleted)
421 }
422 ParsedConfigBucketFeature::ReplaceBodyWithXattr => {
423 features.push(BucketFeature::ReplaceBodyWithXattr)
424 }
425 ParsedConfigBucketFeature::RangeScan => features.push(BucketFeature::RangeScan),
426 ParsedConfigBucketFeature::ReplicaRead => {
427 features.push(BucketFeature::ReplicaRead)
428 }
429 ParsedConfigBucketFeature::NonDedupedHistory => {
430 features.push(BucketFeature::NonDedupedHistory)
431 }
432 ParsedConfigBucketFeature::ReviveDocument => {
433 features.push(BucketFeature::ReviveDocument)
434 }
435 _ => {}
436 }
437 }
438
439 return Ok(features);
440 }
441
442 Err(ErrorKind::NoBucket.into())
443 }
444}
445
446impl ConfigUpdater for AgentInner {
447 async fn apply_terse_config(&self, config: TerseConfig, source_hostname: &str) {
448 let parsed_config = match ConfigParser::parse_terse_config(config, source_hostname) {
449 Ok(cfg) => cfg,
450 Err(_e) => {
451 return;
453 }
454 };
455
456 if let Some(config) = self.cfg_manager.out_of_band_config(parsed_config) {
457 self.apply_config(config).await;
458 };
459 }
460}
461
462impl Agent {
463 pub async fn new(opts: AgentOptions) -> Result<Self> {
464 let build_version = env!("CARGO_PKG_VERSION");
465 let client_name = format!("couchbase-rs-core {build_version}");
466 info!("Creating new agent {client_name}");
467
468 let auth_mechanisms = if opts.auth_mechanisms.is_empty() {
469 if opts.tls_config.is_some() {
470 vec![AuthMechanism::Plain]
471 } else {
472 vec![
473 AuthMechanism::ScramSha512,
474 AuthMechanism::ScramSha256,
475 AuthMechanism::ScramSha1,
476 ]
477 }
478 } else {
479 if opts.tls_config.is_none() && opts.auth_mechanisms.contains(&AuthMechanism::Plain) {
480 warn!("PLAIN sends credentials in plaintext, this will cause credential leakage on the network");
481 } else if opts.tls_config.is_some()
482 && (opts.auth_mechanisms.contains(&AuthMechanism::ScramSha512)
483 || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha256)
484 || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha1))
485 {
486 warn!("Consider using PLAIN for TLS connections, as it is more efficient");
487 }
488
489 opts.auth_mechanisms
490 };
491
492 let mut state = AgentState {
493 bucket: opts.bucket_name.clone(),
494 authenticator: Arc::new(opts.authenticator),
495 num_pool_connections: opts.kv_config.num_connections,
496 last_clients: Default::default(),
497 latest_config: ParsedConfig::default(),
498 network_type: "".to_string(),
499 client_name: client_name.clone(),
500 tls_config: opts.tls_config,
501 auth_mechanisms,
502 disable_mutation_tokens: !opts.kv_config.enable_mutation_tokens,
503 disable_server_durations: !opts.kv_config.enable_server_durations,
504 kv_connect_timeout: opts.kv_config.connect_timeout,
505 kv_connect_throttle_timeout: opts.kv_config.connect_throttle_timeout,
506 http_idle_connection_timeout: opts.http_config.idle_connection_timeout,
507 http_max_idle_connections_per_host: opts.http_config.max_idle_connections_per_host,
508 tcp_keep_alive_time: opts
509 .tcp_keep_alive_time
510 .unwrap_or_else(|| Duration::from_secs(60)),
511 };
512
513 let http_client = Arc::new(ReqwestClient::new(ClientConfig {
514 tls_config: state.tls_config.clone(),
515 idle_connection_timeout: state.http_idle_connection_timeout,
516 max_idle_connections_per_host: state.http_max_idle_connections_per_host,
517 tcp_keep_alive_time: state.tcp_keep_alive_time,
518 })?);
519
520 let err_map_component = Arc::new(ErrMapComponent::new());
521
522 let connect_timeout = opts.kv_config.connect_timeout;
523
524 let first_kv_client_configs =
525 Self::gen_first_kv_client_configs(&opts.seed_config.memd_addrs, &state);
526 let first_http_client_configs =
527 Self::gen_first_http_endpoints(&opts.seed_config.http_addrs, &state);
528 let first_config = Self::get_first_config(
529 first_kv_client_configs,
530 first_http_client_configs,
531 http_client.clone(),
532 err_map_component.clone(),
533 connect_timeout,
534 )
535 .await?;
536
537 state.latest_config = first_config.clone();
538
539 let network_type = NetworkTypeHeuristic::identify(&state.latest_config);
540 state.network_type = network_type;
541
542 let agent_component_configs = AgentInner::gen_agent_component_configs(&mut state);
543
544 let err_map_component_conn_mgr = err_map_component.clone();
545
546 let num_pool_connections = state.num_pool_connections;
547 let state = Arc::new(Mutex::new(state));
548
549 let (unsolicited_packet_tx, mut unsolicited_packet_rx) = mpsc::unbounded_channel();
550
551 let conn_mgr = Arc::new(
552 StdKvClientManager::new(
553 KvClientManagerConfig {
554 num_pool_connections,
555 clients: agent_component_configs.kv_client_manager_client_configs,
556 },
557 KvClientManagerOptions {
558 connect_timeout,
559 connect_throttle_period: opts.kv_config.connect_throttle_timeout,
560 unsolicited_packet_tx: Some(unsolicited_packet_tx),
561 orphan_handler: opts.orphan_response_handler,
562 on_err_map_fetched_handler: Some(Arc::new(move |err_map| {
563 err_map_component_conn_mgr.on_err_map(err_map);
564 })),
565 disable_decompression: opts.compression_config.disable_decompression,
566 },
567 )
568 .await?,
569 );
570
571 let cfg_manager = Arc::new(ConfigManagerMemd::new(
572 agent_component_configs.config_manager_memd_config,
573 ConfigManagerMemdOptions {
574 polling_period: opts.config_poller_config.poll_interval,
575 kv_client_manager: conn_mgr.clone(),
576 first_config,
577 },
578 ));
579 let vb_router = Arc::new(StdVbucketRouter::new(
580 agent_component_configs.vbucket_routing_info,
581 VbucketRouterOptions {},
582 ));
583
584 let nmvb_handler = Arc::new(StdNotMyVbucketConfigHandler::new());
585
586 let memd_resolver = CollectionResolverMemd::new(CollectionResolverMemdOptions {
587 conn_mgr: conn_mgr.clone(),
588 });
589
590 let collections = Arc::new(CollectionResolverCached::new(
591 CollectionResolverCachedOptions {
592 resolver: memd_resolver,
593 },
594 ));
595
596 let retry_manager = Arc::new(RetryManager::new(err_map_component.clone()));
597 let compression_manager = Arc::new(CompressionManager::new(opts.compression_config));
598
599 let crud = CrudComponent::new(
600 nmvb_handler.clone(),
601 vb_router.clone(),
602 conn_mgr.clone(),
603 collections.clone(),
604 retry_manager.clone(),
605 compression_manager,
606 );
607
608 let mgmt = MgmtComponent::new(
609 retry_manager.clone(),
610 http_client.clone(),
611 agent_component_configs.mgmt_config,
612 MgmtComponentOptions {
613 user_agent: client_name.clone(),
614 },
615 );
616
617 let query = Arc::new(QueryComponent::new(
618 retry_manager.clone(),
619 http_client.clone(),
620 agent_component_configs.query_config,
621 QueryComponentOptions {
622 user_agent: client_name.clone(),
623 },
624 ));
625
626 let search = Arc::new(SearchComponent::new(
627 retry_manager.clone(),
628 http_client.clone(),
629 agent_component_configs.search_config,
630 SearchComponentOptions {
631 user_agent: client_name.clone(),
632 },
633 ));
634
635 let diagnostics = DiagnosticsComponent::new(
636 conn_mgr.clone(),
637 query.clone(),
638 search.clone(),
639 retry_manager.clone(),
640 agent_component_configs.diagnostics_config,
641 );
642
643 let inner = Arc::new(AgentInner {
644 state,
645 cfg_manager: cfg_manager.clone(),
646 conn_mgr,
647 vb_router,
648 crud,
649 collections,
650 retry_manager,
651 http_client,
652 err_map_component,
653
654 mgmt,
655 query,
656 search,
657 diagnostics,
658 });
659
660 let inner_clone = Arc::downgrade(&inner);
661 tokio::spawn(async move {
662 while let Some(packet) = unsolicited_packet_rx.recv().await {
663 if let Some(inner_clone) = inner_clone.upgrade() {
664 inner_clone.unsolicited_packet_handler(packet).await;
665 } else {
666 break;
667 }
668 }
669 debug!("Unsolicited packet handler exited");
670 });
671
672 nmvb_handler.set_watcher(Arc::downgrade(&inner)).await;
673
674 Self::start_config_watcher(Arc::downgrade(&inner), cfg_manager);
675
676 let agent = Agent { inner };
677
678 debug!(
679 "Agent created, {} strong references",
680 Arc::strong_count(&agent.inner)
681 );
682
683 Ok(agent)
684 }
685
686 fn start_config_watcher(
687 inner: Weak<AgentInner>,
688 config_watcher: Arc<impl ConfigManager>,
689 ) -> JoinHandle<()> {
690 let mut watch_rx = config_watcher.watch();
691
692 let inner = inner.clone();
693 tokio::spawn(async move {
694 loop {
695 match watch_rx.changed().await {
696 Ok(_) => {
697 let pc = {
698 watch_rx.borrow_and_update().clone()
702 };
703 if let Some(i) = inner.upgrade() {
704 i.apply_config(pc).await;
705 } else {
706 debug!("Config watcher inner dropped, exiting");
707 return;
708 }
709 }
710 Err(_e) => {
711 debug!("Config watcher channel closed");
712 return;
713 }
714 }
715 }
716 })
717 }
718
719 async fn get_first_config<C: httpx::client::Client>(
720 kv_client_manager_client_configs: HashMap<String, KvClientConfig>,
721 http_configs: HashMap<String, FirstHttpConfig>,
722 http_client: Arc<C>,
723 err_map_component: Arc<ErrMapComponent>,
724 connect_timeout: Duration,
725 ) -> Result<ParsedConfig> {
726 loop {
727 for endpoint_config in kv_client_manager_client_configs.values() {
728 let host = &endpoint_config.address;
729 let err_map_component_clone = err_map_component.clone();
730 let timeout_result = timeout(
731 connect_timeout,
732 StdKvClient::new(
733 endpoint_config.clone(),
734 KvClientOptions {
735 unsolicited_packet_tx: None,
736 orphan_handler: None,
737 on_close: Arc::new(|id| {
738 Box::pin(async move {
739 debug!("Bootstrap client {id} closed");
740 })
741 }),
742 on_err_map_fetched: Some(Arc::new(move |err_map| {
743 err_map_component_clone.on_err_map(err_map);
744 })),
745 disable_decompression: false,
746 id: Uuid::new_v4().to_string(),
747 },
748 ),
749 )
750 .await;
751
752 let client: StdKvClient<Client> = match timeout_result {
753 Ok(client_result) => match client_result {
754 Ok(client) => client,
755 Err(e) => {
756 let mut msg = format!("Failed to connect to endpoint: {e}");
757 if let Some(source) = e.source() {
758 msg = format!("{msg} - {source}");
759 }
760 warn!("{msg}");
761 continue;
762 }
763 },
764 Err(_e) => continue,
765 };
766
767 let raw_config = match client
768 .get_cluster_config(GetClusterConfigRequest {
769 known_version: None,
770 })
771 .await
772 {
773 Ok(resp) => resp.config,
774 Err(_e) => continue,
775 };
776
777 client.close().await?;
778
779 let config: TerseConfig =
780 serde_json::from_slice(raw_config.as_slice()).map_err(|e| {
781 Error::new_message_error(format!("failed to deserialize config: {e}"))
782 })?;
783
784 match ConfigParser::parse_terse_config(config, host.host.as_str()) {
785 Ok(c) => {
786 return Ok(c);
787 }
788 Err(_e) => continue,
789 };
790 }
791
792 info!("Failed to fetch config over kv, attempting http");
793 for endpoint_config in http_configs.values() {
794 let endpoint = endpoint_config.endpoint.clone();
795 let host = get_host_port_from_uri(&endpoint)?;
796 let user_pass = match endpoint_config.authenticator.as_ref() {
797 Authenticator::PasswordAuthenticator(authenticator) => {
798 authenticator.get_credentials(&ServiceType::MGMT, host)?
799 }
800 Authenticator::CertificateAuthenticator(authenticator) => {
801 authenticator.get_credentials(&ServiceType::MGMT, host)?
802 }
803 };
804
805 match Self::fetch_http_config(
806 http_client.clone(),
807 endpoint,
808 endpoint_config.user_agent.clone(),
809 user_pass.username,
810 user_pass.password,
811 endpoint_config.bucket_name.clone(),
812 )
813 .await
814 {
815 Ok(c) => {
816 return Ok(c);
817 }
818 Err(_e) => {}
819 };
820 }
821
822 info!("Failed to fetch config from any source");
823
824 sleep(Duration::from_secs(1)).await;
826 }
827 }
828
829 pub(crate) async fn fetch_http_config<C: httpx::client::Client>(
830 http_client: Arc<C>,
831 endpoint: String,
832 user_agent: String,
833 username: String,
834 password: String,
835 bucket_name: Option<String>,
836 ) -> Result<ParsedConfig> {
837 debug!("Polling config from {}", &endpoint);
838
839 let host_port = get_host_port_from_uri(&endpoint)?;
840 let hostname = get_hostname_from_host_port(&host_port)?;
841
842 let parsed = if let Some(bucket_name) = bucket_name {
843 let config = mgmtx::mgmt::Management {
844 http_client,
845 user_agent,
846 endpoint,
847 username,
848 password,
849 }
850 .get_terse_bucket_config(&GetTerseBucketConfigOptions {
851 bucket_name: &bucket_name,
852 on_behalf_of_info: None,
853 })
854 .await
855 .map_err(Error::from)?;
856
857 ConfigParser::parse_terse_config(config, &hostname)?
858 } else {
859 let config = mgmtx::mgmt::Management {
860 http_client,
861 user_agent,
862 endpoint,
863 username,
864 password,
865 }
866 .get_terse_cluster_config(&GetTerseClusterConfigOptions {
867 on_behalf_of_info: None,
868 })
869 .await
870 .map_err(Error::from)?;
871
872 ConfigParser::parse_terse_config(config, &hostname)?
873 };
874
875 Ok(parsed)
876 }
877
878 fn gen_first_kv_client_configs(
879 memd_addrs: &Vec<Address>,
880 state: &AgentState,
881 ) -> HashMap<String, KvClientConfig> {
882 let mut clients = HashMap::new();
883 for addr in memd_addrs {
884 let node_id = format!("kv{addr}");
885 let config = KvClientConfig {
886 address: addr.clone(),
887 tls: state.tls_config.clone(),
888 client_name: state.client_name.clone(),
889 authenticator: state.authenticator.clone(),
890 selected_bucket: state.bucket.clone(),
891 disable_error_map: false,
892 auth_mechanisms: state.auth_mechanisms.clone(),
893 disable_mutation_tokens: state.disable_mutation_tokens,
894 disable_server_durations: state.disable_server_durations,
895 connect_timeout: state.kv_connect_timeout,
896 tcp_keep_alive_time: state.tcp_keep_alive_time,
897 };
898 clients.insert(node_id, config);
899 }
900
901 clients
902 }
903
904 fn gen_first_http_endpoints(
905 mgmt_addrs: &Vec<Address>,
906 state: &AgentState,
907 ) -> HashMap<String, FirstHttpConfig> {
908 let mut clients = HashMap::new();
909 for addr in mgmt_addrs {
910 let node_id = format!("mgmt{addr}");
911 let base = if state.tls_config.is_some() {
912 "https"
913 } else {
914 "http"
915 };
916 let config = FirstHttpConfig {
917 endpoint: format!("{base}://{addr}"),
918 tls: state.tls_config.clone(),
919 user_agent: state.client_name.clone(),
920 authenticator: state.authenticator.clone(),
921 bucket_name: state.bucket.clone(),
922 };
923 clients.insert(node_id, config);
924 }
925
926 clients
927 }
928
929 pub(crate) async fn run_with_bucket_feature_check<T, Fut>(
930 &self,
931 feature: BucketFeature,
932 operation: impl FnOnce() -> Fut,
933 message: impl Into<String>,
934 ) -> Result<T>
935 where
936 Fut: std::future::Future<Output = Result<T>>,
937 {
938 let features = self.bucket_features().await?;
939
940 if !features.contains(&feature) {
941 return Err(Error::new_feature_not_available_error(
942 format!("{feature:?}"),
943 message.into(),
944 ));
945 }
946
947 operation().await
948 }
949}
950
951struct FirstHttpConfig {
952 pub endpoint: String,
953 pub tls: Option<TlsConfig>,
954 pub user_agent: String,
955 pub authenticator: Arc<Authenticator>,
956 pub bucket_name: Option<String>,
957}