1use crate::httpx::client::Client;
20use crate::mgmtx::bucket_settings::{encode_bucket_settings, BucketDef};
21use crate::mgmtx::bucket_settings_json::BucketSettingsJson;
22use crate::mgmtx::error;
23use crate::mgmtx::mgmt::{parse_response_json, Management};
24use crate::mgmtx::options::{
25 CreateBucketOptions, DeleteBucketOptions, FlushBucketOptions, GetAllBucketsOptions,
26 GetBucketOptions, UpdateBucketOptions,
27};
28use crate::tracingcomponent::{BeginDispatchFields, EndDispatchFields};
29use crate::util::get_host_port_tuple_from_uri;
30use bytes::Bytes;
31use http::Method;
32
33impl<C: Client> Management<C> {
34 pub async fn get_all_buckets(
35 &self,
36 opts: &GetAllBucketsOptions<'_>,
37 ) -> error::Result<Vec<BucketDef>> {
38 let method = Method::GET;
39 let path = "pools/default/buckets".to_string();
40
41 let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
42 let canonical_addr =
43 get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
44 let resp = self
45 .tracing
46 .orchestrate_dispatch_span(
47 BeginDispatchFields::new(
48 (&peer_addr.0, &peer_addr.1),
49 (&canonical_addr.0, &canonical_addr.1),
50 None,
51 ),
52 self.execute(
53 method.clone(),
54 &path,
55 "",
56 opts.on_behalf_of_info.cloned(),
57 None,
58 None,
59 ),
60 |_| EndDispatchFields::new(None, None),
61 )
62 .await?;
63 if resp.status() != 200 {
64 return Err(Self::decode_common_error(method, path, "get_all_buckets", resp).await);
65 }
66
67 let json_buckets: Vec<BucketSettingsJson> = parse_response_json(resp).await?;
68 let mut buckets = Vec::with_capacity(json_buckets.len());
69 for bucket in json_buckets {
70 buckets.push(bucket.into());
71 }
72
73 Ok(buckets)
74 }
75
76 pub async fn get_bucket(&self, opts: &GetBucketOptions<'_>) -> error::Result<BucketDef> {
77 let method = Method::GET;
78 let path = format!("pools/default/buckets/{}", opts.bucket_name).to_string();
79
80 let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
81 let canonical_addr =
82 get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
83 let resp = self
84 .tracing
85 .orchestrate_dispatch_span(
86 BeginDispatchFields::new(
87 (&peer_addr.0, &peer_addr.1),
88 (&canonical_addr.0, &canonical_addr.1),
89 None,
90 ),
91 self.execute(
92 method.clone(),
93 &path,
94 "",
95 opts.on_behalf_of_info.cloned(),
96 None,
97 None,
98 ),
99 |_| EndDispatchFields::new(None, None),
100 )
101 .await?;
102
103 if resp.status() != 200 {
104 return Err(Self::decode_common_error(method, path, "get_bucket", resp).await);
105 }
106
107 let bucket: BucketSettingsJson = parse_response_json(resp).await?;
108
109 Ok(bucket.into())
110 }
111
112 pub async fn create_bucket(&self, opts: &CreateBucketOptions<'_>) -> error::Result<()> {
113 let method = Method::POST;
114 let path = "pools/default/buckets".to_string();
115
116 let body = {
117 let mut form = url::form_urlencoded::Serializer::new(String::new());
119 form.append_pair("name", opts.bucket_name);
120 encode_bucket_settings(&mut form, opts.bucket_settings);
121
122 Bytes::from(form.finish())
123 };
124
125 let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
126 let canonical_addr =
127 get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
128 let resp = self
129 .tracing
130 .orchestrate_dispatch_span(
131 BeginDispatchFields::new(
132 (&peer_addr.0, &peer_addr.1),
133 (&canonical_addr.0, &canonical_addr.1),
134 None,
135 ),
136 self.execute(
137 method.clone(),
138 &path,
139 "application/x-www-form-urlencoded",
140 opts.on_behalf_of_info.cloned(),
141 None,
142 Some(body),
143 ),
144 |_| EndDispatchFields::new(None, None),
145 )
146 .await?;
147
148 if resp.status() != 202 {
149 return Err(Self::decode_common_error(method, path, "create_bucket", resp).await);
150 }
151
152 Ok(())
153 }
154
155 pub async fn update_bucket(&self, opts: &UpdateBucketOptions<'_>) -> error::Result<()> {
156 let method = Method::POST;
157 let path = format!("pools/default/buckets/{}", opts.bucket_name).to_string();
158
159 let body = {
160 let mut form = url::form_urlencoded::Serializer::new(String::new());
162 encode_bucket_settings(&mut form, opts.bucket_settings);
163
164 Bytes::from(form.finish())
165 };
166
167 let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
168 let canonical_addr =
169 get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
170 let resp = self
171 .tracing
172 .orchestrate_dispatch_span(
173 BeginDispatchFields::new(
174 (&peer_addr.0, &peer_addr.1),
175 (&canonical_addr.0, &canonical_addr.1),
176 None,
177 ),
178 self.execute(
179 method.clone(),
180 &path,
181 "application/x-www-form-urlencoded",
182 opts.on_behalf_of_info.cloned(),
183 None,
184 Some(body),
185 ),
186 |_| EndDispatchFields::new(None, None),
187 )
188 .await?;
189
190 if resp.status() != 200 {
191 return Err(Self::decode_common_error(method, path, "update_bucket", resp).await);
192 }
193
194 Ok(())
195 }
196
197 pub async fn delete_bucket(&self, opts: &DeleteBucketOptions<'_>) -> error::Result<()> {
198 let method = Method::DELETE;
199 let path = format!("pools/default/buckets/{}", opts.bucket_name).to_string();
200
201 let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
202 let canonical_addr =
203 get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
204 let resp = self
205 .tracing
206 .orchestrate_dispatch_span(
207 BeginDispatchFields::new(
208 (&peer_addr.0, &peer_addr.1),
209 (&canonical_addr.0, &canonical_addr.1),
210 None,
211 ),
212 self.execute(
213 method.clone(),
214 &path,
215 "",
216 opts.on_behalf_of_info.cloned(),
217 None,
218 None,
219 ),
220 |_| EndDispatchFields::new(None, None),
221 )
222 .await?;
223
224 if resp.status() != 200 {
225 let e = Self::decode_common_error(method, path.clone(), "delete_bucket", resp).await;
226 return match e.kind() {
227 error::ErrorKind::Server(se) => {
228 if se.kind() == &error::ServerErrorKind::OperationDelayed {
231 return Ok(());
232 }
233
234 Err(e)
235 }
236 _ => Err(e),
237 };
238 }
239
240 Ok(())
241 }
242
243 pub async fn flush_bucket(&self, opts: &FlushBucketOptions<'_>) -> error::Result<()> {
244 let method = Method::POST;
245 let path = format!(
246 "pools/default/buckets/{}/controller/doFlush",
247 opts.bucket_name
248 )
249 .to_string();
250
251 let peer_addr = get_host_port_tuple_from_uri(&self.endpoint).unwrap_or_default();
252 let canonical_addr =
253 get_host_port_tuple_from_uri(&self.canonical_endpoint).unwrap_or_default();
254 let resp = self
255 .tracing
256 .orchestrate_dispatch_span(
257 BeginDispatchFields::new(
258 (&peer_addr.0, &peer_addr.1),
259 (&canonical_addr.0, &canonical_addr.1),
260 None,
261 ),
262 self.execute(
263 method.clone(),
264 &path,
265 "",
266 opts.on_behalf_of_info.cloned(),
267 None,
268 None,
269 ),
270 |_| EndDispatchFields::new(None, None),
271 )
272 .await?;
273
274 if resp.status() != 200 {
275 return Err(Self::decode_common_error(method, path, "flush_bucket", resp).await);
276 }
277
278 Ok(())
279 }
280}