Skip to main content

couchbase_core/mgmtx/
mgmt_bucket.rs

1/*
2 *
3 *  * Copyright (c) 2025 Couchbase, Inc.
4 *  *
5 *  * Licensed under the Apache License, Version 2.0 (the "License");
6 *  * you may not use this file except in compliance with the License.
7 *  * You may obtain a copy of the License at
8 *  *
9 *  *    http://www.apache.org/licenses/LICENSE-2.0
10 *  *
11 *  * Unless required by applicable law or agreed to in writing, software
12 *  * distributed under the License is distributed on an "AS IS" BASIS,
13 *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14 *  * See the License for the specific language governing permissions and
15 *  * limitations under the License.
16 *
17 */
18
19use crate::httpx::client::Client;
20use crate::mgmtx::bucket_settings::{encode_bucket_settings, BucketDef};
21use crate::mgmtx::bucket_settings_json::BucketSettingsJson;
22use crate::mgmtx::error;
23use crate::mgmtx::mgmt::{parse_response_json, Management};
24use crate::mgmtx::options::{
25    CreateBucketOptions, DeleteBucketOptions, FlushBucketOptions, GetAllBucketsOptions,
26    GetBucketOptions, UpdateBucketOptions,
27};
28use crate::tracingcomponent::{BeginDispatchFields, EndDispatchFields};
29use crate::util::get_host_port_tuple_from_uri;
30use bytes::Bytes;
31use http::Method;
32
33impl<C: Client> Management<C> {
34    pub async fn get_all_buckets(
35        &self,
36        opts: &GetAllBucketsOptions<'_>,
37    ) -> error::Result<Vec<BucketDef>> {
38        let method = Method::GET;
39        let path = "pools/default/buckets".to_string();
40
41        let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
42        let canonical_addr =
43            get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
44        let resp = self
45            .tracing
46            .orchestrate_dispatch_span(
47                BeginDispatchFields::new(
48                    (&peer_addr.0, &peer_addr.1),
49                    (&canonical_addr.0, &canonical_addr.1),
50                    None,
51                ),
52                self.execute(
53                    method.clone(),
54                    &path,
55                    "",
56                    opts.on_behalf_of_info.cloned(),
57                    None,
58                    None,
59                ),
60                |_| EndDispatchFields::new(None, None),
61            )
62            .await?;
63        if resp.status() != 200 {
64            return Err(Self::decode_common_error(method, path, "get_all_buckets", resp).await);
65        }
66
67        let json_buckets: Vec<BucketSettingsJson> = parse_response_json(resp).await?;
68        let mut buckets = Vec::with_capacity(json_buckets.len());
69        for bucket in json_buckets {
70            buckets.push(bucket.into());
71        }
72
73        Ok(buckets)
74    }
75
76    pub async fn get_bucket(&self, opts: &GetBucketOptions<'_>) -> error::Result<BucketDef> {
77        let method = Method::GET;
78        let path = format!("pools/default/buckets/{}", opts.bucket_name).to_string();
79
80        let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
81        let canonical_addr =
82            get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
83        let resp = self
84            .tracing
85            .orchestrate_dispatch_span(
86                BeginDispatchFields::new(
87                    (&peer_addr.0, &peer_addr.1),
88                    (&canonical_addr.0, &canonical_addr.1),
89                    None,
90                ),
91                self.execute(
92                    method.clone(),
93                    &path,
94                    "",
95                    opts.on_behalf_of_info.cloned(),
96                    None,
97                    None,
98                ),
99                |_| EndDispatchFields::new(None, None),
100            )
101            .await?;
102
103        if resp.status() != 200 {
104            return Err(Self::decode_common_error(method, path, "get_bucket", resp).await);
105        }
106
107        let bucket: BucketSettingsJson = parse_response_json(resp).await?;
108
109        Ok(bucket.into())
110    }
111
112    pub async fn create_bucket(&self, opts: &CreateBucketOptions<'_>) -> error::Result<()> {
113        let method = Method::POST;
114        let path = "pools/default/buckets".to_string();
115
116        let body = {
117            // Serializer is not Send so we need to drop it before making the request.
118            let mut form = url::form_urlencoded::Serializer::new(String::new());
119            form.append_pair("name", opts.bucket_name);
120            encode_bucket_settings(&mut form, opts.bucket_settings);
121
122            Bytes::from(form.finish())
123        };
124
125        let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
126        let canonical_addr =
127            get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
128        let resp = self
129            .tracing
130            .orchestrate_dispatch_span(
131                BeginDispatchFields::new(
132                    (&peer_addr.0, &peer_addr.1),
133                    (&canonical_addr.0, &canonical_addr.1),
134                    None,
135                ),
136                self.execute(
137                    method.clone(),
138                    &path,
139                    "application/x-www-form-urlencoded",
140                    opts.on_behalf_of_info.cloned(),
141                    None,
142                    Some(body),
143                ),
144                |_| EndDispatchFields::new(None, None),
145            )
146            .await?;
147
148        if resp.status() != 202 {
149            return Err(Self::decode_common_error(method, path, "create_bucket", resp).await);
150        }
151
152        Ok(())
153    }
154
155    pub async fn update_bucket(&self, opts: &UpdateBucketOptions<'_>) -> error::Result<()> {
156        let method = Method::POST;
157        let path = format!("pools/default/buckets/{}", opts.bucket_name).to_string();
158
159        let body = {
160            // Serializer is not Send so we need to drop it before making the request.
161            let mut form = url::form_urlencoded::Serializer::new(String::new());
162            encode_bucket_settings(&mut form, opts.bucket_settings);
163
164            Bytes::from(form.finish())
165        };
166
167        let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
168        let canonical_addr =
169            get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
170        let resp = self
171            .tracing
172            .orchestrate_dispatch_span(
173                BeginDispatchFields::new(
174                    (&peer_addr.0, &peer_addr.1),
175                    (&canonical_addr.0, &canonical_addr.1),
176                    None,
177                ),
178                self.execute(
179                    method.clone(),
180                    &path,
181                    "application/x-www-form-urlencoded",
182                    opts.on_behalf_of_info.cloned(),
183                    None,
184                    Some(body),
185                ),
186                |_| EndDispatchFields::new(None, None),
187            )
188            .await?;
189
190        if resp.status() != 200 {
191            return Err(Self::decode_common_error(method, path, "update_bucket", resp).await);
192        }
193
194        Ok(())
195    }
196
197    pub async fn delete_bucket(&self, opts: &DeleteBucketOptions<'_>) -> error::Result<()> {
198        let method = Method::DELETE;
199        let path = format!("pools/default/buckets/{}", opts.bucket_name).to_string();
200
201        let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
202        let canonical_addr =
203            get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
204        let resp = self
205            .tracing
206            .orchestrate_dispatch_span(
207                BeginDispatchFields::new(
208                    (&peer_addr.0, &peer_addr.1),
209                    (&canonical_addr.0, &canonical_addr.1),
210                    None,
211                ),
212                self.execute(
213                    method.clone(),
214                    &path,
215                    "",
216                    opts.on_behalf_of_info.cloned(),
217                    None,
218                    None,
219                ),
220                |_| EndDispatchFields::new(None, None),
221            )
222            .await?;
223
224        if resp.status() != 200 {
225            let e = Self::decode_common_error(method, path.clone(), "delete_bucket", resp).await;
226            return match e.kind() {
227                error::ErrorKind::Server(se) => {
228                    // A delayed operation is considered a success for deletion, since
229                    // bucket management is already eventually consistent anyways.
230                    if se.kind() == &error::ServerErrorKind::OperationDelayed {
231                        return Ok(());
232                    }
233
234                    Err(e)
235                }
236                _ => Err(e),
237            };
238        }
239
240        Ok(())
241    }
242
243    pub async fn flush_bucket(&self, opts: &FlushBucketOptions<'_>) -> error::Result<()> {
244        let method = Method::POST;
245        let path = format!(
246            "pools/default/buckets/{}/controller/doFlush",
247            opts.bucket_name
248        )
249        .to_string();
250
251        let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
252        let canonical_addr =
253            get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
254        let resp = self
255            .tracing
256            .orchestrate_dispatch_span(
257                BeginDispatchFields::new(
258                    (&peer_addr.0, &peer_addr.1),
259                    (&canonical_addr.0, &canonical_addr.1),
260                    None,
261                ),
262                self.execute(
263                    method.clone(),
264                    &path,
265                    "",
266                    opts.on_behalf_of_info.cloned(),
267                    None,
268                    None,
269                ),
270                |_| EndDispatchFields::new(None, None),
271            )
272            .await?;
273
274        if resp.status() != 200 {
275            return Err(Self::decode_common_error(method, path, "flush_bucket", resp).await);
276        }
277
278        Ok(())
279    }
280}