// bbox_tile_server/store/s3.rs
use crate::config::{S3StoreCfg, StoreCompressionCfg};
use crate::store::{CacheLayout, TileReader, TileStoreError, TileWriter};
use async_trait::async_trait;
use bbox_core::{Compression, Format, TileResponse};
use log::{debug, error};
use rusoto_s3::{PutObjectError, PutObjectRequest, S3Client, S3};
use std::env;
use std::fs::{self, File};
use std::io::{BufReader, Read};
use std::path::{Path, PathBuf};
use tile_grid::Xyz;
13
/// Tile store that writes tiles as objects into an S3 bucket.
#[derive(Clone, Debug)]
pub struct S3Store {
    // Bucket name only, without the `s3://` prefix (see `from_s3_path`).
    bucket: String,
    // AWS region, or a custom endpoint when `S3_ENDPOINT_URL` is set.
    region: rusoto_core::Region,
    // Compression reported to callers via `TileWriter::compression`.
    compression: StoreCompressionCfg,
    // Tile format; determines the object key extension in the Zxy layout.
    format: Format,
}
21
/// Errors specific to the S3 tile store.
#[derive(thiserror::Error, Debug)]
pub enum S3StoreError {
    // Path did not match the required `s3://bucket` form.
    #[error("S3 path should be 's3://bucket'")]
    InvalidS3Path,
    // NOTE(review): not constructed anywhere in this file — presumably used
    // by other modules; confirm before removing.
    #[error("Reading input failed: {0}")]
    ReadInputError(#[source] std::io::Error),
    // The S3 `PutObject` request failed (see `put_data`).
    #[error("Upload failed: {0}")]
    UploadFailed(#[source] rusoto_core::RusotoError<PutObjectError>),
}
31
32impl S3Store {
33 pub fn from_s3_path(
34 s3_path: &str,
35 compression: &Option<StoreCompressionCfg>,
36 format: Format,
37 ) -> Result<Self, S3StoreError> {
38 let bucket = match s3_path.strip_prefix("s3://") {
39 None => return Err(S3StoreError::InvalidS3Path),
40 Some(bucket) => {
41 if bucket.contains('/') {
42 return Err(S3StoreError::InvalidS3Path);
43 } else {
44 bucket.to_string()
45 }
46 }
47 };
48 let region = match env::var("S3_ENDPOINT_URL") {
49 Ok(endpoint) => rusoto_core::Region::Custom {
50 name: "region".to_string(),
51 endpoint,
52 },
53 Err(_) => rusoto_core::Region::default(),
54 };
55 let compression = compression.clone().unwrap_or(StoreCompressionCfg::None);
56
57 Ok(S3Store {
58 bucket,
59 region,
60 compression,
61 format,
62 })
63 }
64 pub fn from_config(
65 cfg: &S3StoreCfg,
66 compression: &Option<StoreCompressionCfg>,
67 format: &Format,
68 ) -> Result<Self, TileStoreError> {
69 Self::from_s3_path(&cfg.path, compression, *format).map_err(Into::into)
70 }
71}
72
73#[async_trait]
74impl TileWriter for S3Store {
75 fn compression(&self) -> Compression {
76 match self.compression {
77 StoreCompressionCfg::Gzip => Compression::Gzip,
78 StoreCompressionCfg::None => Compression::None,
79 }
80 }
81 async fn exists(&self, _xyz: &Xyz) -> bool {
82 false
84 }
85 async fn put_tile(&self, xyz: &Xyz, data: Vec<u8>) -> Result<(), TileStoreError> {
86 let key = CacheLayout::Zxy.path_string(&PathBuf::new(), xyz, &self.format);
87 self.put_data(key, data).await
88 }
89}
90
91impl S3Store {
92 pub async fn put_data(&self, key: String, data: Vec<u8>) -> Result<(), TileStoreError> {
93 let bucket = self.bucket.clone();
94 let client = S3Client::new(self.region.clone());
96 let content_length = data.len() as i64;
97 debug!("cp {key} ({content_length} bytes)");
98
99 if let Err(e) = {
100 let request = PutObjectRequest {
101 bucket,
102 key,
103 body: Some(data.into()),
104 content_length: Some(content_length),
105 ..Default::default()
106 };
107 client.put_object(request).await
108 } {
109 eprintln!("Upload failed: {e}");
110 return Err(S3StoreError::UploadFailed(e).into());
111 }
112 Ok(())
113 }
114 #[allow(dead_code)]
116 pub async fn copy_tile(&self, base_dir: &Path, xyz: &Xyz) -> Result<(), TileStoreError> {
117 let fullpath = CacheLayout::Zxy.path(base_dir, xyz, &self.format);
118 let p = fullpath.as_path();
119 let mut data =
120 BufReader::new(File::open(p).map_err(|e| TileStoreError::FileError(p.into(), e))?);
121 let mut bytes = Vec::new();
122 data.read_to_end(&mut bytes)?;
123 self.put_tile(xyz, bytes).await?;
124 fs::remove_file(p).map_err(|e| TileStoreError::FileError(p.into(), e))?;
125
126 Ok(())
127 }
128}
129
#[async_trait]
impl TileReader for S3Store {
    /// Reading tiles back from S3 is not implemented; always returns
    /// `Ok(None)` — presumably treated as a cache miss by callers so the
    /// tile gets regenerated (confirm against the cache lookup path).
    async fn get_tile(&self, _xyz: &Xyz) -> Result<Option<TileResponse>, TileStoreError> {
        Ok(None)
    }
}