1use crate::address::Address;
20use crate::auth_mechanism::AuthMechanism;
21use crate::authenticator::Authenticator;
22use crate::cbconfig::TerseConfig;
23use crate::collection_resolver_cached::{
24 CollectionResolverCached, CollectionResolverCachedOptions,
25};
26use crate::collection_resolver_memd::{CollectionResolverMemd, CollectionResolverMemdOptions};
27use crate::compressionmanager::{CompressionManager, StdCompressor};
28use crate::configmanager::{
29 ConfigManager, ConfigManagerMemd, ConfigManagerMemdConfig, ConfigManagerMemdOptions,
30};
31use crate::configparser::ConfigParser;
32use crate::crudcomponent::CrudComponent;
33use crate::diagnosticscomponent::{DiagnosticsComponent, DiagnosticsComponentConfig};
34use crate::errmapcomponent::ErrMapComponent;
35use crate::error::{Error, ErrorKind, Result};
36use crate::features::BucketFeature;
37use crate::httpcomponent::HttpComponent;
38use crate::httpx::client::{ClientConfig, ReqwestClient};
39use crate::kvclient::{
40 KvClient, KvClientBootstrapOptions, KvClientOptions, StdKvClient, UnsolicitedPacket,
41};
42use crate::kvclient_ops::KvClientOps;
43use crate::kvclientpool::{KvClientPool, KvClientPoolOptions, StdKvClientPool};
44use crate::memdx::client::Client;
45use crate::memdx::opcode::OpCode;
46use crate::memdx::packet::ResponsePacket;
47use crate::memdx::request::GetClusterConfigRequest;
48use crate::mgmtcomponent::{MgmtComponent, MgmtComponentConfig, MgmtComponentOptions};
49use crate::mgmtx::options::{GetTerseBucketConfigOptions, GetTerseClusterConfigOptions};
50use crate::networktypeheuristic::NetworkTypeHeuristic;
51use crate::nmvbhandler::{ConfigUpdater, StdNotMyVbucketConfigHandler};
52use crate::options::agent::{AgentOptions, ReconfigureAgentOptions};
53use crate::parsedconfig::{ParsedConfig, ParsedConfigBucketFeature, ParsedConfigFeature};
54use crate::querycomponent::{QueryComponent, QueryComponentConfig, QueryComponentOptions};
55use crate::retry::RetryManager;
56use crate::searchcomponent::{SearchComponent, SearchComponentConfig, SearchComponentOptions};
57use crate::service_type::ServiceType;
58use crate::tls_config::TlsConfig;
59use crate::util::{get_host_port_from_uri, get_hostname_from_host_port};
60use crate::vbucketrouter::{
61 StdVbucketRouter, VbucketRouter, VbucketRouterOptions, VbucketRoutingInfo,
62};
63use crate::{httpx, mgmtx};
64
65use byteorder::BigEndian;
66use futures::executor::block_on;
67use tracing::{debug, error, info, warn};
68use uuid::Uuid;
69
70use crate::analyticscomponent::{AnalyticsComponent, AnalyticsComponentOptions};
71use crate::componentconfigs::{AgentComponentConfigs, HttpClientConfig};
72use crate::httpx::request::{Auth, BasicAuth, BearerAuth};
73use crate::kvclient_babysitter::{KvTarget, StdKvClientBabysitter};
74use crate::kvendpointclientmanager::{
75 KvEndpointClientManager, KvEndpointClientManagerOptions, StdKvEndpointClientManager,
76};
77use crate::orphan_reporter::OrphanReporter;
78use crate::tracingcomponent::{TracingComponent, TracingComponentConfig};
79use arc_swap::ArcSwap;
80use std::cmp::Ordering;
81use std::collections::HashMap;
82use std::error::Error as StdError;
83use std::fmt::{format, Display};
84use std::io::Cursor;
85use std::net::ToSocketAddrs;
86use std::ops::{Add, Deref};
87use std::sync::{Arc, Weak};
88use std::time::Duration;
89use tokio::io::AsyncReadExt;
90use tokio::net;
91use tokio::runtime::{Handle, Runtime};
92use tokio::sync::broadcast::{Receiver, Sender};
93use tokio::sync::{broadcast, mpsc, Mutex};
94use tokio::task::JoinHandle;
95use tokio::time::{sleep, timeout, timeout_at, Instant};
96
97#[derive(Clone)]
98struct AgentState {
99 bucket: Option<String>,
100 tls_config: Option<TlsConfig>,
101 authenticator: Authenticator,
102 auth_mechanisms: Vec<AuthMechanism>,
103 num_pool_connections: usize,
104 latest_config: ParsedConfig,
106 network_type: String,
107
108 disable_error_map: bool,
109 disable_mutation_tokens: bool,
110 disable_server_durations: bool,
111 kv_connect_timeout: Duration,
112 kv_connect_throttle_timeout: Duration,
113 http_idle_connection_timeout: Duration,
114 http_max_idle_connections_per_host: Option<usize>,
115 tcp_keep_alive_time: Duration,
116}
117
118impl Display for AgentState {
119 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
120 write!(
121 f,
122 "{{ bucket: {:?}, network_type: {}, num_pool_connections: {}, latest_config_rev_id: {}, latest_config_rev_epoch: {}, authenticator: {} }}",
123 self.bucket,
124 self.network_type,
125 self.num_pool_connections,
126 self.latest_config.rev_id,
127 self.latest_config.rev_epoch,
128 self.authenticator,
129
130 )
131 }
132}
133
134type AgentClientManager = StdKvEndpointClientManager<
135 StdKvClientPool<StdKvClientBabysitter<StdKvClient<Client>>, StdKvClient<Client>>,
136 StdKvClient<Client>,
137>;
138type AgentCollectionResolver = CollectionResolverCached<CollectionResolverMemd<AgentClientManager>>;
139
140pub(crate) struct AgentInner {
141 state: Arc<Mutex<AgentState>>,
142 bucket: Option<String>,
143
144 cfg_manager: Arc<ConfigManagerMemd<AgentClientManager>>,
145 conn_mgr: Arc<AgentClientManager>,
146 vb_router: Arc<StdVbucketRouter>,
147 collections: Arc<AgentCollectionResolver>,
148 retry_manager: Arc<RetryManager>,
149 http_client: Arc<ReqwestClient>,
150 err_map_component: Arc<ErrMapComponent>,
151
152 pub(crate) crud: CrudComponent<
153 AgentClientManager,
154 StdVbucketRouter,
155 StdNotMyVbucketConfigHandler<AgentInner>,
156 AgentCollectionResolver,
157 StdCompressor,
158 >,
159
160 pub(crate) analytics: Arc<AnalyticsComponent<ReqwestClient>>,
161 pub(crate) query: Arc<QueryComponent<ReqwestClient>>,
162 pub(crate) search: Arc<SearchComponent<ReqwestClient>>,
163 pub(crate) mgmt: MgmtComponent<ReqwestClient>,
164 pub(crate) diagnostics: DiagnosticsComponent<ReqwestClient, AgentClientManager>,
165 pub(crate) tracing: Arc<TracingComponent>,
166}
167
168pub struct Agent {
169 pub(crate) inner: Arc<AgentInner>,
170 user_agent: String,
171 id: String,
172}
173
174impl AgentInner {
175 fn gen_agent_component_configs_locked(state: &AgentState) -> AgentComponentConfigs {
176 AgentComponentConfigs::gen_from_config(
177 &state.latest_config,
178 &state.network_type,
179 state.tls_config.clone(),
180 state.bucket.clone(),
181 state.authenticator.clone(),
182 )
183 }
184
185 pub async fn unsolicited_packet_handler(&self, up: UnsolicitedPacket) {
186 let packet = up.packet;
187 if packet.op_code == OpCode::Set {
188 if let Some(ref extras) = packet.extras {
189 if extras.len() < 16 {
190 warn!("Received Set packet with too short extras: {packet:?}");
191 return;
192 }
193
194 let mut cursor = Cursor::new(extras);
195 let server_rev_epoch = cursor.read_i64().await.unwrap();
196 let server_rev_id = cursor.read_i64().await.unwrap();
197
198 if let Some(config) = self
199 .cfg_manager
200 .out_of_band_version(server_rev_id, server_rev_epoch, up.endpoint_id)
201 .await
202 {
203 self.apply_config(config).await;
204 }
205 } else {
206 warn!("Received Set packet with no extras: {packet:?}");
207 }
208 }
209 }
210
211 pub async fn apply_config(&self, config: ParsedConfig) {
212 let mut state = self.state.lock().await;
213
214 info!(
215 "Agent applying updated config: rev_id={rev_id}, rev_epoch={rev_epoch}",
216 rev_id = config.rev_id,
217 rev_epoch = config.rev_epoch
218 );
219 state.latest_config = config;
220
221 self.update_state_locked(&mut state).await;
222 }
223
224 async fn update_state_locked(&self, state: &mut AgentState) {
225 debug!("Agent updating state {}", state);
226
227 let agent_component_configs = Self::gen_agent_component_configs_locked(state);
228
229 if let Err(e) = self
237 .conn_mgr
238 .update_endpoints(agent_component_configs.kv_targets.clone(), true)
239 .await
240 {
241 error!("Failed to reconfigure connection manager (add-only); {e}");
242 };
243
244 self.vb_router
245 .update_vbucket_info(agent_component_configs.vbucket_routing_info);
246
247 if let Err(e) = self
248 .cfg_manager
249 .reconfigure(agent_component_configs.config_manager_memd_config)
250 {
251 error!("Failed to reconfigure memd config watcher component; {e}");
252 }
253
254 if let Err(e) = self
255 .conn_mgr
256 .update_endpoints(agent_component_configs.kv_targets, false)
257 .await
258 {
259 error!("Failed to reconfigure connection manager; {e}");
260 }
261
262 self.analytics
263 .reconfigure(agent_component_configs.analytics_config);
264 self.query.reconfigure(agent_component_configs.query_config);
265 self.search
266 .reconfigure(agent_component_configs.search_config);
267 self.mgmt.reconfigure(agent_component_configs.mgmt_config);
268 self.diagnostics
269 .reconfigure(agent_component_configs.diagnostics_config);
270 self.tracing
271 .reconfigure(agent_component_configs.tracing_config);
272 }
273
274 pub async fn bucket_features(&self) -> Result<Vec<BucketFeature>> {
275 let guard = self.state.lock().await;
276
277 if let Some(bucket) = &guard.latest_config.bucket {
278 let mut features = vec![];
279
280 for feature in &bucket.features {
281 match feature {
282 ParsedConfigBucketFeature::CreateAsDeleted => {
283 features.push(BucketFeature::CreateAsDeleted)
284 }
285 ParsedConfigBucketFeature::ReplaceBodyWithXattr => {
286 features.push(BucketFeature::ReplaceBodyWithXattr)
287 }
288 ParsedConfigBucketFeature::RangeScan => features.push(BucketFeature::RangeScan),
289 ParsedConfigBucketFeature::ReplicaRead => {
290 features.push(BucketFeature::ReplicaRead)
291 }
292 ParsedConfigBucketFeature::NonDedupedHistory => {
293 features.push(BucketFeature::NonDedupedHistory)
294 }
295 ParsedConfigBucketFeature::ReviveDocument => {
296 features.push(BucketFeature::ReviveDocument)
297 }
298 _ => {}
299 }
300 }
301
302 return Ok(features);
303 }
304
305 Err(ErrorKind::NoBucket.into())
306 }
307
308 pub(crate) fn get_bucket_name(&self) -> Option<String> {
309 self.bucket.clone()
310 }
311
312 pub async fn reconfigure(&self, opts: ReconfigureAgentOptions) {
313 let mut state = self.state.lock().await;
314 state.tls_config = opts.tls_config.clone();
315 state.authenticator = opts.authenticator.clone();
316
317 match self
319 .http_client
320 .update_tls(httpx::client::UpdateTlsOptions {
321 tls_config: opts.tls_config,
322 }) {
323 Ok(_) => {}
324 Err(e) => {
325 warn!("Failed to update TLS for HTTP client: {}", e);
326 }
327 };
328
329 self.conn_mgr.update_auth(opts.authenticator).await;
330
331 self.update_state_locked(&mut state).await;
332 }
333}
334
335impl ConfigUpdater for AgentInner {
336 async fn apply_terse_config(&self, config: TerseConfig, source_hostname: &str) {
337 let parsed_config = match ConfigParser::parse_terse_config(config, source_hostname) {
338 Ok(cfg) => cfg,
339 Err(_e) => {
340 return;
342 }
343 };
344
345 if let Some(config) = self.cfg_manager.out_of_band_config(parsed_config) {
346 self.apply_config(config).await;
347 };
348 }
349}
350
351impl Agent {
352 pub async fn new(opts: AgentOptions) -> Result<Self> {
353 let build_version = env!("CARGO_PKG_VERSION");
354 let user_agent = format!("cb-rust/{build_version}");
355 let agent_id = Uuid::new_v4().to_string();
356 info!(
357 "Core SDK Version: {} - Agent ID: {}",
358 &user_agent, &agent_id
359 );
360 info!("Agent Options {opts}");
361
362 let auth_mechanisms = if !opts.auth_mechanisms.is_empty() {
363 if opts.tls_config.is_none() && opts.auth_mechanisms.contains(&AuthMechanism::Plain) {
364 warn!("PLAIN sends credentials in plaintext, this will cause credential leakage on the network");
365 } else if opts.tls_config.is_some()
366 && (opts.auth_mechanisms.contains(&AuthMechanism::ScramSha512)
367 || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha256)
368 || opts.auth_mechanisms.contains(&AuthMechanism::ScramSha1))
369 {
370 warn!("Consider using PLAIN for TLS connections, as it is more efficient");
371 }
372
373 opts.auth_mechanisms
374 } else {
375 vec![]
376 };
377
378 let mut state = AgentState {
379 bucket: opts.bucket_name.clone(),
380 authenticator: opts.authenticator.clone(),
381 num_pool_connections: opts.kv_config.num_connections,
382 latest_config: ParsedConfig::default(),
383 network_type: "".to_string(),
384 tls_config: opts.tls_config,
385 auth_mechanisms: auth_mechanisms.clone(),
386 disable_error_map: !opts.kv_config.enable_error_map,
387 disable_mutation_tokens: !opts.kv_config.enable_mutation_tokens,
388 disable_server_durations: !opts.kv_config.enable_server_durations,
389 kv_connect_timeout: opts.kv_config.connect_timeout,
390 kv_connect_throttle_timeout: opts.kv_config.connect_throttle_timeout,
391 http_idle_connection_timeout: opts.http_config.idle_connection_timeout,
392 http_max_idle_connections_per_host: opts.http_config.max_idle_connections_per_host,
393 tcp_keep_alive_time: opts
394 .tcp_keep_alive_time
395 .unwrap_or_else(|| Duration::from_secs(60)),
396 };
397
398 let bucket_name = opts.bucket_name.clone();
399
400 let http_client = Arc::new(ReqwestClient::new(ClientConfig {
401 tls_config: state.tls_config.clone(),
402 idle_connection_timeout: state.http_idle_connection_timeout,
403 max_idle_connections_per_host: state.http_max_idle_connections_per_host,
404 tcp_keep_alive_time: state.tcp_keep_alive_time,
405 })?);
406
407 let err_map_component = Arc::new(ErrMapComponent::new());
408
409 let connect_timeout = opts.kv_config.connect_timeout;
410
411 let first_kv_client_configs =
412 Self::gen_first_kv_client_configs(&opts.seed_config.memd_addrs, &state);
413 let first_http_client_configs = Self::gen_first_http_endpoints(
414 user_agent.clone(),
415 &opts.seed_config.http_addrs,
416 &state,
417 );
418 let (first_config, cfg_source_host_port) = Self::get_first_config(
419 user_agent.clone(),
420 first_kv_client_configs,
421 &state,
422 first_http_client_configs,
423 http_client.clone(),
424 err_map_component.clone(),
425 connect_timeout,
426 )
427 .await?;
428
429 state.latest_config = first_config.clone();
430
431 let network_type = if let Some(network) = opts.network {
432 if network == "auto" || network.is_empty() {
433 NetworkTypeHeuristic::identify(&state.latest_config, &cfg_source_host_port)
434 } else {
435 network
436 }
437 } else {
438 NetworkTypeHeuristic::identify(&state.latest_config, &cfg_source_host_port)
439 };
440 info!(
441 "Agent {} identified network type: {network_type}",
442 &agent_id
443 );
444 state.network_type = network_type;
445
446 let agent_component_configs = AgentInner::gen_agent_component_configs_locked(&state);
447
448 let tracing = Arc::new(TracingComponent::new(
449 agent_component_configs.tracing_config,
450 ));
451
452 let err_map_component_conn_mgr = err_map_component.clone();
453
454 let num_pool_connections = state.num_pool_connections;
455
456 let (unsolicited_packet_tx, mut unsolicited_packet_rx) = mpsc::unbounded_channel();
457
458 let conn_mgr_id = Uuid::new_v4().to_string();
459 info!(
460 "Agent {} creating kv endpoint client manager {}",
461 &agent_id, &conn_mgr_id
462 );
463 let conn_mgr = Arc::new(
464 StdKvEndpointClientManager::new(KvEndpointClientManagerOptions {
465 id: conn_mgr_id,
466 on_close_handler: Arc::new(|_manager_id| {}),
467 on_demand_connect: opts.kv_config.on_demand_connect,
468 num_pool_connections,
469 connect_throttle_period: opts.kv_config.connect_throttle_timeout,
470 bootstrap_options: KvClientBootstrapOptions {
471 client_name: user_agent.clone(),
472 disable_error_map: state.disable_error_map,
473 disable_mutation_tokens: state.disable_mutation_tokens,
474 disable_server_durations: state.disable_server_durations,
475 on_err_map_fetched: Some(Arc::new(move |err_map| {
476 err_map_component_conn_mgr.on_err_map(err_map);
477 })),
478 tcp_keep_alive_time: state.tcp_keep_alive_time,
479 auth_mechanisms,
480 connect_timeout,
481 },
482 unsolicited_packet_tx: Some(unsolicited_packet_tx),
483 orphan_handler: opts.orphan_response_handler,
484 endpoints: agent_component_configs.kv_targets,
485 authenticator: opts.authenticator,
486 disable_decompression: opts.compression_config.disable_decompression,
487 selected_bucket: opts.bucket_name,
488 tracing: tracing.clone(),
489 })
490 .await?,
491 );
492
493 let cfg_manager = Arc::new(ConfigManagerMemd::new(
494 agent_component_configs.config_manager_memd_config,
495 ConfigManagerMemdOptions {
496 polling_period: opts.config_poller_config.poll_interval,
497 kv_client_manager: conn_mgr.clone(),
498 first_config,
499 fetch_timeout: opts.config_poller_config.fetch_timeout,
500 },
501 ));
502 let vb_router = Arc::new(StdVbucketRouter::new(
503 agent_component_configs.vbucket_routing_info,
504 VbucketRouterOptions {},
505 ));
506
507 let nmvb_handler = Arc::new(StdNotMyVbucketConfigHandler::new());
508
509 let memd_resolver = CollectionResolverMemd::new(CollectionResolverMemdOptions {
510 conn_mgr: conn_mgr.clone(),
511 });
512
513 let collections = Arc::new(CollectionResolverCached::new(
514 CollectionResolverCachedOptions {
515 resolver: memd_resolver,
516 },
517 ));
518
519 let retry_manager = Arc::new(RetryManager::new(err_map_component.clone()));
520 let compression_manager = Arc::new(CompressionManager::new(opts.compression_config));
521
522 let crud = CrudComponent::new(
523 nmvb_handler.clone(),
524 vb_router.clone(),
525 conn_mgr.clone(),
526 collections.clone(),
527 retry_manager.clone(),
528 compression_manager,
529 );
530
531 let mgmt_id = Uuid::new_v4().to_string();
532 info!("Agent {} creating mgmt component {}", &agent_id, &mgmt_id);
533 let mgmt = MgmtComponent::new(
534 retry_manager.clone(),
535 http_client.clone(),
536 tracing.clone(),
537 agent_component_configs.mgmt_config,
538 MgmtComponentOptions {
539 id: mgmt_id,
540 user_agent: user_agent.clone(),
541 },
542 );
543
544 let analytics_id = Uuid::new_v4().to_string();
545 info!(
546 "Agent {} creating analytics component {}",
547 &agent_id, &analytics_id
548 );
549 let analytics = Arc::new(AnalyticsComponent::new(
550 retry_manager.clone(),
551 http_client.clone(),
552 agent_component_configs.analytics_config,
553 AnalyticsComponentOptions {
554 id: analytics_id,
555 user_agent: user_agent.clone(),
556 },
557 ));
558
559 let query_id = Uuid::new_v4().to_string();
560 info!("Agent {} creating query component {}", &agent_id, &query_id);
561 let query = Arc::new(QueryComponent::new(
562 retry_manager.clone(),
563 http_client.clone(),
564 tracing.clone(),
565 agent_component_configs.query_config,
566 QueryComponentOptions {
567 id: query_id,
568 user_agent: user_agent.clone(),
569 },
570 ));
571
572 let search_id = Uuid::new_v4().to_string();
573 info!(
574 "Agent {} creating search component {}",
575 &agent_id, &search_id
576 );
577 let search = Arc::new(SearchComponent::new(
578 retry_manager.clone(),
579 http_client.clone(),
580 tracing.clone(),
581 agent_component_configs.search_config,
582 SearchComponentOptions {
583 id: search_id,
584 user_agent: user_agent.clone(),
585 },
586 ));
587
588 let diagnostics = DiagnosticsComponent::new(
589 conn_mgr.clone(),
590 query.clone(),
591 search.clone(),
592 retry_manager.clone(),
593 agent_component_configs.diagnostics_config,
594 );
595
596 let state = Arc::new(Mutex::new(state));
597
598 let inner = Arc::new(AgentInner {
599 state,
600 bucket: bucket_name,
601 cfg_manager: cfg_manager.clone(),
602 conn_mgr,
603 vb_router,
604 crud,
605 collections,
606 retry_manager,
607 http_client,
608 err_map_component,
609
610 mgmt,
611 analytics,
612 query,
613 search,
614 diagnostics,
615 tracing,
616 });
617
618 let inner_clone = Arc::downgrade(&inner);
619 tokio::spawn(async move {
620 while let Some(packet) = unsolicited_packet_rx.recv().await {
621 if let Some(inner_clone) = inner_clone.upgrade() {
622 inner_clone.unsolicited_packet_handler(packet).await;
623 } else {
624 break;
625 }
626 }
627 debug!("Unsolicited packet handler exited");
628 });
629
630 nmvb_handler.set_watcher(Arc::downgrade(&inner)).await;
631
632 Self::start_config_watcher(Arc::downgrade(&inner), cfg_manager);
633
634 let agent = Agent {
635 inner,
636 user_agent,
637 id: agent_id.clone(),
638 };
639
640 info!("Agent {agent_id} created");
641
642 Ok(agent)
643 }
644
645 pub async fn reconfigure(&self, opts: ReconfigureAgentOptions) {
648 self.inner.reconfigure(opts).await
649 }
650
651 fn start_config_watcher(
652 inner: Weak<AgentInner>,
653 config_watcher: Arc<impl ConfigManager>,
654 ) -> JoinHandle<()> {
655 let mut watch_rx = config_watcher.watch();
656
657 let inner = inner.clone();
658 tokio::spawn(async move {
659 loop {
660 match watch_rx.changed().await {
661 Ok(_) => {
662 let pc = {
663 watch_rx.borrow_and_update().clone()
667 };
668 if let Some(i) = inner.upgrade() {
669 i.apply_config(pc).await;
670 } else {
671 debug!("Config watcher inner dropped, exiting");
672 return;
673 }
674 }
675 Err(_e) => {
676 debug!("Config watcher channel closed");
677 return;
678 }
679 }
680 }
681 })
682 }
683
684 async fn get_first_config<C: httpx::client::Client>(
685 client_name: String,
686 kv_targets: HashMap<String, KvTarget>,
687 state: &AgentState,
688 http_configs: HashMap<String, FirstHttpConfig>,
689 http_client: Arc<C>,
690 err_map_component: Arc<ErrMapComponent>,
691 connect_timeout: Duration,
692 ) -> Result<(ParsedConfig, String)> {
693 loop {
694 for target in kv_targets.values() {
695 let host = &target.address;
696 let err_map_component_clone = err_map_component.clone();
697 let timeout_result = timeout(
698 connect_timeout,
699 StdKvClient::new(KvClientOptions {
700 address: target.clone(),
701 authenticator: state.authenticator.clone(),
702 selected_bucket: state.bucket.clone(),
703 bootstrap_options: KvClientBootstrapOptions {
704 client_name: client_name.clone(),
705 disable_error_map: state.disable_error_map,
706 disable_mutation_tokens: true,
707 disable_server_durations: true,
708 on_err_map_fetched: Some(Arc::new(move |err_map| {
709 err_map_component_clone.on_err_map(err_map);
710 })),
711 tcp_keep_alive_time: state.tcp_keep_alive_time,
712 auth_mechanisms: state.auth_mechanisms.clone(),
713 connect_timeout,
714 },
715 endpoint_id: "".to_string(),
716 unsolicited_packet_tx: None,
717 orphan_handler: None,
718 on_close_tx: None,
719 disable_decompression: false,
720 id: Uuid::new_v4().to_string(),
721 tracing: Default::default(),
722 }),
723 )
724 .await;
725
726 let client: StdKvClient<Client> = match timeout_result {
727 Ok(client_result) => match client_result {
728 Ok(client) => client,
729 Err(e) => {
730 let mut msg = format!("Failed to connect to endpoint: {e}");
731 if let Some(source) = e.source() {
732 msg = format!("{msg} - {source}");
733 }
734 warn!("{msg}");
735 continue;
736 }
737 },
738 Err(_e) => continue,
739 };
740
741 let raw_config = match client
742 .get_cluster_config(GetClusterConfigRequest {
743 known_version: None,
744 })
745 .await
746 {
747 Ok(resp) => resp.config,
748 Err(_e) => continue,
749 };
750
751 client.close().await?;
752
753 let config: TerseConfig = serde_json::from_slice(&raw_config).map_err(|e| {
754 Error::new_message_error(format!("failed to deserialize config: {e}"))
755 })?;
756
757 match ConfigParser::parse_terse_config(config, host.host.as_str()) {
758 Ok(c) => {
759 return Ok((c, format!("{}:{}", host.host, host.port)));
760 }
761 Err(_e) => continue,
762 };
763 }
764
765 info!("Failed to fetch config over kv, attempting http");
766 for endpoint_config in http_configs.values() {
767 let endpoint = endpoint_config.endpoint.clone();
768 let host_port = get_host_port_from_uri(&endpoint)?;
769 let auth = match &endpoint_config.authenticator {
770 Authenticator::PasswordAuthenticator(authenticator) => {
771 let user_pass =
772 authenticator.get_credentials(&ServiceType::MGMT, host_port.clone())?;
773 Auth::BasicAuth(BasicAuth::new(user_pass.username, user_pass.password))
774 }
775 Authenticator::CertificateAuthenticator(_authenticator) => {
776 Auth::BasicAuth(BasicAuth::new("".to_string(), "".to_string()))
777 }
778 Authenticator::JwtAuthenticator(authenticator) => {
779 Auth::BearerAuth(BearerAuth::new(authenticator.get_token()))
780 }
781 };
782
783 match Self::fetch_http_config(
784 http_client.clone(),
785 endpoint,
786 endpoint_config.user_agent.clone(),
787 auth,
788 endpoint_config.bucket_name.clone(),
789 )
790 .await
791 {
792 Ok(c) => {
793 return Ok((c, host_port));
794 }
795 Err(_e) => {}
796 };
797 }
798
799 info!("Failed to fetch config from any source");
800
801 sleep(Duration::from_secs(1)).await;
803 }
804 }
805
806 pub(crate) async fn fetch_http_config<C: httpx::client::Client>(
807 http_client: Arc<C>,
808 endpoint: String,
809 user_agent: String,
810 auth: Auth,
811 bucket_name: Option<String>,
812 ) -> Result<ParsedConfig> {
813 debug!("Polling config from {}", &endpoint);
814
815 let host_port = get_host_port_from_uri(&endpoint)?;
816 let hostname = get_hostname_from_host_port(&host_port)?;
817
818 let parsed = if let Some(bucket_name) = bucket_name {
819 let config = mgmtx::mgmt::Management {
820 http_client,
821 user_agent,
822 endpoint: endpoint.clone(),
823 canonical_endpoint: endpoint.clone(),
824 auth,
825 tracing: Default::default(),
826 }
827 .get_terse_bucket_config(&GetTerseBucketConfigOptions {
828 bucket_name: &bucket_name,
829 on_behalf_of_info: None,
830 })
831 .await
832 .map_err(Error::from)?;
833
834 ConfigParser::parse_terse_config(config, &hostname)?
835 } else {
836 let config = mgmtx::mgmt::Management {
837 http_client,
838 user_agent,
839 endpoint: endpoint.clone(),
840 canonical_endpoint: endpoint.clone(),
841 auth,
842 tracing: Default::default(),
843 }
844 .get_terse_cluster_config(&GetTerseClusterConfigOptions {
845 on_behalf_of_info: None,
846 })
847 .await
848 .map_err(Error::from)?;
849
850 ConfigParser::parse_terse_config(config, &hostname)?
851 };
852
853 Ok(parsed)
854 }
855
856 fn gen_first_kv_client_configs(
857 memd_addrs: &Vec<Address>,
858 state: &AgentState,
859 ) -> HashMap<String, KvTarget> {
860 let mut clients = HashMap::new();
861 for addr in memd_addrs {
862 let node_id = format!("kv-{addr}");
863 let target = KvTarget {
864 address: addr.clone(),
865 tls_config: state.tls_config.clone(),
866 canonical_address: addr.clone(),
867 };
868 clients.insert(node_id, target);
869 }
870
871 clients
872 }
873
874 fn gen_first_http_endpoints(
875 client_name: String,
876 mgmt_addrs: &Vec<Address>,
877 state: &AgentState,
878 ) -> HashMap<String, FirstHttpConfig> {
879 let mut clients = HashMap::new();
880 for addr in mgmt_addrs {
881 let node_id = format!("mgmt{addr}");
882 let base = if state.tls_config.is_some() {
883 "https"
884 } else {
885 "http"
886 };
887 let config = FirstHttpConfig {
888 endpoint: format!("{base}://{addr}"),
889 tls: state.tls_config.clone(),
890 user_agent: client_name.clone(),
891 authenticator: state.authenticator.clone(),
892 bucket_name: state.bucket.clone(),
893 };
894 clients.insert(node_id, config);
895 }
896
897 clients
898 }
899
900 pub(crate) async fn run_with_bucket_feature_check<T, Fut>(
901 &self,
902 feature: BucketFeature,
903 operation: impl FnOnce() -> Fut,
904 message: impl Into<String>,
905 ) -> Result<T>
906 where
907 Fut: std::future::Future<Output = Result<T>>,
908 {
909 let features = self.bucket_features().await?;
910
911 if !features.contains(&feature) {
912 return Err(Error::new_feature_not_available_error(
913 format!("{feature:?}"),
914 message.into(),
915 ));
916 }
917
918 operation().await
919 }
920
921 pub(crate) fn get_bucket_name(&self) -> Option<String> {
922 self.inner.get_bucket_name()
923 }
924}
925
926struct FirstHttpConfig {
927 pub endpoint: String,
928 pub tls: Option<TlsConfig>,
929 pub user_agent: String,
930 pub authenticator: Authenticator,
931 pub bucket_name: Option<String>,
932}
933
934impl Drop for Agent {
935 fn drop(&mut self) {
936 debug!(
937 "Dropping agent {}, {} strong references remain",
938 self.id,
939 Arc::strong_count(&self.inner)
940 );
941 }
942}