1use crate::address::Address;
20use crate::auth_mechanism::AuthMechanism;
21use crate::authenticator::Authenticator;
22use crate::cbconfig::TerseConfig;
23use crate::collection_resolver_cached::{
24 CollectionResolverCached, CollectionResolverCachedOptions,
25};
26use crate::collection_resolver_memd::{CollectionResolverMemd, CollectionResolverMemdOptions};
27use crate::compressionmanager::{CompressionManager, StdCompressor};
28use crate::configmanager::{
29 ConfigManager, ConfigManagerMemd, ConfigManagerMemdConfig, ConfigManagerMemdOptions,
30};
31use crate::configparser::ConfigParser;
32use crate::crudcomponent::CrudComponent;
33use crate::diagnosticscomponent::{DiagnosticsComponent, DiagnosticsComponentConfig};
34use crate::errmapcomponent::ErrMapComponent;
35use crate::error::{Error, ErrorKind, Result};
36use crate::features::BucketFeature;
37use crate::httpcomponent::HttpComponent;
38use crate::httpx::client::{ClientConfig, ReqwestClient};
39use crate::kvclient::{
40 KvClient, KvClientBootstrapOptions, KvClientOptions, StdKvClient, UnsolicitedPacket,
41};
42use crate::kvclient_ops::KvClientOps;
43use crate::kvclientpool::{KvClientPool, KvClientPoolOptions, StdKvClientPool};
44use crate::memdx::client::Client;
45use crate::memdx::opcode::OpCode;
46use crate::memdx::packet::ResponsePacket;
47use crate::memdx::request::GetClusterConfigRequest;
48use crate::mgmtcomponent::{MgmtComponent, MgmtComponentConfig, MgmtComponentOptions};
49use crate::mgmtx::options::{GetTerseBucketConfigOptions, GetTerseClusterConfigOptions};
50use crate::networktypeheuristic::NetworkTypeHeuristic;
51use crate::nmvbhandler::{ConfigUpdater, StdNotMyVbucketConfigHandler};
52use crate::options::agent::{AgentOptions, ReconfigureAgentOptions};
53use crate::parsedconfig::{ParsedConfig, ParsedConfigBucketFeature, ParsedConfigFeature};
54use crate::querycomponent::{QueryComponent, QueryComponentConfig, QueryComponentOptions};
55use crate::retry::RetryManager;
56use crate::searchcomponent::{SearchComponent, SearchComponentConfig, SearchComponentOptions};
57use crate::service_type::ServiceType;
58use crate::tls_config::TlsConfig;
59use crate::util::{get_host_port_from_uri, get_hostname_from_host_port};
60use crate::vbucketrouter::{
61 StdVbucketRouter, VbucketRouter, VbucketRouterOptions, VbucketRoutingInfo,
62};
63use crate::{httpx, mgmtx};
64
65use byteorder::BigEndian;
66use futures::executor::block_on;
67use tracing::{debug, error, info, warn};
68use uuid::Uuid;
69
70use crate::analyticscomponent::{AnalyticsComponent, AnalyticsComponentOptions};
71use crate::componentconfigs::{AgentComponentConfigs, HttpClientConfig};
72use crate::httpx::request::{Auth, BasicAuth, BearerAuth};
73use crate::kvclient_babysitter::{KvTarget, StdKvClientBabysitter};
74use crate::kvendpointclientmanager::{
75 KvEndpointClientManager, KvEndpointClientManagerOptions, StdKvEndpointClientManager,
76};
77use crate::orphan_reporter::OrphanReporter;
78use crate::tracingcomponent::{TracingComponent, TracingComponentConfig};
79use arc_swap::ArcSwap;
80use std::cmp::Ordering;
81use std::collections::HashMap;
82use std::error::Error as StdError;
83use std::fmt::{format, Display};
84use std::io::Cursor;
85use std::net::ToSocketAddrs;
86use std::ops::{Add, Deref};
87use std::sync::{Arc, Weak};
88use std::time::Duration;
89use tokio::io::AsyncReadExt;
90use tokio::net;
91use tokio::runtime::{Handle, Runtime};
92use tokio::sync::broadcast::{Receiver, Sender};
93use tokio::sync::{broadcast, mpsc, Mutex};
94use tokio::task::JoinHandle;
95use tokio::time::{sleep, timeout, timeout_at, Instant};
96
97#[derive(Clone)]
98struct AgentState {
99 bucket: Option<String>,
100 tls_config: Option<TlsConfig>,
101 authenticator: Authenticator,
102 auth_mechanisms: Vec<AuthMechanism>,
103 num_pool_connections: usize,
104 latest_config: ParsedConfig,
106 network_type: String,
107
108 disable_error_map: bool,
109 disable_mutation_tokens: bool,
110 disable_server_durations: bool,
111 kv_connect_timeout: Duration,
112 kv_connect_throttle_timeout: Duration,
113 http_idle_connection_timeout: Duration,
114 http_max_idle_connections_per_host: Option<usize>,
115 tcp_keep_alive_time: Duration,
116}
117
118impl Display for AgentState {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 write!(
121 f,
122 "{{ bucket: {:?}, network_type: {}, num_pool_connections: {}, latest_config_rev_id: {}, latest_config_rev_epoch: {}, authenticator: {} }}",
123 self.bucket,
124 self.network_type,
125 self.num_pool_connections,
126 self.latest_config.rev_id,
127 self.latest_config.rev_epoch,
128 self.authenticator,
129
130 )
131 }
132}
133
134type AgentClientManager = StdKvEndpointClientManager<
135 StdKvClientPool<StdKvClientBabysitter<StdKvClient<Client>>, StdKvClient<Client>>,
136 StdKvClient<Client>,
137>;
138type AgentCollectionResolver = CollectionResolverCached<CollectionResolverMemd<AgentClientManager>>;
139
140pub(crate) struct AgentInner {
141 state: Arc<Mutex<AgentState>>,
142 bucket: Option<String>,
143
144 cfg_manager: Arc<ConfigManagerMemd<AgentClientManager>>,
145 conn_mgr: Arc<AgentClientManager>,
146 vb_router: Arc<StdVbucketRouter>,
147 collections: Arc<AgentCollectionResolver>,
148 retry_manager: Arc<RetryManager>,
149 http_client: Arc<ReqwestClient>,
150 err_map_component: Arc<ErrMapComponent>,
151
152 pub(crate) crud: CrudComponent<
153 AgentClientManager,
154 StdVbucketRouter,
155 StdNotMyVbucketConfigHandler<AgentInner>,
156 AgentCollectionResolver,
157 StdCompressor,
158 >,
159
160 pub(crate) analytics: Arc<AnalyticsComponent<ReqwestClient>>,
161 pub(crate) query: Arc<QueryComponent<ReqwestClient>>,
162 pub(crate) search: Arc<SearchComponent<ReqwestClient>>,
163 pub(crate) mgmt: MgmtComponent<ReqwestClient>,
164 pub(crate) diagnostics: DiagnosticsComponent<ReqwestClient, AgentClientManager>,
165 pub(crate) tracing: Arc<TracingComponent>,
166}
167
168pub struct Agent {
169 pub(crate) inner: Arc<AgentInner>,
170 user_agent: String,
171 id: String,
172}
173
174impl AgentInner {
175 fn gen_agent_component_configs_locked(state: &AgentState) -> AgentComponentConfigs {
176 AgentComponentConfigs::gen_from_config(
177 &state.latest_config,
178 &state.network_type,
179 state.tls_config.clone(),
180 state.bucket.clone(),
181 state.authenticator.clone(),
182 )
183 }
184
185 pub async fn unsolicited_packet_handler(&self, up: UnsolicitedPacket) {
186 let packet = up.packet;
187 if packet.op_code == OpCode::Set {
188 if let Some(ref extras) = packet.extras {
189 if extras.len() < 16 {
190 warn!("Received Set packet with too short extras: {packet:?}");
191 return;
192 }
193
194 let mut cursor = Cursor::new(extras);
195 let server_rev_epoch = cursor.read_i64().await.unwrap();
196 let server_rev_id = cursor.read_i64().await.unwrap();
197
198 if let Some(config) = self
199 .cfg_manager
200 .out_of_band_version(server_rev_id, server_rev_epoch, up.endpoint_id)
201 .await
202 {
203 self.apply_config(config).await;
204 }
205 } else {
206 warn!("Received Set packet with no extras: {packet:?}");
207 }
208 }
209 }
210
211 pub async fn apply_config(&self, config: ParsedConfig) {
212 let mut state = self.state.lock().await;
213
214 info!(
215 "Agent applying updated config: rev_id={rev_id}, rev_epoch={rev_epoch}",
216 rev_id = config.rev_id,
217 rev_epoch = config.rev_epoch
218 );
219 state.latest_config = config;
220
221 self.update_state_locked(&mut state).await;
222 }
223
224 async fn update_state_locked(&self, state: &mut AgentState) {
225 debug!("Agent updating state {}", state);
226
227 let agent_component_configs = Self::gen_agent_component_configs_locked(state);
228
229 if let Err(e) = self
237 .conn_mgr
238 .update_endpoints(agent_component_configs.kv_targets.clone(), true)
239 .await
240 {
241 error!("Failed to reconfigure connection manager (add-only); {e}");
242 };
243
244 self.vb_router
245 .update_vbucket_info(agent_component_configs.vbucket_routing_info);
246
247 if let Err(e) = self
248 .cfg_manager
249 .reconfigure(agent_component_configs.config_manager_memd_config)
250 {
251 error!("Failed to reconfigure memd config watcher component; {e}");
252 }
253
254 if let Err(e) = self
255 .conn_mgr
256 .update_endpoints(agent_component_configs.kv_targets, false)
257 .await
258 {
259 error!("Failed to reconfigure connection manager; {e}");
260 }
261
262 self.analytics
263 .reconfigure(agent_component_configs.analytics_config);
264 self.query.reconfigure(agent_component_configs.query_config);
265 self.search
266 .reconfigure(agent_component_configs.search_config);
267 self.mgmt.reconfigure(agent_component_configs.mgmt_config);
268 self.diagnostics
269 .reconfigure(agent_component_configs.diagnostics_config);
270 self.tracing
271 .reconfigure(agent_component_configs.tracing_config);
272 }
273
274 pub async fn bucket_features(&self) -> Result<Vec<BucketFeature>> {
275 let guard = self.state.lock().await;
276
277 if let Some(bucket) = &guard.latest_config.bucket {
278 let mut features = vec![];
279
280 for feature in &bucket.features {
281 match feature {
282 ParsedConfigBucketFeature::CreateAsDeleted => {
283 features.push(BucketFeature::CreateAsDeleted)
284 }
285 ParsedConfigBucketFeature::ReplaceBodyWithXattr => {
286 features.push(BucketFeature::ReplaceBodyWithXattr)
287 }
288 ParsedConfigBucketFeature::RangeScan => features.push(BucketFeature::RangeScan),
289 ParsedConfigBucketFeature::ReplicaRead => {
290 features.push(BucketFeature::ReplicaRead)
291 }
292 ParsedConfigBucketFeature::NonDedupedHistory => {
293 features.push(BucketFeature::NonDedupedHistory)
294 }
295 ParsedConfigBucketFeature::ReviveDocument => {
296 features.push(BucketFeature::ReviveDocument)
297 }
298 _ => {}
299 }
300 }
301
302 return Ok(features);
303 }
304
305 Err(ErrorKind::NoBucket.into())
306 }
307
308 pub(crate) fn get_bucket_name(&self) -> Option<String> {
309 self.bucket.clone()
310 }
311
312 pub async fn reconfigure(&self, opts: ReconfigureAgentOptions) {
313 let mut state = self.state.lock().await;
314 state.tls_config = opts.tls_config.clone();
315 state.authenticator = opts.authenticator.clone();
316
317 match self
319 .http_client
320 .update_tls(httpx::client::UpdateTlsOptions {
321 tls_config: opts.tls_config,
322 }) {
323 Ok(_) => {}
324 Err(e) => {
325 warn!("Failed to update TLS for HTTP client: {}", e);
326 }
327 };
328
329 self.conn_mgr.update_auth(opts.authenticator).await;
330
331 self.update_state_locked(&mut state).await;
332 }
333}
334
335impl ConfigUpdater for AgentInner {
336 async fn apply_terse_config(&self, config: TerseConfig, source_hostname: &str) {
337 let parsed_config = match ConfigParser::parse_terse_config(config, source_hostname) {
338 Ok(cfg) => cfg,
339 Err(_e) => {
340 return;
342 }
343 };
344
345 if let Some(config) = self.cfg_manager.out_of_band_config(parsed_config) {
346 self.apply_config(config).await;
347 };
348 }
349}
350
351impl Agent {
352 pub async fn new(opts: AgentOptions) -> Result<Self> {
353 let build_version = env!("CARGO_PKG_VERSION");
354 let user_agent = format!("cb-rust/{build_version}");
355 let agent_id = Uuid::new_v4().to_string();
356 info!(
357 "Core SDK Version: {} - Agent ID: {}",
358 &user_agent, &agent_id
359 );
360 info!("Agent Options {opts}");
361
362 let auth_mechanisms = if !opts.auth_mechanisms.is_empty() {
363 if opts.tls_config.is_none() && opts.auth_mechanisms.contains(&AuthMechanism::Plain) {
364 warn!("PLAIN sends credentials in plaintext, this will cause credential leakage on the network");
365 } else if opts.tls_config.is_some()
366 && (opts.auth_mechanisms.contains(&AuthMechanism::ScramSha512)
367 || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha256)
368 || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha1))
369 {
370 warn!("Consider using PLAIN for TLS connections, as it is more efficient");
371 }
372
373 opts.auth_mechanisms
374 } else {
375 vec![]
376 };
377
378 let mut state = AgentState {
379 bucket: opts.bucket_name.clone(),
380 authenticator: opts.authenticator.clone(),
381 num_pool_connections: opts.kv_config.num_connections,
382 latest_config: ParsedConfig::default(),
383 network_type: "".to_string(),
384 tls_config: opts.tls_config,
385 auth_mechanisms: auth_mechanisms.clone(),
386 disable_error_map: !opts.kv_config.enable_error_map,
387 disable_mutation_tokens: !opts.kv_config.enable_mutation_tokens,
388 disable_server_durations: !opts.kv_config.enable_server_durations,
389 kv_connect_timeout: opts.kv_config.connect_timeout,
390 kv_connect_throttle_timeout: opts.kv_config.connect_throttle_timeout,
391 http_idle_connection_timeout: opts.http_config.idle_connection_timeout,
392 http_max_idle_connections_per_host: opts.http_config.max_idle_connections_per_host,
393 tcp_keep_alive_time: opts
394 .tcp_keep_alive_time
395 .unwrap_or_else(|| Duration::from_secs(60)),
396 };
397
398 let bucket_name = opts.bucket_name.clone();
399
400 let http_client = Arc::new(ReqwestClient::new(ClientConfig {
401 tls_config: state.tls_config.clone(),
402 idle_connection_timeout: state.http_idle_connection_timeout,
403 max_idle_connections_per_host: state.http_max_idle_connections_per_host,
404 tcp_keep_alive_time: state.tcp_keep_alive_time,
405 })?);
406
407 let err_map_component = Arc::new(ErrMapComponent::new());
408
409 let connect_timeout = opts.kv_config.connect_timeout;
410
411 let first_kv_client_configs =
412 Self::gen_first_kv_client_configs(&opts.seed_config.memd_addrs, &state);
413 let first_http_client_configs = Self::gen_first_http_endpoints(
414 user_agent.clone(),
415 &opts.seed_config.http_addrs,
416 &state,
417 );
418 let (first_config, cfg_source_host_port) = Self::get_first_config(
419 user_agent.clone(),
420 first_kv_client_configs,
421 &state,
422 first_http_client_configs,
423 http_client.clone(),
424 err_map_component.clone(),
425 connect_timeout,
426 )
427 .await?;
428
429 state.latest_config = first_config.clone();
430
431 let network_type = if let Some(network) = opts.network {
432 if network == "auto" || network.is_empty() {
433 NetworkTypeHeuristic::identify(&state.latest_config, &cfg_source_host_port)
434 } else {
435 network
436 }
437 } else {
438 NetworkTypeHeuristic::identify(&state.latest_config, &cfg_source_host_port)
439 };
440 info!("Identified network type: {network_type}");
441 state.network_type = network_type;
442
443 let agent_component_configs = AgentInner::gen_agent_component_configs_locked(&state);
444
445 let tracing = Arc::new(TracingComponent::new(
446 agent_component_configs.tracing_config,
447 ));
448
449 let err_map_component_conn_mgr = err_map_component.clone();
450
451 let num_pool_connections = state.num_pool_connections;
452
453 let (unsolicited_packet_tx, mut unsolicited_packet_rx) = mpsc::unbounded_channel();
454
455 let conn_mgr = Arc::new(
456 StdKvEndpointClientManager::new(KvEndpointClientManagerOptions {
457 on_close_handler: Arc::new(|_manager_id| {}),
458 on_demand_connect: opts.kv_config.on_demand_connect,
459 num_pool_connections,
460 connect_throttle_period: opts.kv_config.connect_throttle_timeout,
461 bootstrap_options: KvClientBootstrapOptions {
462 client_name: user_agent.clone(),
463 disable_error_map: state.disable_error_map,
464 disable_mutation_tokens: state.disable_mutation_tokens,
465 disable_server_durations: state.disable_server_durations,
466 on_err_map_fetched: Some(Arc::new(move |err_map| {
467 err_map_component_conn_mgr.on_err_map(err_map);
468 })),
469 tcp_keep_alive_time: state.tcp_keep_alive_time,
470 auth_mechanisms,
471 connect_timeout,
472 },
473 unsolicited_packet_tx: Some(unsolicited_packet_tx),
474 orphan_handler: opts.orphan_response_handler,
475 endpoints: agent_component_configs.kv_targets,
476 authenticator: opts.authenticator,
477 disable_decompression: opts.compression_config.disable_decompression,
478 selected_bucket: opts.bucket_name,
479 tracing: tracing.clone(),
480 })
481 .await?,
482 );
483
484 let cfg_manager = Arc::new(ConfigManagerMemd::new(
485 agent_component_configs.config_manager_memd_config,
486 ConfigManagerMemdOptions {
487 polling_period: opts.config_poller_config.poll_interval,
488 kv_client_manager: conn_mgr.clone(),
489 first_config,
490 fetch_timeout: opts.config_poller_config.fetch_timeout,
491 },
492 ));
493 let vb_router = Arc::new(StdVbucketRouter::new(
494 agent_component_configs.vbucket_routing_info,
495 VbucketRouterOptions {},
496 ));
497
498 let nmvb_handler = Arc::new(StdNotMyVbucketConfigHandler::new());
499
500 let memd_resolver = CollectionResolverMemd::new(CollectionResolverMemdOptions {
501 conn_mgr: conn_mgr.clone(),
502 });
503
504 let collections = Arc::new(CollectionResolverCached::new(
505 CollectionResolverCachedOptions {
506 resolver: memd_resolver,
507 },
508 ));
509
510 let retry_manager = Arc::new(RetryManager::new(err_map_component.clone()));
511 let compression_manager = Arc::new(CompressionManager::new(opts.compression_config));
512
513 let crud = CrudComponent::new(
514 nmvb_handler.clone(),
515 vb_router.clone(),
516 conn_mgr.clone(),
517 collections.clone(),
518 retry_manager.clone(),
519 compression_manager,
520 );
521
522 let mgmt = MgmtComponent::new(
523 retry_manager.clone(),
524 http_client.clone(),
525 tracing.clone(),
526 agent_component_configs.mgmt_config,
527 MgmtComponentOptions {
528 user_agent: user_agent.clone(),
529 },
530 );
531
532 let analytics = Arc::new(AnalyticsComponent::new(
533 retry_manager.clone(),
534 http_client.clone(),
535 agent_component_configs.analytics_config,
536 AnalyticsComponentOptions {
537 user_agent: user_agent.clone(),
538 },
539 ));
540
541 let query = Arc::new(QueryComponent::new(
542 retry_manager.clone(),
543 http_client.clone(),
544 tracing.clone(),
545 agent_component_configs.query_config,
546 QueryComponentOptions {
547 user_agent: user_agent.clone(),
548 },
549 ));
550
551 let search = Arc::new(SearchComponent::new(
552 retry_manager.clone(),
553 http_client.clone(),
554 tracing.clone(),
555 agent_component_configs.search_config,
556 SearchComponentOptions {
557 user_agent: user_agent.clone(),
558 },
559 ));
560
561 let diagnostics = DiagnosticsComponent::new(
562 conn_mgr.clone(),
563 query.clone(),
564 search.clone(),
565 retry_manager.clone(),
566 agent_component_configs.diagnostics_config,
567 );
568
569 let state = Arc::new(Mutex::new(state));
570
571 let inner = Arc::new(AgentInner {
572 state,
573 bucket: bucket_name,
574 cfg_manager: cfg_manager.clone(),
575 conn_mgr,
576 vb_router,
577 crud,
578 collections,
579 retry_manager,
580 http_client,
581 err_map_component,
582
583 mgmt,
584 analytics,
585 query,
586 search,
587 diagnostics,
588 tracing,
589 });
590
591 let inner_clone = Arc::downgrade(&inner);
592 tokio::spawn(async move {
593 while let Some(packet) = unsolicited_packet_rx.recv().await {
594 if let Some(inner_clone) = inner_clone.upgrade() {
595 inner_clone.unsolicited_packet_handler(packet).await;
596 } else {
597 break;
598 }
599 }
600 debug!("Unsolicited packet handler exited");
601 });
602
603 nmvb_handler.set_watcher(Arc::downgrade(&inner)).await;
604
605 Self::start_config_watcher(Arc::downgrade(&inner), cfg_manager);
606
607 let agent = Agent {
608 inner,
609 user_agent,
610 id: agent_id.clone(),
611 };
612
613 info!("Agent {agent_id} created");
614
615 Ok(agent)
616 }
617
618 pub async fn reconfigure(&self, opts: ReconfigureAgentOptions) {
621 self.inner.reconfigure(opts).await
622 }
623
624 fn start_config_watcher(
625 inner: Weak<AgentInner>,
626 config_watcher: Arc<impl ConfigManager>,
627 ) -> JoinHandle<()> {
628 let mut watch_rx = config_watcher.watch();
629
630 let inner = inner.clone();
631 tokio::spawn(async move {
632 loop {
633 match watch_rx.changed().await {
634 Ok(_) => {
635 let pc = {
636 watch_rx.borrow_and_update().clone()
640 };
641 if let Some(i) = inner.upgrade() {
642 i.apply_config(pc).await;
643 } else {
644 debug!("Config watcher inner dropped, exiting");
645 return;
646 }
647 }
648 Err(_e) => {
649 debug!("Config watcher channel closed");
650 return;
651 }
652 }
653 }
654 })
655 }
656
657 async fn get_first_config<C: httpx::client::Client>(
658 client_name: String,
659 kv_targets: HashMap<String, KvTarget>,
660 state: &AgentState,
661 http_configs: HashMap<String, FirstHttpConfig>,
662 http_client: Arc<C>,
663 err_map_component: Arc<ErrMapComponent>,
664 connect_timeout: Duration,
665 ) -> Result<(ParsedConfig, String)> {
666 loop {
667 for target in kv_targets.values() {
668 let host = &target.address;
669 let err_map_component_clone = err_map_component.clone();
670 let timeout_result = timeout(
671 connect_timeout,
672 StdKvClient::new(KvClientOptions {
673 address: target.clone(),
674 authenticator: state.authenticator.clone(),
675 selected_bucket: state.bucket.clone(),
676 bootstrap_options: KvClientBootstrapOptions {
677 client_name: client_name.clone(),
678 disable_error_map: state.disable_error_map,
679 disable_mutation_tokens: true,
680 disable_server_durations: true,
681 on_err_map_fetched: Some(Arc::new(move |err_map| {
682 err_map_component_clone.on_err_map(err_map);
683 })),
684 tcp_keep_alive_time: state.tcp_keep_alive_time,
685 auth_mechanisms: state.auth_mechanisms.clone(),
686 connect_timeout,
687 },
688 endpoint_id: "".to_string(),
689 unsolicited_packet_tx: None,
690 orphan_handler: None,
691 on_close_tx: None,
692 disable_decompression: false,
693 id: Uuid::new_v4().to_string(),
694 tracing: Default::default(),
695 }),
696 )
697 .await;
698
699 let client: StdKvClient<Client> = match timeout_result {
700 Ok(client_result) => match client_result {
701 Ok(client) => client,
702 Err(e) => {
703 let mut msg = format!("Failed to connect to endpoint: {e}");
704 if let Some(source) = e.source() {
705 msg = format!("{msg} - {source}");
706 }
707 warn!("{msg}");
708 continue;
709 }
710 },
711 Err(_e) => continue,
712 };
713
714 let raw_config = match client
715 .get_cluster_config(GetClusterConfigRequest {
716 known_version: None,
717 })
718 .await
719 {
720 Ok(resp) => resp.config,
721 Err(_e) => continue,
722 };
723
724 client.close().await?;
725
726 let config: TerseConfig = serde_json::from_slice(&raw_config).map_err(|e| {
727 Error::new_message_error(format!("failed to deserialize config: {e}"))
728 })?;
729
730 match ConfigParser::parse_terse_config(config, host.host.as_str()) {
731 Ok(c) => {
732 return Ok((c, format!("{}:{}", host.host, host.port)));
733 }
734 Err(_e) => continue,
735 };
736 }
737
738 info!("Failed to fetch config over kv, attempting http");
739 for endpoint_config in http_configs.values() {
740 let endpoint = endpoint_config.endpoint.clone();
741 let host_port = get_host_port_from_uri(&endpoint)?;
742 let auth = match &endpoint_config.authenticator {
743 Authenticator::PasswordAuthenticator(authenticator) => {
744 let user_pass =
745 authenticator.get_credentials(&ServiceType::MGMT, host_port.clone())?;
746 Auth::BasicAuth(BasicAuth::new(user_pass.username, user_pass.password))
747 }
748 Authenticator::CertificateAuthenticator(_authenticator) => {
749 Auth::BasicAuth(BasicAuth::new("".to_string(), "".to_string()))
750 }
751 Authenticator::JwtAuthenticator(authenticator) => {
752 Auth::BearerAuth(BearerAuth::new(authenticator.get_token()))
753 }
754 };
755
756 match Self::fetch_http_config(
757 http_client.clone(),
758 endpoint,
759 endpoint_config.user_agent.clone(),
760 auth,
761 endpoint_config.bucket_name.clone(),
762 )
763 .await
764 {
765 Ok(c) => {
766 return Ok((c, host_port));
767 }
768 Err(_e) => {}
769 };
770 }
771
772 info!("Failed to fetch config from any source");
773
774 sleep(Duration::from_secs(1)).await;
776 }
777 }
778
779 pub(crate) async fn fetch_http_config<C: httpx::client::Client>(
780 http_client: Arc<C>,
781 endpoint: String,
782 user_agent: String,
783 auth: Auth,
784 bucket_name: Option<String>,
785 ) -> Result<ParsedConfig> {
786 debug!("Polling config from {}", &endpoint);
787
788 let host_port = get_host_port_from_uri(&endpoint)?;
789 let hostname = get_hostname_from_host_port(&host_port)?;
790
791 let parsed = if let Some(bucket_name) = bucket_name {
792 let config = mgmtx::mgmt::Management {
793 http_client,
794 user_agent,
795 endpoint: endpoint.clone(),
796 canonical_endpoint: endpoint.clone(),
797 auth,
798 tracing: Default::default(),
799 }
800 .get_terse_bucket_config(&GetTerseBucketConfigOptions {
801 bucket_name: &bucket_name,
802 on_behalf_of_info: None,
803 })
804 .await
805 .map_err(Error::from)?;
806
807 ConfigParser::parse_terse_config(config, &hostname)?
808 } else {
809 let config = mgmtx::mgmt::Management {
810 http_client,
811 user_agent,
812 endpoint: endpoint.clone(),
813 canonical_endpoint: endpoint.clone(),
814 auth,
815 tracing: Default::default(),
816 }
817 .get_terse_cluster_config(&GetTerseClusterConfigOptions {
818 on_behalf_of_info: None,
819 })
820 .await
821 .map_err(Error::from)?;
822
823 ConfigParser::parse_terse_config(config, &hostname)?
824 };
825
826 Ok(parsed)
827 }
828
829 fn gen_first_kv_client_configs(
830 memd_addrs: &Vec<Address>,
831 state: &AgentState,
832 ) -> HashMap<String, KvTarget> {
833 let mut clients = HashMap::new();
834 for addr in memd_addrs {
835 let node_id = format!("kv-{addr}");
836 let target = KvTarget {
837 address: addr.clone(),
838 tls_config: state.tls_config.clone(),
839 canonical_address: addr.clone(),
840 };
841 clients.insert(node_id, target);
842 }
843
844 clients
845 }
846
847 fn gen_first_http_endpoints(
848 client_name: String,
849 mgmt_addrs: &Vec<Address>,
850 state: &AgentState,
851 ) -> HashMap<String, FirstHttpConfig> {
852 let mut clients = HashMap::new();
853 for addr in mgmt_addrs {
854 let node_id = format!("mgmt{addr}");
855 let base = if state.tls_config.is_some() {
856 "https"
857 } else {
858 "http"
859 };
860 let config = FirstHttpConfig {
861 endpoint: format!("{base}://{addr}"),
862 tls: state.tls_config.clone(),
863 user_agent: client_name.clone(),
864 authenticator: state.authenticator.clone(),
865 bucket_name: state.bucket.clone(),
866 };
867 clients.insert(node_id, config);
868 }
869
870 clients
871 }
872
873 pub(crate) async fn run_with_bucket_feature_check<T, Fut>(
874 &self,
875 feature: BucketFeature,
876 operation: impl FnOnce() -> Fut,
877 message: impl Into<String>,
878 ) -> Result<T>
879 where
880 Fut: std::future::Future<Output = Result<T>>,
881 {
882 let features = self.bucket_features().await?;
883
884 if !features.contains(&feature) {
885 return Err(Error::new_feature_not_available_error(
886 format!("{feature:?}"),
887 message.into(),
888 ));
889 }
890
891 operation().await
892 }
893
894 pub(crate) fn get_bucket_name(&self) -> Option<String> {
895 self.inner.get_bucket_name()
896 }
897}
898
899struct FirstHttpConfig {
900 pub endpoint: String,
901 pub tls: Option<TlsConfig>,
902 pub user_agent: String,
903 pub authenticator: Authenticator,
904 pub bucket_name: Option<String>,
905}
906
907impl Drop for Agent {
908 fn drop(&mut self) {
909 debug!(
910 "Dropping agent {}, {} strong references remain",
911 self.id,
912 Arc::strong_count(&self.inner)
913 );
914 }
915}