bbox_tile_server/store/
s3.rs

1use crate::config::{S3StoreCfg, StoreCompressionCfg};
2use crate::store::{CacheLayout, TileReader, TileStoreError, TileWriter};
3use async_trait::async_trait;
4use bbox_core::{Compression, Format, TileResponse};
5use log::debug;
6use rusoto_s3::{PutObjectError, PutObjectRequest, S3Client, S3};
7use std::env;
8use std::fs::{self, File};
9use std::io::{BufReader, Read};
10use std::path::Path;
11use std::path::PathBuf;
12use tile_grid::Xyz;
13
14#[derive(Clone, Debug)]
15pub struct S3Store {
16    bucket: String,
17    region: rusoto_core::Region,
18    compression: StoreCompressionCfg,
19    format: Format,
20}
21
22#[derive(thiserror::Error, Debug)]
23pub enum S3StoreError {
24    #[error("S3 path should be 's3://bucket'")]
25    InvalidS3Path,
26    #[error("Reading input failed: {0}")]
27    ReadInputError(#[source] std::io::Error),
28    #[error("Upload failed: {0}")]
29    UploadFailed(#[source] rusoto_core::RusotoError<PutObjectError>),
30}
31
32impl S3Store {
33    pub fn from_s3_path(
34        s3_path: &str,
35        compression: &Option<StoreCompressionCfg>,
36        format: Format,
37    ) -> Result<Self, S3StoreError> {
38        let bucket = match s3_path.strip_prefix("s3://") {
39            None => return Err(S3StoreError::InvalidS3Path),
40            Some(bucket) => {
41                if bucket.contains('/') {
42                    return Err(S3StoreError::InvalidS3Path);
43                } else {
44                    bucket.to_string()
45                }
46            }
47        };
48        let region = match env::var("S3_ENDPOINT_URL") {
49            Ok(endpoint) => rusoto_core::Region::Custom {
50                name: "region".to_string(),
51                endpoint,
52            },
53            Err(_) => rusoto_core::Region::default(),
54        };
55        let compression = compression.clone().unwrap_or(StoreCompressionCfg::None);
56
57        Ok(S3Store {
58            bucket,
59            region,
60            compression,
61            format,
62        })
63    }
64    pub fn from_config(
65        cfg: &S3StoreCfg,
66        compression: &Option<StoreCompressionCfg>,
67        format: &Format,
68    ) -> Result<Self, TileStoreError> {
69        Self::from_s3_path(&cfg.path, compression, *format).map_err(Into::into)
70    }
71}
72
73#[async_trait]
74impl TileWriter for S3Store {
75    fn compression(&self) -> Compression {
76        match self.compression {
77            StoreCompressionCfg::Gzip => Compression::Gzip,
78            StoreCompressionCfg::None => Compression::None,
79        }
80    }
81    async fn exists(&self, _xyz: &Xyz) -> bool {
82        // 2nd level cache lookup is not supported
83        false
84    }
85    async fn put_tile(&self, xyz: &Xyz, data: Vec<u8>) -> Result<(), TileStoreError> {
86        let key = CacheLayout::Zxy.path_string(&PathBuf::new(), xyz, &self.format);
87        self.put_data(key, data).await
88    }
89}
90
91impl S3Store {
92    pub async fn put_data(&self, key: String, data: Vec<u8>) -> Result<(), TileStoreError> {
93        let bucket = self.bucket.clone();
94        // TODO: Workaround for https://github.com/rusoto/rusoto/issues/1980
95        let client = S3Client::new(self.region.clone());
96        let content_length = data.len() as i64;
97        debug!("cp {key} ({content_length} bytes)");
98
99        if let Err(e) = {
100            let request = PutObjectRequest {
101                bucket,
102                key,
103                body: Some(data.into()),
104                content_length: Some(content_length),
105                ..Default::default()
106            };
107            client.put_object(request).await
108        } {
109            eprintln!("Upload failed: {e}");
110            return Err(S3StoreError::UploadFailed(e).into());
111        }
112        Ok(())
113    }
114    /// Put tile from temporary file
115    #[allow(dead_code)]
116    pub async fn copy_tile(&self, base_dir: &Path, xyz: &Xyz) -> Result<(), TileStoreError> {
117        let fullpath = CacheLayout::Zxy.path(base_dir, xyz, &self.format);
118        let p = fullpath.as_path();
119        let mut data =
120            BufReader::new(File::open(p).map_err(|e| TileStoreError::FileError(p.into(), e))?);
121        let mut bytes = Vec::new();
122        data.read_to_end(&mut bytes)?;
123        self.put_tile(xyz, bytes).await?;
124        fs::remove_file(p).map_err(|e| TileStoreError::FileError(p.into(), e))?;
125
126        Ok(())
127    }
128}
129
130#[async_trait]
131impl TileReader for S3Store {
132    async fn get_tile(&self, _xyz: &Xyz) -> Result<Option<TileResponse>, TileStoreError> {
133        // 2nd level cache lookup is not supported
134        Ok(None)
135    }
136}