use std::sync::Arc;
use crate::pixel::PixelFormat;
use crate::planner::{PyramidPlan, TileCoord};
use crate::raster::Raster;
use crate::sink::{BLANK_TILE_MARKER, SinkError, Tile, TileFormat, TileSink, encode_png};
pub trait ObjectStore: Send + Sync {
fn put(&self, key: &str, bytes: &[u8]) -> Result<(), SinkError>;
}
#[derive(Clone)]
#[non_exhaustive]
pub struct ObjectStoreConfig {
pub endpoint: String,
pub bucket: String,
pub access_key: Option<String>,
pub secret_key: Option<String>,
pub key_prefix: String,
pub image_name: String,
pub multipart_threshold: usize,
store: Option<Arc<dyn ObjectStore>>,
}
impl std::fmt::Debug for ObjectStoreConfig {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ObjectStoreConfig")
.field("endpoint", &self.endpoint)
.field("bucket", &self.bucket)
.field(
"access_key",
&self.access_key.as_ref().map(|_| "<redacted>"),
)
.field(
"secret_key",
&self.secret_key.as_ref().map(|_| "<redacted>"),
)
.field("key_prefix", &self.key_prefix)
.field("image_name", &self.image_name)
.field("multipart_threshold", &self.multipart_threshold)
.field("store", &self.store.as_ref().map(|_| "<dyn ObjectStore>"))
.finish()
}
}
impl ObjectStoreConfig {
pub fn s3(endpoint: impl Into<String>, bucket: impl Into<String>) -> Self {
Self {
endpoint: endpoint.into(),
bucket: bucket.into(),
access_key: None,
secret_key: None,
key_prefix: String::new(),
image_name: "image".to_string(),
multipart_threshold: 8 * 1024 * 1024,
store: None,
}
}
pub fn with_access_key(
mut self,
access_key: impl Into<String>,
secret_key: impl Into<String>,
) -> Self {
self.access_key = Some(access_key.into());
self.secret_key = Some(secret_key.into());
self
}
pub fn with_key_prefix(mut self, prefix: impl Into<String>) -> Self {
self.key_prefix = prefix.into();
self
}
pub fn with_image_name(mut self, image_name: impl Into<String>) -> Self {
self.image_name = image_name.into();
self
}
pub fn with_multipart_threshold(mut self, threshold: usize) -> Self {
self.multipart_threshold = threshold;
self
}
pub fn with_object_store(mut self, store: Arc<dyn ObjectStore>) -> Self {
self.store = Some(store);
self
}
}
pub fn deep_zoom_key(
prefix: &str,
image_name: &str,
level: u32,
x: u32,
y: u32,
ext: &str,
) -> String {
let trimmed = prefix.trim_matches('/');
if trimmed.is_empty() {
format!("{image_name}_files/{level}/{x}_{y}.{ext}")
} else {
format!("{trimmed}/{image_name}_files/{level}/{x}_{y}.{ext}")
}
}
fn xyz_key(prefix: &str, image_name: &str, z: u32, x: u32, y: u32, ext: &str) -> String {
let trimmed = prefix.trim_matches('/');
if trimmed.is_empty() {
format!("{image_name}/{z}/{x}/{y}.{ext}")
} else {
format!("{trimmed}/{image_name}/{z}/{x}/{y}.{ext}")
}
}
fn google_key(prefix: &str, image_name: &str, z: u32, x: u32, y: u32, ext: &str) -> String {
let trimmed = prefix.trim_matches('/');
if trimmed.is_empty() {
format!("{image_name}/{z}/{y}/{x}.{ext}")
} else {
format!("{trimmed}/{image_name}/{z}/{y}/{x}.{ext}")
}
}
fn color_type_for_format(fmt: PixelFormat) -> Result<image::ColorType, SinkError> {
match fmt {
PixelFormat::Gray8 => Ok(image::ColorType::L8),
PixelFormat::Gray16 => Ok(image::ColorType::L16),
PixelFormat::Rgb8 => Ok(image::ColorType::Rgb8),
PixelFormat::Rgba8 => Ok(image::ColorType::Rgba8),
PixelFormat::Rgb16 => Ok(image::ColorType::Rgb16),
PixelFormat::Rgba16 => Ok(image::ColorType::Rgba16),
}
}
fn encode_jpeg_local(raster: &Raster, quality: u8) -> Result<Vec<u8>, SinkError> {
let mut buf = Vec::new();
let encoder =
image::codecs::jpeg::JpegEncoder::new_with_quality(std::io::Cursor::new(&mut buf), quality);
let ct = color_type_for_format(raster.format())?;
image::ImageEncoder::write_image(
encoder,
raster.data(),
raster.width(),
raster.height(),
ct.into(),
)
.map_err(|e| SinkError::Encode {
format: "jpeg".into(),
source: e,
})?;
Ok(buf)
}
fn encode_tile(raster: &Raster, format: TileFormat) -> Result<Vec<u8>, SinkError> {
match format {
TileFormat::Raw => Ok(raster.data().to_vec()),
TileFormat::Png => encode_png(raster),
TileFormat::Jpeg { quality } => encode_jpeg_local(raster, quality),
}
}
#[non_exhaustive]
pub struct ObjectStoreSink {
cfg: ObjectStoreConfig,
plan: PyramidPlan,
format: TileFormat,
}
impl std::fmt::Debug for ObjectStoreSink {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ObjectStoreSink")
.field("cfg", &self.cfg)
.field("format", &self.format)
.finish()
}
}
impl ObjectStoreSink {
pub fn new(
cfg: ObjectStoreConfig,
plan: PyramidPlan,
format: TileFormat,
) -> Result<Self, SinkError> {
if cfg.store.is_none() {
return Err(SinkError::Other(
"ObjectStoreSink: no backend attached. Call \
ObjectStoreConfig::with_object_store(Arc<dyn ObjectStore>) \
to inject an ObjectStore implementation (the built-in S3 \
HTTP client is not compiled into this build)."
.into(),
));
}
Ok(Self { cfg, plan, format })
}
pub fn list_objects(&self) -> Result<Vec<String>, SinkError> {
Ok(Vec::new())
}
fn key_for(&self, coord: TileCoord) -> Option<String> {
let level = self.plan.levels.get(coord.level as usize)?;
if coord.col >= level.cols || coord.row >= level.rows {
return None;
}
let ext = self.format.extension();
let key = match self.plan.layout {
crate::planner::Layout::DeepZoom => deep_zoom_key(
&self.cfg.key_prefix,
&self.cfg.image_name,
coord.level,
coord.col,
coord.row,
ext,
),
crate::planner::Layout::Xyz => xyz_key(
&self.cfg.key_prefix,
&self.cfg.image_name,
coord.level,
coord.col,
coord.row,
ext,
),
crate::planner::Layout::Google => google_key(
&self.cfg.key_prefix,
&self.cfg.image_name,
coord.level,
coord.col,
coord.row,
ext,
),
};
Some(key)
}
}
impl TileSink for ObjectStoreSink {
fn write_tile(&self, tile: &Tile) -> Result<(), SinkError> {
let key = self
.key_for(tile.coord)
.ok_or_else(|| SinkError::Other(format!("invalid tile coord {:?}", tile.coord)))?;
let payload: Vec<u8> = if tile.blank {
vec![BLANK_TILE_MARKER]
} else {
encode_tile(&tile.raster, self.format)?
};
let _ = self.cfg.multipart_threshold;
let store =
self.cfg.store.as_ref().ok_or_else(|| {
SinkError::Other("ObjectStoreSink: backend is not configured".into())
})?;
store.put(&key, &payload)
}
fn finish(&self) -> Result<(), SinkError> {
Ok(())
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn deep_zoom_key_with_prefix() {
assert_eq!(
deep_zoom_key("pyramids/run-1", "output", 8, 3, 4, "png"),
"pyramids/run-1/output_files/8/3_4.png"
);
}
#[test]
fn deep_zoom_key_without_prefix() {
assert_eq!(
deep_zoom_key("", "image", 0, 0, 0, "png"),
"image_files/0/0_0.png"
);
}
#[test]
fn deep_zoom_key_trims_slashes() {
let k = deep_zoom_key("/foo/bar/", "img", 1, 2, 3, "jpg");
assert_eq!(k, "foo/bar/img_files/1/2_3.jpg");
assert!(!k.contains("//"));
}
#[test]
fn config_builder_sets_fields() {
let cfg = ObjectStoreConfig::s3("http://localhost:9000", "bucket")
.with_access_key("ak", "sk")
.with_key_prefix("p")
.with_image_name("img")
.with_multipart_threshold(1024);
assert_eq!(cfg.endpoint, "http://localhost:9000");
assert_eq!(cfg.bucket, "bucket");
assert_eq!(cfg.access_key.as_deref(), Some("ak"));
assert_eq!(cfg.secret_key.as_deref(), Some("sk"));
assert_eq!(cfg.key_prefix, "p");
assert_eq!(cfg.image_name, "img");
assert_eq!(cfg.multipart_threshold, 1024);
}
#[test]
fn new_without_backend_mentions_with_object_store() {
use crate::planner::{Layout, PyramidPlanner};
let cfg = ObjectStoreConfig::s3("http://localhost:9000", "bucket");
let plan = PyramidPlanner::new(256, 256, 256, 0, Layout::DeepZoom)
.expect("planner params are valid")
.plan();
let err = ObjectStoreSink::new(cfg, plan, TileFormat::Png).unwrap_err();
let msg = format!("{err:?}");
assert!(
msg.contains("with_object_store"),
"error should point callers to the injection API: {msg}"
);
}
}