couchbase_core/
ondemand_agentmanager.rs1use std::collections::HashMap;
20use std::sync::{Arc, Weak};
21use std::time::Duration;
22
23use arc_swap::ArcSwap;
24use futures::executor::block_on;
25use tokio::sync::{Mutex, Notify};
26use tracing::{debug, info};
27
28use crate::agent::Agent;
29use crate::auth_mechanism::AuthMechanism;
30use crate::authenticator::Authenticator;
31use crate::error;
32use crate::error::ErrorKind;
33use crate::options::agent::{
34 AgentOptions, CompressionConfig, ConfigPollerConfig, HttpConfig, KvConfig,
35 ReconfigureAgentOptions, SeedConfig,
36};
37use crate::options::ondemand_agentmanager::OnDemandAgentManagerOptions;
38use crate::tls_config::TlsConfig;
39
40pub struct OnDemandAgentManager {
41 opts: OnDemandAgentManagerOptions,
42 cluster_agent: Arc<Agent>,
44 fast_map: ArcSwap<HashMap<String, Weak<Agent>>>,
45 slow_map: Mutex<HashMap<String, Arc<Agent>>>,
46 notif_map: Mutex<HashMap<String, Arc<Notify>>>,
47}
48
49impl OnDemandAgentManager {
50 pub async fn new(opts: OnDemandAgentManagerOptions) -> error::Result<Self> {
51 let cluster_agent = Arc::new(Agent::new(opts.clone().into()).await?);
52
53 Ok(Self {
54 opts,
55 cluster_agent,
56 fast_map: Default::default(),
57 slow_map: Default::default(),
58 notif_map: Default::default(),
59 })
60 }
61
62 pub fn get_cluster_agent(&self) -> Weak<Agent> {
63 Arc::downgrade(&self.cluster_agent)
64 }
65
66 pub async fn get_bucket_agent(
67 &self,
68 bucket_name: impl Into<String>,
69 ) -> error::Result<Weak<Agent>> {
70 let bucket_name = bucket_name.into();
71 loop {
72 let fast_map = self.fast_map.load();
73 if let Some(agent) = fast_map.get(&bucket_name) {
74 return Ok(agent.clone());
75 }
76
77 self.load_bucket_agent_slow(&bucket_name).await?;
78
79 let slow_map = self.slow_map.lock().await;
80 let mut fast_map = HashMap::with_capacity(slow_map.len());
81 for (name, agent) in slow_map.iter() {
82 fast_map.insert(name.clone(), Arc::downgrade(agent));
83 }
84
85 self.fast_map.store(Arc::new(fast_map));
86 }
87 }
88
89 pub async fn reconfigure_agents(&self, opts: ReconfigureAgentOptions) {
90 self.cluster_agent.reconfigure(opts.clone()).await;
91
92 let slow_map = self.slow_map.lock().await;
93 for (bucket_name, agent) in slow_map.iter() {
94 debug!("Reconfiguring agent for bucket {}", bucket_name);
95 agent.reconfigure(opts.clone()).await;
96 }
97 }
98
99 async fn load_bucket_agent_slow(&self, bucket_name: impl Into<String>) -> error::Result<()> {
100 let bucket_name = bucket_name.into();
101 let notif = {
102 let mut slow_map = self.slow_map.lock().await;
103 if slow_map.contains_key(&bucket_name) {
104 return Ok(());
105 }
106
107 debug!(
108 "Bucket {} not in slow map, checking notif map",
109 &bucket_name
110 );
111 let mut notif_map = self.notif_map.lock().await;
114 if let Some(notif) = notif_map.get(&bucket_name) {
115 let notif = notif.clone();
116 drop(slow_map);
117 drop(notif_map);
118
119 debug!(
120 "Bucket {} in notif map, awaiting notification",
121 &bucket_name
122 );
123 notif.notified().await;
124 debug!("Bucket {} received notification", &bucket_name);
125 return Ok(());
126 };
127
128 debug!("Bucket {} not in any map, creating new", &bucket_name);
129 let notif = Arc::new(Notify::new());
130 notif_map.insert(bucket_name.clone(), notif.clone());
131
132 notif
133 };
134
135 let mut opts: AgentOptions = self.opts.clone().into();
136 opts.bucket_name = Some(bucket_name.clone());
137
138 let agent = Arc::new(Agent::new(opts).await?);
139
140 let mut slow_map = self.slow_map.lock().await;
141 slow_map.insert(bucket_name.clone(), agent);
142
143 {
144 let mut notif_map = self.notif_map.lock().await;
146 notif_map.remove(&bucket_name);
147 }
148
149 notif.notify_waiters();
150
151 Ok(())
152 }
153}
154
155impl Drop for OnDemandAgentManager {
156 fn drop(&mut self) {
157 info!("Dropping OnDemandAgentManager");
158 }
159}