Skip to main content

couchbase_core/
ondemand_agentmanager.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use std::collections::HashMap;
20use std::sync::{Arc, Weak};
21use std::time::Duration;
22
23use arc_swap::ArcSwap;
24use futures::executor::block_on;
25use tokio::sync::{Mutex, Notify};
26use tracing::{debug, info};
27
28use crate::agent::Agent;
29use crate::auth_mechanism::AuthMechanism;
30use crate::authenticator::Authenticator;
31use crate::error;
32use crate::error::ErrorKind;
33use crate::options::agent::{
34    AgentOptions, CompressionConfig, ConfigPollerConfig, HttpConfig, KvConfig,
35    ReconfigureAgentOptions, SeedConfig,
36};
37use crate::options::ondemand_agentmanager::OnDemandAgentManagerOptions;
38use crate::tls_config::TlsConfig;
39
40pub struct OnDemandAgentManager {
41    opts: OnDemandAgentManagerOptions,
42    // This is Arc so that we can provide a consistent API for handing out agents.
43    cluster_agent: Arc<Agent>,
44    fast_map: ArcSwap<HashMap<String, Weak<Agent>>>,
45    slow_map: Mutex<HashMap<String, Arc<Agent>>>,
46    notif_map: Mutex<HashMap<String, Arc<Notify>>>,
47}
48
49impl OnDemandAgentManager {
50    pub async fn new(opts: OnDemandAgentManagerOptions) -> error::Result<Self> {
51        let cluster_agent = Arc::new(Agent::new(opts.clone().into()).await?);
52
53        Ok(Self {
54            opts,
55            cluster_agent,
56            fast_map: Default::default(),
57            slow_map: Default::default(),
58            notif_map: Default::default(),
59        })
60    }
61
62    pub fn get_cluster_agent(&self) -> Weak<Agent> {
63        Arc::downgrade(&self.cluster_agent)
64    }
65
66    pub async fn get_bucket_agent(
67        &self,
68        bucket_name: impl Into<String>,
69    ) -> error::Result<Weak<Agent>> {
70        let bucket_name = bucket_name.into();
71        loop {
72            let fast_map = self.fast_map.load();
73            if let Some(agent) = fast_map.get(&bucket_name) {
74                return Ok(agent.clone());
75            }
76
77            self.load_bucket_agent_slow(&bucket_name).await?;
78
79            let slow_map = self.slow_map.lock().await;
80            let mut fast_map = HashMap::with_capacity(slow_map.len());
81            for (name, agent) in slow_map.iter() {
82                fast_map.insert(name.clone(), Arc::downgrade(agent));
83            }
84
85            self.fast_map.store(Arc::new(fast_map));
86        }
87    }
88
89    pub async fn reconfigure_agents(&self, opts: ReconfigureAgentOptions) {
90        self.cluster_agent.reconfigure(opts.clone()).await;
91
92        let slow_map = self.slow_map.lock().await;
93        for (bucket_name, agent) in slow_map.iter() {
94            debug!("Reconfiguring agent for bucket {}", bucket_name);
95            agent.reconfigure(opts.clone()).await;
96        }
97    }
98
99    async fn load_bucket_agent_slow(&self, bucket_name: impl Into<String>) -> error::Result<()> {
100        let bucket_name = bucket_name.into();
101        let notif = {
102            let mut slow_map = self.slow_map.lock().await;
103            if slow_map.contains_key(&bucket_name) {
104                return Ok(());
105            }
106
107            debug!(
108                "Bucket {} not in slow map, checking notif map",
109                &bucket_name
110            );
111            // If we don't have an agent then check the notif map to see if someone else is getting
112            // an agent already. Note that we're still inside the slow_map lock here.
113            let mut notif_map = self.notif_map.lock().await;
114            if let Some(notif) = notif_map.get(&bucket_name) {
115                let notif = notif.clone();
116                drop(slow_map);
117                drop(notif_map);
118
119                debug!(
120                    "Bucket {} in notif map, awaiting notification",
121                    &bucket_name
122                );
123                notif.notified().await;
124                debug!("Bucket {} received notification", &bucket_name);
125                return Ok(());
126            };
127
128            debug!("Bucket {} not in any map, creating new", &bucket_name);
129            let notif = Arc::new(Notify::new());
130            notif_map.insert(bucket_name.clone(), notif.clone());
131
132            notif
133        };
134
135        let mut opts: AgentOptions = self.opts.clone().into();
136        opts.bucket_name = Some(bucket_name.clone());
137
138        let agent = Arc::new(Agent::new(opts).await?);
139
140        let mut slow_map = self.slow_map.lock().await;
141        slow_map.insert(bucket_name.clone(), agent);
142
143        {
144            // We remove the entry from the notif map, whilst still under the slow_map lock.
145            let mut notif_map = self.notif_map.lock().await;
146            notif_map.remove(&bucket_name);
147        }
148
149        notif.notify_waiters();
150
151        Ok(())
152    }
153}
154
155impl Drop for OnDemandAgentManager {
156    fn drop(&mut self) {
157        info!("Dropping OnDemandAgentManager");
158    }
159}