use std::collections::HashMap;
use oxigdal_core::error::{IoError, OxiGdalError};
use serde_json::Value as JsonValue;
use crate::streaming::{FeatureStream, StreamingFeature};
use crate::{DatasetInfo, Result};
/// Builds a [`FeatureStream`] over every feature table of the GeoPackage at `info.path`.
///
/// The whole file is read into memory and every row of every feature table is
/// converted to a [`StreamingFeature`] up front; the returned stream is backed
/// by that in-memory vector.
///
/// For each row the geometry column (looked up in `gpkg_geometry_columns`,
/// defaulting to `"geom"`) is decoded from its GeoPackage binary blob and
/// re-encoded as WKB. A row whose geometry cell is missing, is not a blob, or
/// fails to parse gets a `None` geometry. All remaining columns become JSON
/// properties keyed by column name.
///
/// An empty stream is returned when `info.path` is `None` or when the
/// contents table cannot be loaded.
///
/// # Errors
///
/// * `OxiGdalError::Io` when the file cannot be read.
/// * `OxiGdalError::Internal` when the bytes are not a parsable GeoPackage or
///   a feature table cannot be scanned.
pub(crate) fn stream_geopackage_features(info: &DatasetInfo) -> Result<FeatureStream> {
    use oxigdal_gpkg::{CellValue, GeoPackage, GpkgBinaryParser, GpkgDataType};

    let path = match &info.path {
        Some(p) => p.clone(),
        None => return Ok(FeatureStream::empty()),
    };
    let data = std::fs::read(&path).map_err(|e| {
        OxiGdalError::Io(IoError::Read {
            message: format!("cannot read GeoPackage for streaming '{path}': {e}"),
        })
    })?;
    let mut gpkg = GeoPackage::from_bytes(data).map_err(|e| OxiGdalError::Internal {
        message: format!("cannot parse GeoPackage '{path}': {e}"),
    })?;
    // Best effort: a file whose contents table cannot be loaded yields no features.
    if gpkg.load_contents().is_err() {
        return Ok(FeatureStream::empty());
    }

    let feature_table_names: Vec<String> = gpkg
        .contents
        .iter()
        .filter(|c| c.data_type == GpkgDataType::Features)
        .map(|c| c.table_name.clone())
        .collect();

    let mut all_features: Vec<StreamingFeature> = Vec::new();
    for table_name in &feature_table_names {
        let rows = match gpkg
            .scan_table_by_name(table_name)
            .map_err(|e| OxiGdalError::Internal {
                message: format!("cannot scan table '{table_name}' in '{path}': {e}"),
            })? {
            Some(r) => r,
            None => continue,
        };

        let geom_col_name =
            gpkg_geometry_column_name(&gpkg, table_name).unwrap_or_else(|| "geom".to_string());
        let col_names = column_names_from_master(&gpkg, table_name);

        // Both lookups depend only on the table layout, so compute them once
        // per table instead of once per row.
        let geom_idx = col_names
            .iter()
            .position(|n| n.eq_ignore_ascii_case(&geom_col_name));
        // Excluding by name also excludes `geom_idx`, whose name matches by construction.
        let property_columns: Vec<(usize, &String)> = col_names
            .iter()
            .enumerate()
            .filter(|(_, name)| !name.eq_ignore_ascii_case(&geom_col_name))
            .collect();

        for (_rowid, cell_values) in rows {
            // Parse the blob in place; no copy of the geometry bytes is needed.
            let geometry: Option<Vec<u8>> = geom_idx
                .and_then(|idx| cell_values.get(idx))
                .and_then(|cv| match cv {
                    CellValue::Blob(b) => GpkgBinaryParser::parse(b).ok(),
                    _ => None,
                })
                .map(|g| GpkgBinaryParser::to_wkb(&g));

            // Columns beyond the row's actual width are silently skipped.
            let properties: HashMap<String, JsonValue> = property_columns
                .iter()
                .filter_map(|&(idx, name)| {
                    cell_values
                        .get(idx)
                        .map(|cv| (name.clone(), cell_value_to_json(cv)))
                })
                .collect();

            all_features.push(StreamingFeature::new(geometry, properties));
        }
    }
    Ok(FeatureStream::from_vec(all_features))
}
/// Looks up the geometry column registered for `table_name` in the
/// `gpkg_geometry_columns` table.
///
/// The first two cells of each row are read as the table name and the column
/// name; rows where either is not text are ignored. The table name is compared
/// ASCII case-insensitively. Returns `None` when the registry table cannot be
/// scanned, does not exist, or has no matching row.
fn gpkg_geometry_column_name(gpkg: &oxigdal_gpkg::GeoPackage, table_name: &str) -> Option<String> {
    use oxigdal_gpkg::CellValue;

    let registry = gpkg
        .scan_table_by_name("gpkg_geometry_columns")
        .ok()
        .flatten()?;

    registry
        .into_iter()
        .find_map(|(_rowid, cells)| match (cells.first(), cells.get(1)) {
            (Some(CellValue::Text(owner)), Some(CellValue::Text(column)))
                if owner.eq_ignore_ascii_case(table_name) =>
            {
                Some(column.clone())
            }
            _ => None,
        })
}
/// Resolves the column names of `table_name`, in declaration order.
///
/// The names are taken from the `CREATE TABLE` statement stored in the
/// `sqlite_master` entry for the table (matched ASCII case-insensitively).
/// If that statement yields no names, synthetic names `col0`, `col1`, … are
/// produced, one per cell of the table's first row. An empty vector is
/// returned when the master table cannot be scanned, the table has no entry,
/// or neither strategy produces any names.
fn column_names_from_master(gpkg: &oxigdal_gpkg::GeoPackage, table_name: &str) -> Vec<String> {
    let master = match gpkg.scan_sqlite_master() {
        Ok(entries) => entries,
        Err(_) => return Vec::new(),
    };

    for entry in &master {
        let is_target =
            entry.entry_type == "table" && entry.name.eq_ignore_ascii_case(table_name);
        if !is_target {
            continue;
        }

        // Only the first matching entry is considered; every path below returns.
        if let Some(declared) = parse_create_table_columns(&entry.sql) {
            return declared;
        }

        // Fall back to positional names sized from the first row, if there is one.
        let width = gpkg
            .scan_table_by_name(table_name)
            .ok()
            .flatten()
            .and_then(|rows| rows.first().map(|(_rid, cells)| cells.len()));
        return match width {
            Some(n) => (0..n).map(|i| format!("col{i}")).collect(),
            None => Vec::new(),
        };
    }

    Vec::new()
}
/// Extracts the column names, in declaration order, from a `CREATE TABLE`
/// statement.
///
/// The text between the first `(` and the last `)` is split into definitions
/// at top-level commas only, so commas inside nested parentheses
/// (`DECIMAL(10,2)`, `PRIMARY KEY (a, b)`, `CHECK (x IN (1, 2))`) or inside
/// quoted identifiers do not produce spurious columns. Table constraints
/// (`PRIMARY`, `UNIQUE`, `CHECK`, `FOREIGN`, `CONSTRAINT`) are skipped.
///
/// Returns `None` when the statement has no parenthesised body or when no
/// column name could be extracted.
fn parse_create_table_columns(sql: &str) -> Option<Vec<String>> {
    let open = sql.find('(')?;
    let close = sql.rfind(')')?;
    if close <= open {
        return None;
    }
    let inner = &sql[open + 1..close];

    let names: Vec<String> = split_top_level_commas(inner)
        .into_iter()
        .map(str::trim)
        .filter(|def| !def.is_empty() && !is_table_constraint(def))
        .filter_map(parse_column_name_token)
        .collect();

    if names.is_empty() { None } else { Some(names) }
}

/// Splits `text` at commas that are neither nested inside parentheses nor
/// inside a quoted span (`"…"`, `'…'`, `` `…` `` or `[…]`).
fn split_top_level_commas(text: &str) -> Vec<&str> {
    let mut parts = Vec::new();
    let mut depth = 0usize;
    // When inside a quoted span, the character that terminates it.
    let mut closing_quote: Option<char> = None;
    let mut start = 0usize;

    for (pos, ch) in text.char_indices() {
        if let Some(quote) = closing_quote {
            // A doubled quote (`""`) closes and immediately reopens the span,
            // which leaves the state correct without special handling.
            if ch == quote {
                closing_quote = None;
            }
            continue;
        }
        match ch {
            '"' | '\'' | '`' => closing_quote = Some(ch),
            '[' => closing_quote = Some(']'),
            '(' => depth += 1,
            ')' => depth = depth.saturating_sub(1),
            ',' if depth == 0 => {
                parts.push(&text[start..pos]);
                // ',' is a single byte, so `pos + 1` is a valid boundary.
                start = pos + 1;
            }
            _ => {}
        }
    }
    parts.push(&text[start..]);
    parts
}

/// Returns `true` when a trimmed definition starts with a table-constraint
/// keyword as a whole word, so columns such as `unique_id` or `checksum`
/// are not mistaken for constraints.
fn is_table_constraint(def: &str) -> bool {
    const KEYWORDS: [&str; 5] = ["PRIMARY", "UNIQUE", "CHECK", "FOREIGN", "CONSTRAINT"];

    let word_end = def
        .find(|c: char| !(c.is_ascii_alphanumeric() || c == '_'))
        .unwrap_or(def.len());
    let word = &def[..word_end];
    KEYWORDS.iter().any(|kw| word.eq_ignore_ascii_case(kw))
}

/// Extracts the column name that starts a single column definition.
///
/// Accepts identifiers quoted with `"…"`, `` `…` `` or `[…]` as well as bare
/// identifiers (terminated by whitespace). Inside `"…"` and `` `…` `` a doubled
/// quote character stands for one literal quote. Returns `None` for an empty
/// definition or an unterminated quoted identifier.
fn parse_column_name_token(col_def: &str) -> Option<String> {
    let col_def = col_def.trim();
    if let Some(rest) = col_def.strip_prefix('"') {
        read_quoted_identifier(rest, '"')
    } else if let Some(rest) = col_def.strip_prefix('`') {
        read_quoted_identifier(rest, '`')
    } else if let Some(rest) = col_def.strip_prefix('[') {
        let end = rest.find(']')?;
        Some(rest[..end].to_string())
    } else {
        Some(col_def.split_whitespace().next()?.to_string())
    }
}

/// Reads a quoted identifier body up to its closing `quote`, unescaping
/// doubled quote characters. `rest` starts just after the opening quote.
fn read_quoted_identifier(rest: &str, quote: char) -> Option<String> {
    let mut name = String::new();
    let mut chars = rest.chars().peekable();
    while let Some(ch) = chars.next() {
        if ch != quote {
            name.push(ch);
        } else if chars.peek() == Some(&quote) {
            // Escaped quote: consume the second one and keep a single literal.
            chars.next();
            name.push(quote);
        } else {
            return Some(name);
        }
    }
    // Ran out of input before the closing quote.
    None
}
/// Converts a single GeoPackage cell into its JSON representation.
///
/// * integers become JSON numbers;
/// * floats become JSON numbers, or `null` when `serde_json` cannot represent
///   the value (`Number::from_f64` returns `None`);
/// * text becomes a JSON string;
/// * blobs become a JSON string of lowercase hex digits prefixed with `0x`;
/// * SQL `NULL` becomes JSON `null`.
fn cell_value_to_json(cv: &oxigdal_gpkg::CellValue) -> JsonValue {
    use oxigdal_gpkg::CellValue;
    use std::fmt::Write as _;

    match cv {
        CellValue::Integer(i) => JsonValue::Number((*i).into()),
        CellValue::Float(f) => serde_json::Number::from_f64(*f)
            .map(JsonValue::Number)
            .unwrap_or(JsonValue::Null),
        CellValue::Text(s) => JsonValue::String(s.clone()),
        CellValue::Blob(b) => {
            // Append each byte to one buffer rather than allocating a
            // temporary `String` per byte.
            let mut text = String::from("0x");
            for byte in b.iter() {
                // Writing into a `String` cannot fail, so the result is ignored.
                let _ = write!(text, "{byte:02x}");
            }
            JsonValue::String(text)
        }
        CellValue::Null => JsonValue::Null,
    }
}