use std::collections::HashMap;
use oxigdal_core::error::OxiGdalError;
use serde_json::Value as JsonValue;
use crate::{Result, open::OpenedDataset};
/// One vector feature handed out by a [`FeatureStream`].
///
/// The format readers in this module fill `geometry` with the bytes returned
/// by their geometry's `to_wkb()` conversion.
#[derive(Debug, Clone)]
pub struct StreamingFeature {
    /// Encoded geometry bytes, or `None` for a feature without geometry.
    pub geometry: Option<Vec<u8>>,
    /// Feature attributes keyed by property name.
    pub properties: HashMap<String, JsonValue>,
    /// Optional feature identifier.
    pub id: Option<String>,
}

impl StreamingFeature {
    /// Builds a feature from its geometry and properties; the id starts unset.
    pub fn new(geometry: Option<Vec<u8>>, properties: HashMap<String, JsonValue>) -> Self {
        Self {
            id: None,
            geometry,
            properties,
        }
    }

    /// Consumes the feature and returns it with `id` set (builder style).
    pub fn with_id(self, id: impl Into<String>) -> Self {
        Self {
            id: Some(id.into()),
            ..self
        }
    }

    /// `true` when the feature carries geometry bytes.
    pub fn has_geometry(&self) -> bool {
        matches!(self.geometry, Some(_))
    }

    /// Length of the geometry payload in bytes (0 when there is no geometry).
    pub fn geometry_byte_len(&self) -> usize {
        self.geometry.as_deref().map_or(0, <[u8]>::len)
    }
}
/// Iterator over an already materialised list of [`StreamingFeature`]s that
/// also keeps count of how many features have been handed out.
///
/// Every item is `Ok`; the `Result` item type matches the fallible
/// [`StreamingExt::features`] API.
pub struct FeatureStream {
    /// Features not yet yielded.
    inner: std::vec::IntoIter<StreamingFeature>,
    /// Number of features the stream was created with.
    total_count: usize,
    /// Number of features returned by `next` so far.
    yielded: usize,
}

impl FeatureStream {
    /// Wraps `features` in a stream that yields them in order.
    pub fn from_vec(features: Vec<StreamingFeature>) -> Self {
        // Fields are evaluated in source order, so `len()` runs before the move.
        Self {
            total_count: features.len(),
            inner: features.into_iter(),
            yielded: 0,
        }
    }

    /// A stream that yields nothing.
    pub fn empty() -> Self {
        Self::from_vec(vec![])
    }

    /// Number of features the stream started with.
    pub fn total_count(&self) -> usize {
        self.total_count
    }

    /// Number of features yielded so far.
    pub fn yielded_count(&self) -> usize {
        self.yielded
    }

    /// Number of features still to be yielded.
    pub fn remaining(&self) -> usize {
        self.total_count.saturating_sub(self.yielded)
    }
}

impl Iterator for FeatureStream {
    type Item = Result<StreamingFeature>;

    fn next(&mut self) -> Option<Self::Item> {
        let feature = self.inner.next()?;
        self.yielded += 1;
        Some(Ok(feature))
    }

    /// Exact: the remaining count is known up front.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let left = self.remaining();
        (left, Some(left))
    }
}
/// A single raster tile addressed by `(x, y)` at a zoom level.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct RasterTile {
    /// Tile index along the x axis.
    pub x: u32,
    /// Tile index along the y axis.
    pub y: u32,
    /// Zoom level; the grid has `2^zoom` tiles per axis.
    pub zoom: u8,
    /// Tile payload bytes (may be empty).
    pub data: Vec<u8>,
}

impl RasterTile {
    /// Number of tiles along one axis at `zoom`, i.e. `2^zoom`.
    ///
    /// `2^zoom` does not fit in a `u32` for `zoom >= 32`, so the result
    /// saturates to `u32::MAX` there.
    pub fn tiles_per_axis(zoom: u8) -> u32 {
        if zoom >= 32 {
            u32::MAX
        } else {
            1u32 << zoom
        }
    }

    /// Returns the tile's extent as `(min_x, min_y, max_x, max_y)` in
    /// normalised grid coordinates, where the whole grid spans `[0, 1]`.
    ///
    /// All arithmetic is done in `f64` so that the `+ 1` edge cannot overflow
    /// for `x`/`y == u32::MAX`. The divisor is the exact `2^zoom` rather than
    /// [`RasterTile::tiles_per_axis`], which saturates at `zoom >= 32`.
    pub fn normalised_bbox(&self) -> (f64, f64, f64, f64) {
        // Powers of two are exact in f64 for every possible `u8` zoom.
        let n = 2f64.powi(i32::from(self.zoom));
        let x = f64::from(self.x);
        let y = f64::from(self.y);
        (x / n, y / n, (x + 1.0) / n, (y + 1.0) / n)
    }

    /// `true` when the tile carries a non-empty payload.
    pub fn has_data(&self) -> bool {
        !self.data.is_empty()
    }
}
/// Iterator over the tile coordinates of a rectangular region at one zoom
/// level, in row-major order (`x` varies fastest, then `y` increases).
///
/// Yielded tiles have an empty `data` payload.
pub struct TileStream {
    zoom: u8,
    /// First column of the range (inclusive); `current_x` wraps back here.
    min_x: u32,
    /// First row of the range (inclusive).
    min_y: u32,
    /// Column of the next tile to yield.
    current_x: u32,
    /// Row of the next tile to yield; `>= max_y` means exhausted.
    current_y: u32,
    /// One past the last column of the range.
    max_x: u32,
    /// One past the last row of the range.
    max_y: u32,
    /// Number of tiles yielded so far.
    yielded: u64,
}

impl TileStream {
    /// Streams every tile of the `2^zoom x 2^zoom` grid.
    pub fn full_zoom(zoom: u8) -> Self {
        let dim = RasterTile::tiles_per_axis(zoom);
        Self {
            zoom,
            min_x: 0,
            min_y: 0,
            current_x: 0,
            current_y: 0,
            max_x: dim,
            max_y: dim,
            yielded: 0,
        }
    }

    /// Streams the half-open sub-grid `x_range.0..x_range.1` by
    /// `y_range.0..y_range.1` at `zoom`.
    ///
    /// # Errors
    /// - `OxiGdalError::OutOfBounds` if either range end exceeds `2^zoom`.
    /// - `OxiGdalError::InvalidParameter` if either range is empty or inverted.
    pub fn from_range(zoom: u8, x_range: (u32, u32), y_range: (u32, u32)) -> Result<Self> {
        let dim = RasterTile::tiles_per_axis(zoom);
        let (x_start, x_end) = x_range;
        let (y_start, y_end) = y_range;
        if x_end > dim || y_end > dim {
            return Err(OxiGdalError::OutOfBounds {
                message: format!(
                    "tile range ({x_start}..{x_end}, {y_start}..{y_end}) exceeds 2^{zoom} = {dim}"
                ),
            });
        }
        if x_start >= x_end || y_start >= y_end {
            return Err(OxiGdalError::InvalidParameter {
                parameter: "tile_range",
                message: format!(
                    "empty or inverted tile range: x={x_start}..{x_end}, y={y_start}..{y_end}"
                ),
            });
        }
        Ok(Self {
            zoom,
            min_x: x_start,
            min_y: y_start,
            current_x: x_start,
            current_y: y_start,
            max_x: x_end,
            max_y: y_end,
            yielded: 0,
        })
    }

    /// Total number of tiles covered by the stream (yielded + remaining).
    pub fn total_tiles(&self) -> u64 {
        u64::from(self.max_x - self.min_x) * u64::from(self.max_y - self.min_y)
    }

    /// Number of tiles yielded so far.
    pub fn yielded_count(&self) -> u64 {
        self.yielded
    }

    /// Zoom level of every tile in the stream.
    pub fn zoom(&self) -> u8 {
        self.zoom
    }

    /// Number of tiles still to be yielded.
    ///
    /// This is the rest of the current row plus every full row below it.
    /// While not exhausted, `min_x <= current_x < max_x` holds, so the
    /// subtractions cannot underflow. The result is at most
    /// `(2^32 - 1)^2 < 2^64`.
    fn remaining_tiles(&self) -> u64 {
        if self.current_y >= self.max_y {
            return 0;
        }
        let row_width = u64::from(self.max_x - self.min_x);
        let rest_of_row = u64::from(self.max_x - self.current_x);
        let full_rows_below = u64::from(self.max_y - self.current_y - 1);
        rest_of_row + full_rows_below * row_width
    }
}

impl Iterator for TileStream {
    type Item = Result<RasterTile>;

    fn next(&mut self) -> Option<Self::Item> {
        if self.current_y >= self.max_y {
            return None;
        }
        let tile = RasterTile {
            x: self.current_x,
            y: self.current_y,
            zoom: self.zoom,
            data: Vec::new(),
        };
        self.current_x += 1;
        if self.current_x >= self.max_x {
            // Wrap to the first column of the *range*, not column 0, so that
            // sub-ranges built by `from_range` stay within their x bounds.
            self.current_x = self.min_x;
            self.current_y += 1;
        }
        self.yielded += 1;
        Some(Ok(tile))
    }

    /// Exact when the remaining count fits in `usize`. Otherwise it reports
    /// `(usize::MAX, None)` instead of silently truncating.
    fn size_hint(&self) -> (usize, Option<usize>) {
        match usize::try_from(self.remaining_tiles()) {
            Ok(n) => (n, Some(n)),
            Err(_) => (usize::MAX, None),
        }
    }
}
/// Loads every feature of a GeoJSON `FeatureCollection` into a
/// [`FeatureStream`].
///
/// Returns an empty stream in three cases:
/// - the dataset has no path;
/// - the crate is built without the `geojson` feature;
/// - the file cannot be read as a `FeatureCollection`. Such read errors are
///   intentionally not reported.
///
/// # Errors
/// Returns `OxiGdalError::Io` if the file cannot be opened.
fn stream_geojson_features(info: &crate::DatasetInfo) -> Result<FeatureStream> {
    #[cfg(feature = "geojson")]
    {
        use oxigdal_geojson::GeoJsonReader;

        let path = match info.path.as_deref() {
            Some(p) => p,
            None => return Ok(FeatureStream::empty()),
        };
        let file = std::fs::File::open(path).map_err(|e| {
            OxiGdalError::Io(oxigdal_core::error::IoError::Read {
                message: format!("cannot open GeoJSON for streaming '{path}': {e}"),
            })
        })?;
        let mut reader = GeoJsonReader::without_validation(std::io::BufReader::new(file));
        // Anything that does not read as a FeatureCollection (e.g. `{}`) is
        // treated as "no features" rather than an error.
        let collection = match reader.read_feature_collection() {
            Ok(fc) => fc,
            Err(_) => return Ok(FeatureStream::empty()),
        };

        let mut features = Vec::new();
        for feature in collection.features {
            let geometry = feature.geometry.and_then(|g| g.to_wkb());
            let properties: HashMap<String, JsonValue> =
                feature.properties.unwrap_or_default().into_iter().collect();
            let streaming = StreamingFeature::new(geometry, properties);
            features.push(match feature.id {
                Some(id) => streaming.with_id(id.as_string()),
                None => streaming,
            });
        }
        Ok(FeatureStream::from_vec(features))
    }
    #[cfg(not(feature = "geojson"))]
    {
        let _ = info;
        Ok(FeatureStream::empty())
    }
}
/// Loads every feature of a FlatGeobuf file into a [`FeatureStream`].
///
/// Returns an empty stream when the dataset has no path or when the crate is
/// built without the `flatgeobuf` feature.
///
/// # Errors
/// - `OxiGdalError::Io` if the file cannot be opened.
/// - `OxiGdalError::Internal` if the reader cannot be created or its feature
///   iterator cannot be obtained.
/// - `OxiGdalError::Internal` if any individual feature fails to decode.
fn stream_flatgeobuf_features(info: &crate::DatasetInfo) -> Result<FeatureStream> {
    #[cfg(feature = "flatgeobuf")]
    {
        use oxigdal_core::error::IoError;
        use oxigdal_flatgeobuf::FlatGeobufReader;

        let path = match info.path.as_deref() {
            Some(p) => p,
            None => return Ok(FeatureStream::empty()),
        };
        let file = std::fs::File::open(path).map_err(|e| {
            OxiGdalError::Io(IoError::Read {
                message: format!("cannot open FlatGeobuf for streaming '{path}': {e}"),
            })
        })?;
        let mut reader = FlatGeobufReader::new(file).map_err(|e| OxiGdalError::Internal {
            message: e.to_string(),
        })?;
        let features = reader
            .features()
            .map_err(|e| OxiGdalError::Internal {
                message: e.to_string(),
            })?
            .map(|result| -> Result<StreamingFeature> {
                // Propagate decode failures instead of skipping the record, so
                // callers never receive a silently truncated feature set.
                let f = result.map_err(|e| OxiGdalError::Internal {
                    message: e.to_string(),
                })?;
                let geometry = f.geometry.map(|g| g.to_wkb());
                let properties: HashMap<String, JsonValue> = f
                    .properties
                    .into_iter()
                    .map(|(k, v)| (k, v.to_json_value()))
                    .collect();
                Ok(StreamingFeature::new(geometry, properties))
            })
            .collect::<Result<Vec<_>>>()?;
        Ok(FeatureStream::from_vec(features))
    }
    #[cfg(not(feature = "flatgeobuf"))]
    {
        let _ = info;
        Ok(FeatureStream::empty())
    }
}
/// Loads every feature of an ESRI Shapefile into a [`FeatureStream`].
///
/// The dataset path may point at the `.shp` file or at its extension-less base
/// name. A trailing `.shp` extension, in any ASCII case, is stripped before
/// the path is passed to `ShapefileReader::open`.
///
/// Returns an empty stream when the dataset has no path or when the crate is
/// built without the `shapefile` feature.
///
/// # Errors
/// - `OxiGdalError::Io` if the shapefile cannot be opened.
/// - `OxiGdalError::Internal` if reading its features fails.
fn stream_shapefile_features(info: &crate::DatasetInfo) -> Result<FeatureStream> {
    #[cfg(feature = "shapefile")]
    {
        use oxigdal_core::error::IoError;
        use oxigdal_shapefile::ShapefileReader;

        let raw_path = match info.path.as_deref() {
            Some(p) => p,
            None => return Ok(FeatureStream::empty()),
        };
        // `ShapefileReader::open` is given a base path here (presumably so it
        // can locate the sibling component files). Match the extension
        // case-insensitively so that names like `roads.Shp` are handled too.
        let base_path = {
            let p = std::path::Path::new(raw_path);
            let is_shp = matches!(
                p.extension().and_then(|e| e.to_str()),
                Some(ext) if ext.eq_ignore_ascii_case("shp")
            );
            if is_shp {
                p.with_extension("").to_string_lossy().into_owned()
            } else {
                raw_path.to_owned()
            }
        };
        let reader = ShapefileReader::open(&base_path).map_err(|e| {
            OxiGdalError::Io(IoError::Read {
                message: format!("cannot open Shapefile for streaming '{base_path}': {e}"),
            })
        })?;
        let shapefile_features = reader.read_features().map_err(|e| OxiGdalError::Internal {
            message: e.to_string(),
        })?;
        let features = shapefile_features
            .into_iter()
            .map(|sf| {
                let geometry = sf.geometry.map(|g| g.to_wkb());
                let properties: HashMap<String, JsonValue> = sf
                    .attributes
                    .into_iter()
                    .map(|(k, v)| (k, v.to_json_value()))
                    .collect();
                StreamingFeature::new(geometry, properties)
            })
            .collect::<Vec<_>>();
        Ok(FeatureStream::from_vec(features))
    }
    #[cfg(not(feature = "shapefile"))]
    {
        let _ = info;
        Ok(FeatureStream::empty())
    }
}
pub trait StreamingExt {
fn features(&self) -> Result<FeatureStream>;
fn tiles(&self, zoom: u8) -> Result<TileStream>;
}
impl StreamingExt for OpenedDataset {
fn features(&self) -> Result<FeatureStream> {
match self {
OpenedDataset::GeoJson(info) => stream_geojson_features(info),
OpenedDataset::Shapefile(info) => stream_shapefile_features(info),
OpenedDataset::FlatGeobuf(info) => stream_flatgeobuf_features(info),
OpenedDataset::GeoPackage(_)
| OpenedDataset::GeoParquet(_)
| OpenedDataset::Stac(_)
| OpenedDataset::Unknown(_) => {
Ok(FeatureStream::empty())
}
other => Err(OxiGdalError::NotSupported {
operation: format!(
"features() is not supported for raster format '{}'",
other.format().driver_name()
),
}),
}
}
fn tiles(&self, zoom: u8) -> Result<TileStream> {
match self {
OpenedDataset::GeoTiff(_)
| OpenedDataset::Jpeg2000(_)
| OpenedDataset::NetCdf(_)
| OpenedDataset::Hdf5(_)
| OpenedDataset::Zarr(_)
| OpenedDataset::Grib(_)
| OpenedDataset::Vrt(_)
| OpenedDataset::Unknown(_) => Ok(TileStream::full_zoom(zoom)),
other => Err(OxiGdalError::NotSupported {
operation: format!(
"tiles() is not supported for vector format '{}'",
other.format().driver_name()
),
}),
}
}
}
/// Unit tests for the streaming types and the `StreamingExt` dispatch.
///
/// File-based tests write small fixtures into the OS temp directory via
/// `make_temp_file`. Tests that depend on actual GeoJSON parsing are gated on
/// the `geojson` feature.
#[cfg(test)]
mod tests {
use super::*;
use crate::open::open;
use std::io::Write;
// Writes `content` to `<temp_dir>/<name>` (File::create truncates any existing
// file) and returns the path. Files are never removed, and names are fixed, so
// each test must use a distinct `name` to avoid clobbering another test's
// fixture when tests run in parallel.
fn make_temp_file(name: &str, content: &[u8]) -> std::path::PathBuf {
let dir = std::env::temp_dir();
let path = dir.join(name);
let mut f = std::fs::File::create(&path).expect("create");
f.write_all(content).expect("write");
path
}
// ---- FeatureStream / StreamingFeature ----
#[test]
fn test_feature_stream_empty() {
let stream = FeatureStream::empty();
assert_eq!(stream.total_count(), 0);
assert_eq!(stream.remaining(), 0);
}
// Checks counters after each `next()` and that the stream ends with `None`.
#[test]
fn test_feature_stream_from_vec_yields_all() {
let features = vec![
StreamingFeature::new(None, HashMap::new()),
StreamingFeature::new(None, HashMap::new()),
StreamingFeature::new(None, HashMap::new()),
];
let mut stream = FeatureStream::from_vec(features);
assert_eq!(stream.total_count(), 3);
assert_eq!(stream.yielded_count(), 0);
let first = stream.next().expect("has first").expect("no error");
assert!(first.geometry.is_none());
assert_eq!(stream.yielded_count(), 1);
assert_eq!(stream.remaining(), 2);
stream.next().expect("second").expect("no error");
stream.next().expect("third").expect("no error");
assert!(stream.next().is_none(), "stream exhausted");
}
#[test]
fn test_feature_stream_with_properties() {
let mut props = HashMap::new();
props.insert("name".to_string(), JsonValue::String("Tokyo".to_string()));
props.insert(
"pop".to_string(),
JsonValue::Number(serde_json::Number::from(9_273_000u64)),
);
let feature = StreamingFeature::new(None, props);
assert_eq!(feature.properties["name"], "Tokyo");
assert!(!feature.has_geometry());
assert_eq!(feature.geometry_byte_len(), 0);
}
// The fixture is a 21-byte WKB Point: a byte-order flag (0x01 = little
// endian), a u32 geometry type (1 = Point), then two f64 coordinates.
#[test]
fn test_feature_stream_with_geometry() {
let wkb: Vec<u8> = vec![
0x01, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x40, 0x5E, 0x40, 0x00, 0x00, 0x00, 0x00, 0x00, 0x80, 0x35, 0x40, ];
let feature = StreamingFeature::new(Some(wkb.clone()), HashMap::new());
assert!(feature.has_geometry());
assert_eq!(feature.geometry_byte_len(), wkb.len());
}
#[test]
fn test_feature_with_id() {
let feature = StreamingFeature::new(None, HashMap::new()).with_id("feature-001");
assert_eq!(feature.id.as_deref(), Some("feature-001"));
}
// size_hint must be exact and shrink as items are consumed.
#[test]
fn test_feature_stream_size_hint() {
let features = vec![
StreamingFeature::new(None, HashMap::new()),
StreamingFeature::new(None, HashMap::new()),
];
let mut stream = FeatureStream::from_vec(features);
assert_eq!(stream.size_hint(), (2, Some(2)));
stream.next();
assert_eq!(stream.size_hint(), (1, Some(1)));
}
// ---- RasterTile ----
#[test]
fn test_raster_tile_tiles_per_axis() {
assert_eq!(RasterTile::tiles_per_axis(0), 1);
assert_eq!(RasterTile::tiles_per_axis(1), 2);
assert_eq!(RasterTile::tiles_per_axis(8), 256);
assert_eq!(RasterTile::tiles_per_axis(16), 65_536);
}
// At zoom 0 the single tile covers the whole unit square.
#[test]
fn test_raster_tile_normalised_bbox_zoom0() {
let tile = RasterTile {
x: 0,
y: 0,
zoom: 0,
data: vec![],
};
let (min_x, min_y, max_x, max_y) = tile.normalised_bbox();
assert!((min_x - 0.0).abs() < 1e-9);
assert!((min_y - 0.0).abs() < 1e-9);
assert!((max_x - 1.0).abs() < 1e-9);
assert!((max_y - 1.0).abs() < 1e-9);
}
// At zoom 1, tile x=1 spans the right half of the grid.
#[test]
fn test_raster_tile_normalised_bbox_zoom1() {
let tile = RasterTile {
x: 1,
y: 0,
zoom: 1,
data: vec![],
};
let (min_x, _min_y, max_x, _max_y) = tile.normalised_bbox();
assert!((min_x - 0.5).abs() < 1e-9);
assert!((max_x - 1.0).abs() < 1e-9);
}
#[test]
fn test_raster_tile_has_data() {
let empty_tile = RasterTile {
x: 0,
y: 0,
zoom: 1,
data: vec![],
};
assert!(!empty_tile.has_data());
let data_tile = RasterTile {
x: 0,
y: 0,
zoom: 1,
data: vec![0xFF],
};
assert!(data_tile.has_data());
}
// ---- TileStream ----
#[test]
fn test_tile_stream_zoom0_yields_one_tile() {
let mut stream = TileStream::full_zoom(0);
assert_eq!(stream.zoom(), 0);
let tile = stream.next().expect("has tile").expect("no error");
assert_eq!((tile.x, tile.y, tile.zoom), (0, 0, 0));
assert!(stream.next().is_none(), "only one tile at zoom 0");
}
#[test]
fn test_tile_stream_zoom1_yields_four_tiles() {
let stream = TileStream::full_zoom(1);
let tiles: Vec<_> = stream.map(|t| t.expect("ok")).collect();
assert_eq!(tiles.len(), 4, "2^1 × 2^1 = 4 tiles");
}
// x advances fastest; y increments after each completed row.
#[test]
fn test_tile_stream_row_major_order() {
let stream = TileStream::full_zoom(1);
let tiles: Vec<_> = stream.map(|t| t.expect("ok")).collect();
assert_eq!((tiles[0].x, tiles[0].y), (0, 0));
assert_eq!((tiles[1].x, tiles[1].y), (1, 0));
assert_eq!((tiles[2].x, tiles[2].y), (0, 1));
assert_eq!((tiles[3].x, tiles[3].y), (1, 1));
}
#[test]
fn test_tile_stream_zoom2_total() {
let stream = TileStream::full_zoom(2);
assert_eq!(stream.count(), 16, "2^2 × 2^2 = 16");
}
// Sub-range starting at the origin; ranges are half-open (start..end).
#[test]
fn test_tile_stream_from_range_valid() {
let stream = TileStream::from_range(3, (0, 2), (0, 2)).expect("valid range");
let tiles: Vec<_> = stream.map(|t| t.expect("ok")).collect();
assert_eq!(tiles.len(), 4, "2×2 sub-range");
}
#[test]
fn test_tile_stream_from_range_out_of_bounds() {
let result = TileStream::from_range(1, (0, 5), (0, 2));
assert!(result.is_err(), "5 exceeds 2^1=2");
}
#[test]
fn test_tile_stream_from_range_empty_range_error() {
let result = TileStream::from_range(2, (1, 1), (0, 2));
assert!(result.is_err(), "empty range start==end should fail");
}
#[test]
fn test_tile_stream_yielded_count() {
let mut stream = TileStream::full_zoom(1);
assert_eq!(stream.yielded_count(), 0);
stream.next();
assert_eq!(stream.yielded_count(), 1);
stream.next();
assert_eq!(stream.yielded_count(), 2);
}
// ---- StreamingExt dispatch ----
// These rely on `open` classifying `.geojson` / `.tif` fixtures by name
// and/or content; `{}` is not a FeatureCollection, so it yields no features.
#[test]
fn test_streaming_ext_features_on_vector() {
let path = make_temp_file("stream_ext_geojson.geojson", b"{}");
let ds = open(&path).expect("open");
let stream_result = ds.features();
assert!(
stream_result.is_ok(),
"features() on GeoJSON should succeed"
);
}
// The fixture bytes are a minimal little-endian TIFF header ("II", 42).
#[test]
fn test_streaming_ext_features_on_raster_errors() {
let bytes = [0x49u8, 0x49, 0x2A, 0x00, 0x00, 0x00, 0x00, 0x00];
let path = make_temp_file("stream_ext_tiff.tif", &bytes);
let ds = open(&path).expect("open tiff");
let result = ds.features();
assert!(result.is_err(), "features() on raster dataset should error");
}
#[test]
fn test_streaming_ext_tiles_on_raster() {
let bytes = [0x49u8, 0x49, 0x2A, 0x00, 0x00, 0x00, 0x00, 0x00];
let path = make_temp_file("stream_ext_tiles_tiff.tif", &bytes);
let ds = open(&path).expect("open tiff");
let result = ds.tiles(2);
assert!(result.is_ok(), "tiles() on raster should succeed");
let stream = result.expect("stream");
assert_eq!(stream.zoom(), 2);
}
#[test]
fn test_streaming_ext_tiles_on_vector_errors() {
let path = make_temp_file("stream_ext_tiles_geojson.geojson", b"{}");
let ds = open(&path).expect("open");
let result = ds.tiles(2);
assert!(result.is_err(), "tiles() on vector should error");
}
// Holds with or without the `geojson` feature: both code paths yield an
// empty stream for `{}`.
#[test]
fn test_feature_stream_collect_empty() {
let path = make_temp_file("stream_collect_empty.geojson", b"{}");
let ds = open(&path).expect("open");
let features: Vec<_> = ds
.features()
.expect("features")
.collect::<Result<Vec<_>>>()
.expect("collect");
assert_eq!(
features.len(),
0,
"non-FeatureCollection GeoJSON returns no features"
);
}
// ---- GeoJSON content (requires the `geojson` feature) ----
#[cfg(feature = "geojson")]
#[test]
fn test_geojson_streaming_feature_collection_count() {
let content = br#"{"type":"FeatureCollection","features":[
{"type":"Feature","geometry":{"type":"Point","coordinates":[139.7,35.7]},"properties":{"name":"Tokyo"}},
{"type":"Feature","geometry":{"type":"Point","coordinates":[2.35,48.85]},"properties":{"name":"Paris"}}
]}"#;
let path = make_temp_file("stream_fc_count.geojson", content);
let ds = open(&path).expect("open");
let features: Vec<_> = ds
.features()
.expect("features")
.collect::<Result<Vec<_>>>()
.expect("collect");
assert_eq!(features.len(), 2, "should stream 2 features");
}
#[cfg(feature = "geojson")]
#[test]
fn test_geojson_streaming_properties_present() {
let content = br#"{"type":"FeatureCollection","features":[
{"type":"Feature","geometry":null,"properties":{"city":"Berlin","pop":3600000}}
]}"#;
let path = make_temp_file("stream_props.geojson", content);
let ds = open(&path).expect("open");
let mut stream = ds.features().expect("features");
let feat = stream.next().expect("first feature").expect("no error");
assert_eq!(feat.properties["city"], serde_json::json!("Berlin"));
assert_eq!(feat.properties["pop"], serde_json::json!(3600000));
}
// Checks the WKB header of the converted Point: byte-order flag, then type 1.
#[cfg(feature = "geojson")]
#[test]
fn test_geojson_streaming_geometry_wkb() {
let content = br#"{"type":"FeatureCollection","features":[
{"type":"Feature","geometry":{"type":"Point","coordinates":[139.7,35.7]},"properties":{}}
]}"#;
let path = make_temp_file("stream_wkb.geojson", content);
let ds = open(&path).expect("open");
let mut stream = ds.features().expect("features");
let feat = stream.next().expect("first feature").expect("no error");
assert!(
feat.has_geometry(),
"Point feature should have WKB geometry"
);
assert_eq!(feat.geometry_byte_len(), 21, "WKB Point should be 21 bytes");
let wkb = feat.geometry.as_ref().expect("geometry");
assert_eq!(wkb[0], 0x01, "little-endian byte-order flag");
assert_eq!(&wkb[1..5], &1u32.to_le_bytes(), "WKB type = Point (1)");
}
#[cfg(feature = "geojson")]
#[test]
fn test_geojson_streaming_null_geometry_is_none() {
let content = br#"{"type":"FeatureCollection","features":[
{"type":"Feature","geometry":null,"properties":{"note":"no geom"}}
]}"#;
let path = make_temp_file("stream_null_geom.geojson", content);
let ds = open(&path).expect("open");
let mut stream = ds.features().expect("features");
let feat = stream.next().expect("first feature").expect("no error");
assert!(!feat.has_geometry(), "null geometry should produce None");
}
#[cfg(feature = "geojson")]
#[test]
fn test_geojson_streaming_feature_id() {
let content = br#"{"type":"FeatureCollection","features":[
{"type":"Feature","id":"feat-001","geometry":null,"properties":{}}
]}"#;
let path = make_temp_file("stream_id.geojson", content);
let ds = open(&path).expect("open");
let mut stream = ds.features().expect("features");
let feat = stream.next().expect("first feature").expect("no error");
assert_eq!(feat.id.as_deref(), Some("feat-001"));
}
// Every yielded coordinate must lie inside the 2^zoom grid.
#[test]
fn test_tile_stream_all_coordinates_in_range() {
let zoom = 3u8;
let dim = RasterTile::tiles_per_axis(zoom);
let stream = TileStream::full_zoom(zoom);
for tile_result in stream {
let tile = tile_result.expect("ok");
assert!(tile.x < dim, "x={} should be < {dim}", tile.x);
assert!(tile.y < dim, "y={} should be < {dim}", tile.y);
}
}
}