garage_api_admin/
bucket.rs

1use std::collections::HashMap;
2use std::sync::Arc;
3use std::time::Duration;
4
5use chrono::DateTime;
6
7use garage_util::crdt::*;
8use garage_util::data::*;
9use garage_util::time::*;
10
11use garage_table::*;
12
13use garage_model::bucket_alias_table::*;
14use garage_model::bucket_table::*;
15use garage_model::garage::Garage;
16use garage_model::permission::*;
17use garage_model::s3::mpu_table;
18use garage_model::s3::object_table::*;
19
20use garage_api_common::common_error::CommonError;
21
22use crate::api::*;
23use crate::error::*;
24use crate::{Admin, RequestHandler};
25
26impl RequestHandler for ListBucketsRequest {
27	type Response = ListBucketsResponse;
28
29	async fn handle(
30		self,
31		garage: &Arc<Garage>,
32		_admin: &Admin,
33	) -> Result<ListBucketsResponse, Error> {
34		let buckets = garage
35			.bucket_table
36			.get_range(
37				&EmptyKey,
38				None,
39				Some(DeletedFilter::NotDeleted),
40				10000,
41				EnumerationOrder::Forward,
42			)
43			.await?;
44
45		let res = buckets
46			.into_iter()
47			.map(|b| {
48				let state = b.state.as_option().unwrap();
49				ListBucketsResponseItem {
50					id: hex::encode(b.id),
51					created: DateTime::from_timestamp_millis(state.creation_date as i64)
52						.expect("invalid timestamp stored in db"),
53					global_aliases: state
54						.aliases
55						.items()
56						.iter()
57						.filter(|(_, _, a)| *a)
58						.map(|(n, _, _)| n.to_string())
59						.collect::<Vec<_>>(),
60					local_aliases: state
61						.local_aliases
62						.items()
63						.iter()
64						.filter(|(_, _, a)| *a)
65						.map(|((k, n), _, _)| BucketLocalAlias {
66							access_key_id: k.to_string(),
67							alias: n.to_string(),
68						})
69						.collect::<Vec<_>>(),
70				}
71			})
72			.collect::<Vec<_>>();
73
74		Ok(ListBucketsResponse(res))
75	}
76}
77
78impl RequestHandler for GetBucketInfoRequest {
79	type Response = GetBucketInfoResponse;
80
81	async fn handle(
82		self,
83		garage: &Arc<Garage>,
84		_admin: &Admin,
85	) -> Result<GetBucketInfoResponse, Error> {
86		let bucket_id = match (self.id, self.global_alias, self.search) {
87			(Some(id), None, None) => parse_bucket_id(&id)?,
88			(None, Some(ga), None) => garage
89				.bucket_alias_table
90				.get(&EmptyKey, &ga)
91				.await?
92				.and_then(|x| *x.state.get())
93				.ok_or_else(|| HelperError::NoSuchBucket(ga.to_string()))?,
94			(None, None, Some(search)) => {
95				let helper = garage.bucket_helper();
96				if let Some(bucket) = helper.resolve_global_bucket(&search).await? {
97					bucket.id
98				} else {
99					let hexdec = if search.len() >= 2 {
100						search
101							.get(..search.len() & !1)
102							.and_then(|x| hex::decode(x).ok())
103					} else {
104						None
105					};
106					let hex = hexdec
107						.ok_or_else(|| Error::Common(CommonError::NoSuchBucket(search.clone())))?;
108
109					let mut start = [0u8; 32];
110					start
111						.as_mut_slice()
112						.get_mut(..hex.len())
113						.ok_or_bad_request("invalid length")?
114						.copy_from_slice(&hex);
115					let mut candidates = garage
116						.bucket_table
117						.get_range(
118							&EmptyKey,
119							Some(start.into()),
120							Some(DeletedFilter::NotDeleted),
121							10,
122							EnumerationOrder::Forward,
123						)
124						.await?
125						.into_iter()
126						.collect::<Vec<_>>();
127					candidates.retain(|x| hex::encode(x.id).starts_with(&search));
128					if candidates.is_empty() {
129						return Err(Error::Common(CommonError::NoSuchBucket(search.clone())));
130					} else if candidates.len() == 1 {
131						candidates.into_iter().next().unwrap().id
132					} else {
133						return Err(Error::bad_request(format!(
134							"Several matching buckets: {}",
135							search
136						)));
137					}
138				}
139			}
140			_ => {
141				return Err(Error::bad_request(
142					"Either id, globalAlias or search must be provided (but not several of them)",
143				));
144			}
145		};
146
147		bucket_info_results(garage, bucket_id).await
148	}
149}
150
151impl RequestHandler for CreateBucketRequest {
152	type Response = CreateBucketResponse;
153
154	async fn handle(
155		self,
156		garage: &Arc<Garage>,
157		_admin: &Admin,
158	) -> Result<CreateBucketResponse, Error> {
159		let helper = garage.locked_helper().await;
160
161		if let Some(ga) = &self.global_alias {
162			if !is_valid_bucket_name(ga, garage.config.allow_punycode) {
163				return Err(Error::bad_request(format!(
164					"{}: {}",
165					ga, INVALID_BUCKET_NAME_MESSAGE
166				)));
167			}
168
169			if let Some(alias) = garage.bucket_alias_table.get(&EmptyKey, ga).await? {
170				if alias.state.get().is_some() {
171					return Err(CommonError::BucketAlreadyExists.into());
172				}
173			}
174		}
175
176		if let Some(la) = &self.local_alias {
177			if !is_valid_bucket_name(&la.alias, garage.config.allow_punycode) {
178				return Err(Error::bad_request(format!(
179					"{}: {}",
180					la.alias, INVALID_BUCKET_NAME_MESSAGE
181				)));
182			}
183
184			let key = helper.key().get_existing_key(&la.access_key_id).await?;
185			let state = key.state.as_option().unwrap();
186			if matches!(state.local_aliases.get(&la.alias), Some(_)) {
187				return Err(Error::bad_request("Local alias already exists"));
188			}
189		}
190
191		let bucket = Bucket::new();
192		garage.bucket_table.insert(&bucket).await?;
193
194		if let Some(ga) = &self.global_alias {
195			helper.set_global_bucket_alias(bucket.id, ga).await?;
196		}
197
198		if let Some(la) = &self.local_alias {
199			helper
200				.set_local_bucket_alias(bucket.id, &la.access_key_id, &la.alias)
201				.await?;
202
203			if la.allow.read || la.allow.write || la.allow.owner {
204				helper
205					.set_bucket_key_permissions(
206						bucket.id,
207						&la.access_key_id,
208						BucketKeyPerm {
209							timestamp: now_msec(),
210							allow_read: la.allow.read,
211							allow_write: la.allow.write,
212							allow_owner: la.allow.owner,
213						},
214					)
215					.await?;
216			}
217		}
218
219		Ok(CreateBucketResponse(
220			bucket_info_results(garage, bucket.id).await?,
221		))
222	}
223}
224
225impl RequestHandler for DeleteBucketRequest {
226	type Response = DeleteBucketResponse;
227
228	async fn handle(
229		self,
230		garage: &Arc<Garage>,
231		_admin: &Admin,
232	) -> Result<DeleteBucketResponse, Error> {
233		let helper = garage.locked_helper().await;
234
235		let bucket_id = parse_bucket_id(&self.id)?;
236
237		let mut bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
238		let state = bucket.state.as_option().unwrap();
239
240		// Check bucket is empty
241		if !helper.bucket().is_bucket_empty(bucket_id).await? {
242			return Err(CommonError::BucketNotEmpty.into());
243		}
244
245		// --- done checking, now commit ---
246		// 1. delete authorization from keys that had access
247		for (key_id, perm) in bucket.authorized_keys() {
248			if perm.is_any() {
249				helper
250					.set_bucket_key_permissions(bucket.id, key_id, BucketKeyPerm::NO_PERMISSIONS)
251					.await?;
252			}
253		}
254		// 2. delete all local aliases
255		for ((key_id, alias), _, active) in state.local_aliases.items().iter() {
256			if *active {
257				helper
258					.purge_local_bucket_alias(bucket.id, key_id, alias)
259					.await?;
260			}
261		}
262		// 3. delete all global aliases
263		for (alias, _, active) in state.aliases.items().iter() {
264			if *active {
265				helper.purge_global_bucket_alias(bucket.id, alias).await?;
266			}
267		}
268
269		// 4. delete bucket
270		bucket.state = Deletable::delete();
271		garage.bucket_table.insert(&bucket).await?;
272
273		Ok(DeleteBucketResponse)
274	}
275}
276
277impl RequestHandler for UpdateBucketRequest {
278	type Response = UpdateBucketResponse;
279
280	async fn handle(
281		self,
282		garage: &Arc<Garage>,
283		_admin: &Admin,
284	) -> Result<UpdateBucketResponse, Error> {
285		let bucket_id = parse_bucket_id(&self.id)?;
286
287		let mut bucket = garage
288			.bucket_helper()
289			.get_existing_bucket(bucket_id)
290			.await?;
291
292		let state = bucket.state.as_option_mut().unwrap();
293
294		if let Some(wa) = self.body.website_access {
295			if wa.enabled {
296				let (redirect_all, routing_rules) = match state.website_config.get() {
297					Some(wc) => (wc.redirect_all.clone(), wc.routing_rules.clone()),
298					None => (None, Vec::new()),
299				};
300				state.website_config.update(Some(WebsiteConfig {
301					index_document: wa.index_document.ok_or_bad_request(
302						"Please specify indexDocument when enabling website access.",
303					)?,
304					error_document: wa.error_document,
305					redirect_all,
306					routing_rules,
307				}));
308			} else {
309				if wa.index_document.is_some() || wa.error_document.is_some() {
310					return Err(Error::bad_request(
311                        "Cannot specify indexDocument or errorDocument when disabling website access.",
312                    ));
313				}
314				state.website_config.update(None);
315			}
316		}
317
318		if let Some(q) = self.body.quotas {
319			state.quotas.update(BucketQuotas {
320				max_size: q.max_size,
321				max_objects: q.max_objects,
322			});
323		}
324
325		garage.bucket_table.insert(&bucket).await?;
326
327		Ok(UpdateBucketResponse(
328			bucket_info_results(garage, bucket.id).await?,
329		))
330	}
331}
332
333impl RequestHandler for CleanupIncompleteUploadsRequest {
334	type Response = CleanupIncompleteUploadsResponse;
335
336	async fn handle(
337		self,
338		garage: &Arc<Garage>,
339		_admin: &Admin,
340	) -> Result<CleanupIncompleteUploadsResponse, Error> {
341		let duration = Duration::from_secs(self.older_than_secs);
342
343		let bucket_id = parse_bucket_id(&self.bucket_id)?;
344
345		let count = garage
346			.bucket_helper()
347			.cleanup_incomplete_uploads(&bucket_id, duration)
348			.await?;
349
350		Ok(CleanupIncompleteUploadsResponse {
351			uploads_deleted: count as u64,
352		})
353	}
354}
355
356impl RequestHandler for InspectObjectRequest {
357	type Response = InspectObjectResponse;
358
359	async fn handle(
360		self,
361		garage: &Arc<Garage>,
362		_admin: &Admin,
363	) -> Result<InspectObjectResponse, Error> {
364		let bucket_id = parse_bucket_id(&self.bucket_id)?;
365
366		let object = garage
367			.object_table
368			.get(&bucket_id, &self.key)
369			.await?
370			.ok_or_else(|| Error::NoSuchKey)?;
371
372		let mut versions = vec![];
373		for obj_ver in object.versions().iter() {
374			let ver = garage.version_table.get(&obj_ver.uuid, &EmptyKey).await?;
375			let blocks = ver
376				.map(|v| {
377					v.blocks
378						.items()
379						.iter()
380						.map(|(vk, vb)| InspectObjectBlock {
381							part_number: vk.part_number,
382							offset: vk.offset,
383							hash: hex::encode(&vb.hash),
384							size: vb.size,
385						})
386						.collect::<Vec<_>>()
387				})
388				.unwrap_or_default();
389			let uuid = hex::encode(&obj_ver.uuid);
390			let timestamp = DateTime::from_timestamp_millis(obj_ver.timestamp as i64)
391				.expect("invalid timestamp in db");
392			match &obj_ver.state {
393				ObjectVersionState::Uploading { encryption, .. } => {
394					versions.push(InspectObjectVersion {
395						uuid,
396						timestamp,
397						encrypted: !matches!(encryption, ObjectVersionEncryption::Plaintext { .. }),
398						uploading: true,
399						headers: match encryption {
400							ObjectVersionEncryption::Plaintext { inner } => inner.headers.clone(),
401							_ => vec![],
402						},
403						blocks,
404						..Default::default()
405					});
406				}
407				ObjectVersionState::Complete(data) => match data {
408					ObjectVersionData::DeleteMarker => {
409						versions.push(InspectObjectVersion {
410							uuid,
411							timestamp,
412							delete_marker: true,
413							..Default::default()
414						});
415					}
416					ObjectVersionData::Inline(meta, _) => {
417						versions.push(InspectObjectVersion {
418							uuid,
419							timestamp,
420							inline: true,
421							size: Some(meta.size),
422							etag: Some(meta.etag.clone()),
423							encrypted: !matches!(
424								meta.encryption,
425								ObjectVersionEncryption::Plaintext { .. }
426							),
427							headers: match &meta.encryption {
428								ObjectVersionEncryption::Plaintext { inner } => {
429									inner.headers.clone()
430								}
431								_ => vec![],
432							},
433							..Default::default()
434						});
435					}
436					ObjectVersionData::FirstBlock(meta, _) => {
437						versions.push(InspectObjectVersion {
438							uuid,
439							timestamp,
440							size: Some(meta.size),
441							etag: Some(meta.etag.clone()),
442							encrypted: !matches!(
443								meta.encryption,
444								ObjectVersionEncryption::Plaintext { .. }
445							),
446							headers: match &meta.encryption {
447								ObjectVersionEncryption::Plaintext { inner } => {
448									inner.headers.clone()
449								}
450								_ => vec![],
451							},
452							blocks,
453							..Default::default()
454						});
455					}
456				},
457				ObjectVersionState::Aborted => {
458					versions.push(InspectObjectVersion {
459						uuid,
460						timestamp,
461						aborted: true,
462						blocks,
463						..Default::default()
464					});
465				}
466			}
467		}
468
469		Ok(InspectObjectResponse {
470			bucket_id: hex::encode(&object.bucket_id),
471			key: object.key,
472			versions,
473		})
474	}
475}
476
477// ---- BUCKET/KEY PERMISSIONS ----
478
479impl RequestHandler for AllowBucketKeyRequest {
480	type Response = AllowBucketKeyResponse;
481
482	async fn handle(
483		self,
484		garage: &Arc<Garage>,
485		_admin: &Admin,
486	) -> Result<AllowBucketKeyResponse, Error> {
487		let res = handle_bucket_change_key_perm(garage, self.0, true).await?;
488		Ok(AllowBucketKeyResponse(res))
489	}
490}
491
492impl RequestHandler for DenyBucketKeyRequest {
493	type Response = DenyBucketKeyResponse;
494
495	async fn handle(
496		self,
497		garage: &Arc<Garage>,
498		_admin: &Admin,
499	) -> Result<DenyBucketKeyResponse, Error> {
500		let res = handle_bucket_change_key_perm(garage, self.0, false).await?;
501		Ok(DenyBucketKeyResponse(res))
502	}
503}
504
505pub async fn handle_bucket_change_key_perm(
506	garage: &Arc<Garage>,
507	req: BucketKeyPermChangeRequest,
508	new_perm_flag: bool,
509) -> Result<GetBucketInfoResponse, Error> {
510	let helper = garage.locked_helper().await;
511
512	let bucket_id = parse_bucket_id(&req.bucket_id)?;
513
514	let bucket = helper.bucket().get_existing_bucket(bucket_id).await?;
515	let state = bucket.state.as_option().unwrap();
516
517	let key = helper.key().get_existing_key(&req.access_key_id).await?;
518
519	let mut perm = state
520		.authorized_keys
521		.get(&key.key_id)
522		.cloned()
523		.unwrap_or(BucketKeyPerm::NO_PERMISSIONS);
524
525	if req.permissions.read {
526		perm.allow_read = new_perm_flag;
527	}
528	if req.permissions.write {
529		perm.allow_write = new_perm_flag;
530	}
531	if req.permissions.owner {
532		perm.allow_owner = new_perm_flag;
533	}
534
535	helper
536		.set_bucket_key_permissions(bucket.id, &key.key_id, perm)
537		.await?;
538
539	bucket_info_results(garage, bucket.id).await
540}
541
542// ---- BUCKET ALIASES ----
543
544impl RequestHandler for AddBucketAliasRequest {
545	type Response = AddBucketAliasResponse;
546
547	async fn handle(
548		self,
549		garage: &Arc<Garage>,
550		_admin: &Admin,
551	) -> Result<AddBucketAliasResponse, Error> {
552		let bucket_id = parse_bucket_id(&self.bucket_id)?;
553
554		let helper = garage.locked_helper().await;
555
556		match self.alias {
557			BucketAliasEnum::Global { global_alias } => {
558				helper
559					.set_global_bucket_alias(bucket_id, &global_alias)
560					.await?
561			}
562			BucketAliasEnum::Local {
563				local_alias,
564				access_key_id,
565			} => {
566				helper
567					.set_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
568					.await?
569			}
570		}
571
572		Ok(AddBucketAliasResponse(
573			bucket_info_results(garage, bucket_id).await?,
574		))
575	}
576}
577
578impl RequestHandler for RemoveBucketAliasRequest {
579	type Response = RemoveBucketAliasResponse;
580
581	async fn handle(
582		self,
583		garage: &Arc<Garage>,
584		_admin: &Admin,
585	) -> Result<RemoveBucketAliasResponse, Error> {
586		let bucket_id = parse_bucket_id(&self.bucket_id)?;
587
588		let helper = garage.locked_helper().await;
589
590		match self.alias {
591			BucketAliasEnum::Global { global_alias } => {
592				helper
593					.unset_global_bucket_alias(bucket_id, &global_alias)
594					.await?
595			}
596			BucketAliasEnum::Local {
597				local_alias,
598				access_key_id,
599			} => {
600				helper
601					.unset_local_bucket_alias(bucket_id, &access_key_id, &local_alias)
602					.await?
603			}
604		}
605
606		Ok(RemoveBucketAliasResponse(
607			bucket_info_results(garage, bucket_id).await?,
608		))
609	}
610}
611
612// ---- HELPER ----
613
614async fn bucket_info_results(
615	garage: &Arc<Garage>,
616	bucket_id: Uuid,
617) -> Result<GetBucketInfoResponse, Error> {
618	let bucket = garage
619		.bucket_helper()
620		.get_existing_bucket(bucket_id)
621		.await?;
622
623	let counters = garage
624		.object_counter_table
625		.table
626		.get(&bucket.id, &EmptyKey)
627		.await?
628		.map(|x| x.filtered_values(&garage.system.cluster_layout()))
629		.unwrap_or_default();
630
631	let mpu_counters = garage
632		.mpu_counter_table
633		.table
634		.get(&bucket.id, &EmptyKey)
635		.await?
636		.map(|x| x.filtered_values(&garage.system.cluster_layout()))
637		.unwrap_or_default();
638
639	let mut relevant_keys = HashMap::new();
640	for (k, _) in bucket
641		.state
642		.as_option()
643		.unwrap()
644		.authorized_keys
645		.items()
646		.iter()
647	{
648		if let Some(key) = garage
649			.key_table
650			.get(&EmptyKey, k)
651			.await?
652			.filter(|k| !k.is_deleted())
653		{
654			if !key.state.is_deleted() {
655				relevant_keys.insert(k.clone(), key);
656			}
657		}
658	}
659	for ((k, _), _, _) in bucket
660		.state
661		.as_option()
662		.unwrap()
663		.local_aliases
664		.items()
665		.iter()
666	{
667		if relevant_keys.contains_key(k) {
668			continue;
669		}
670		if let Some(key) = garage.key_table.get(&EmptyKey, k).await? {
671			if !key.state.is_deleted() {
672				relevant_keys.insert(k.clone(), key);
673			}
674		}
675	}
676
677	let state = bucket.state.as_option().unwrap();
678
679	let quotas = state.quotas.get();
680	let res = GetBucketInfoResponse {
681		id: hex::encode(bucket.id),
682		created: DateTime::from_timestamp_millis(state.creation_date as i64)
683			.expect("invalid timestamp stored in db"),
684		global_aliases: state
685			.aliases
686			.items()
687			.iter()
688			.filter(|(_, _, a)| *a)
689			.map(|(n, _, _)| n.to_string())
690			.collect::<Vec<_>>(),
691		website_access: state.website_config.get().is_some(),
692		website_config: state.website_config.get().clone().map(|wsc| {
693			GetBucketInfoWebsiteResponse {
694				index_document: wsc.index_document,
695				error_document: wsc.error_document,
696			}
697		}),
698		keys: relevant_keys
699			.into_values()
700			.filter_map(|key| {
701				let p = key.state.as_option().unwrap();
702				let permissions = p
703					.authorized_buckets
704					.get(&bucket.id)
705					.filter(|p| p.is_any())
706					.map(|p| ApiBucketKeyPerm {
707						read: p.allow_read,
708						write: p.allow_write,
709						owner: p.allow_owner,
710					})?;
711				Some(GetBucketInfoKey {
712					access_key_id: key.key_id,
713					name: p.name.get().to_string(),
714					permissions,
715					bucket_local_aliases: p
716						.local_aliases
717						.items()
718						.iter()
719						.filter(|(_, _, b)| *b == Some(bucket.id))
720						.map(|(n, _, _)| n.to_string())
721						.collect::<Vec<_>>(),
722				})
723			})
724			.collect::<Vec<_>>(),
725		objects: *counters.get(OBJECTS).unwrap_or(&0),
726		bytes: *counters.get(BYTES).unwrap_or(&0),
727		unfinished_uploads: *counters.get(UNFINISHED_UPLOADS).unwrap_or(&0),
728		unfinished_multipart_uploads: *mpu_counters.get(mpu_table::UPLOADS).unwrap_or(&0),
729		unfinished_multipart_upload_parts: *mpu_counters.get(mpu_table::PARTS).unwrap_or(&0),
730		unfinished_multipart_upload_bytes: *mpu_counters.get(mpu_table::BYTES).unwrap_or(&0),
731		quotas: ApiBucketQuotas {
732			max_size: quotas.max_size,
733			max_objects: quotas.max_objects,
734		},
735	};
736
737	Ok(res)
738}
739
740fn parse_bucket_id(id: &str) -> Result<Uuid, Error> {
741	let id_hex = hex::decode(id).ok_or_bad_request("Invalid bucket id")?;
742	Ok(Uuid::try_from(&id_hex).ok_or_bad_request("Invalid bucket id")?)
743}