use crate::error::{ServiceError, ServiceResult};
use crate::wfs::{FeatureSource, WfsState};
use axum::{
http::header,
response::{IntoResponse, Response},
};
use quick_xml::events::{BytesEnd, BytesStart, BytesText, Event};
use serde::Deserialize;
use std::io::Cursor;
#[derive(Debug, Clone, Deserialize)]
#[serde(rename_all = "PascalCase")]
pub enum TransactionAction {
Insert {
type_name: String,
features: Box<Vec<geojson::Feature>>,
},
Update {
type_name: String,
filter: Option<String>,
properties: Box<serde_json::Map<String, serde_json::Value>>,
},
Delete {
type_name: String,
filter: Option<String>,
},
Replace {
type_name: String,
filter: String,
feature: Box<geojson::Feature>,
},
}
#[derive(Debug, Deserialize)]
pub struct Transaction {
pub actions: Vec<TransactionAction>,
#[serde(default = "default_release_action")]
pub release_action: String,
pub lock_id: Option<String>,
}
fn default_release_action() -> String {
"ALL".to_string()
}
#[derive(Debug)]
pub struct TransactionResponse {
pub total_inserted: usize,
pub total_updated: usize,
pub total_deleted: usize,
pub total_replaced: usize,
pub inserted_fids: Vec<String>,
}
pub async fn handle_transaction(
state: &WfsState,
_version: &str,
params: &serde_json::Value,
) -> Result<Response, ServiceError> {
if !state.transactions_enabled {
return Err(ServiceError::UnsupportedOperation(
"Transactions not enabled".to_string(),
));
}
let transaction: Transaction = serde_json::from_value(params.clone())
.map_err(|e| ServiceError::InvalidParameter("Transaction".to_string(), e.to_string()))?;
let response = execute_transaction(state, transaction).await?;
generate_transaction_response(&response)
}
async fn execute_transaction(
state: &WfsState,
transaction: Transaction,
) -> ServiceResult<TransactionResponse> {
let mut total_inserted = 0;
let mut total_updated = 0;
let mut total_deleted = 0;
let mut total_replaced = 0;
let mut inserted_fids = Vec::new();
for action in transaction.actions {
match action {
TransactionAction::Insert {
type_name,
features,
} => {
let result = insert_features(state, &type_name, *features).await?;
total_inserted += result.len();
inserted_fids.extend(result);
}
TransactionAction::Update {
type_name,
filter,
properties,
} => {
let count =
update_features(state, &type_name, filter.as_deref(), *properties).await?;
total_updated += count;
}
TransactionAction::Delete { type_name, filter } => {
let count = delete_features(state, &type_name, filter.as_deref()).await?;
total_deleted += count;
}
TransactionAction::Replace {
type_name,
filter,
feature,
} => {
let count = replace_features(state, &type_name, &filter, *feature).await?;
total_replaced += count;
}
}
}
Ok(TransactionResponse {
total_inserted,
total_updated,
total_deleted,
total_replaced,
inserted_fids,
})
}
async fn insert_features(
state: &WfsState,
type_name: &str,
features: Vec<geojson::Feature>,
) -> ServiceResult<Vec<String>> {
let ft = state
.get_feature_type(type_name)
.ok_or_else(|| ServiceError::NotFound(format!("Feature type: {}", type_name)))?;
let mut fids = Vec::new();
for _ in 0..features.len() {
let fid = format!("{}.{}", type_name, uuid::Uuid::new_v4());
fids.push(fid);
}
match &ft.source {
FeatureSource::Memory(_) => {
Ok(fids)
}
FeatureSource::File(_) => Err(ServiceError::Transaction(
"File-based transactions not yet implemented".to_string(),
)),
FeatureSource::Database(_) => Err(ServiceError::Transaction(
"Database transactions not yet implemented".to_string(),
)),
FeatureSource::DatabaseSource(db) => {
let table = db.qualified_table_name();
let geom_col = &db.geometry_column;
let id_col = db.id_column.as_deref().unwrap_or("id");
let columns: Vec<String> = if let Some(first) = features.first() {
let mut cols = vec![id_col.to_string(), geom_col.clone()];
if let Some(props) = &first.properties {
cols.extend(props.keys().cloned());
}
cols
} else {
return Ok(fids); };
let _insert_sql = format!(
"INSERT INTO {} ({}) VALUES ({})",
table,
columns
.iter()
.map(|c| format!("\"{}\"", c))
.collect::<Vec<_>>()
.join(", "),
columns.iter().map(|_| "?").collect::<Vec<_>>().join(", ")
);
Err(ServiceError::Transaction(format!(
"Database insert requires connection. Use oxigdal-postgis for PostGIS. Table: {}, Features: {}",
table,
features.len()
)))
}
}
}
async fn update_features(
state: &WfsState,
type_name: &str,
filter: Option<&str>,
properties: serde_json::Map<String, serde_json::Value>,
) -> ServiceResult<usize> {
let ft = state
.get_feature_type(type_name)
.ok_or_else(|| ServiceError::NotFound(format!("Feature type: {}", type_name)))?;
match &ft.source {
FeatureSource::Memory(_) => {
Ok(0)
}
FeatureSource::File(_) => Err(ServiceError::Transaction(
"File-based transactions not yet implemented".to_string(),
)),
FeatureSource::Database(_) => Err(ServiceError::Transaction(
"Database transactions not yet implemented".to_string(),
)),
FeatureSource::DatabaseSource(db) => {
let table = db.qualified_table_name();
let set_clauses: Vec<String> = properties
.keys()
.map(|k| format!("\"{}\" = ?", k))
.collect();
if set_clauses.is_empty() {
return Ok(0); }
let where_clause = filter.map(|f| format!(" WHERE {}", f)).unwrap_or_default();
let _update_sql = format!(
"UPDATE {} SET {}{}",
table,
set_clauses.join(", "),
where_clause
);
Err(ServiceError::Transaction(format!(
"Database update requires connection. Use oxigdal-postgis for PostGIS. Table: {}, Properties: {}",
table,
properties.len()
)))
}
}
}
async fn delete_features(
state: &WfsState,
type_name: &str,
filter: Option<&str>,
) -> ServiceResult<usize> {
let ft = state
.get_feature_type(type_name)
.ok_or_else(|| ServiceError::NotFound(format!("Feature type: {}", type_name)))?;
match &ft.source {
FeatureSource::Memory(_) => {
Ok(0)
}
FeatureSource::File(_) => Err(ServiceError::Transaction(
"File-based transactions not yet implemented".to_string(),
)),
FeatureSource::Database(_) => Err(ServiceError::Transaction(
"Database transactions not yet implemented".to_string(),
)),
FeatureSource::DatabaseSource(db) => {
let table = db.qualified_table_name();
let where_clause = match filter {
Some(f) => format!(" WHERE {}", f),
None => {
return Err(ServiceError::Transaction(
"Delete operation requires a filter for database sources".to_string(),
));
}
};
let _delete_sql = format!("DELETE FROM {}{}", table, where_clause);
Err(ServiceError::Transaction(format!(
"Database delete requires connection. Use oxigdal-postgis for PostGIS. Table: {}",
table
)))
}
}
}
async fn replace_features(
state: &WfsState,
type_name: &str,
filter: &str,
feature: geojson::Feature,
) -> ServiceResult<usize> {
let ft = state
.get_feature_type(type_name)
.ok_or_else(|| ServiceError::NotFound(format!("Feature type: {}", type_name)))?;
match &ft.source {
FeatureSource::Memory(_) => {
Ok(0)
}
FeatureSource::File(_) => Err(ServiceError::Transaction(
"File-based transactions not yet implemented".to_string(),
)),
FeatureSource::Database(_) => Err(ServiceError::Transaction(
"Database transactions not yet implemented".to_string(),
)),
FeatureSource::DatabaseSource(db) => {
let table = db.qualified_table_name();
let geom_col = &db.geometry_column;
let mut set_clauses = Vec::new();
if feature.geometry.is_some() {
set_clauses.push(format!("\"{}\" = ?", geom_col));
}
if let Some(props) = &feature.properties {
for key in props.keys() {
set_clauses.push(format!("\"{}\" = ?", key));
}
}
if set_clauses.is_empty() {
return Ok(0); }
let _replace_sql = format!(
"UPDATE {} SET {} WHERE {}",
table,
set_clauses.join(", "),
filter
);
Err(ServiceError::Transaction(format!(
"Database replace requires connection. Use oxigdal-postgis for PostGIS. Table: {}",
table
)))
}
}
}
fn generate_transaction_response(response: &TransactionResponse) -> Result<Response, ServiceError> {
use quick_xml::{
Writer,
events::{BytesDecl, BytesEnd, BytesStart, Event},
};
use std::io::Cursor;
let mut writer = Writer::new(Cursor::new(Vec::new()));
writer
.write_event(Event::Decl(BytesDecl::new("1.0", Some("UTF-8"), None)))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
let mut root = BytesStart::new("wfs:TransactionResponse");
root.push_attribute(("version", "2.0.0"));
root.push_attribute(("xmlns:wfs", "http://www.opengis.net/wfs/2.0"));
root.push_attribute(("xmlns:xsi", "http://www.w3.org/2001/XMLSchema-instance"));
writer
.write_event(Event::Start(root))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
writer
.write_event(Event::Start(BytesStart::new("wfs:TransactionSummary")))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
write_text_element(
&mut writer,
"wfs:totalInserted",
&response.total_inserted.to_string(),
)?;
write_text_element(
&mut writer,
"wfs:totalUpdated",
&response.total_updated.to_string(),
)?;
write_text_element(
&mut writer,
"wfs:totalDeleted",
&response.total_deleted.to_string(),
)?;
writer
.write_event(Event::End(BytesEnd::new("wfs:TransactionSummary")))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
if !response.inserted_fids.is_empty() {
writer
.write_event(Event::Start(BytesStart::new("wfs:InsertResults")))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
for fid in &response.inserted_fids {
writer
.write_event(Event::Start(BytesStart::new("wfs:Feature")))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
write_text_element(&mut writer, "wfs:FeatureId", fid)?;
writer
.write_event(Event::End(BytesEnd::new("wfs:Feature")))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
}
writer
.write_event(Event::End(BytesEnd::new("wfs:InsertResults")))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
}
writer
.write_event(Event::End(BytesEnd::new("wfs:TransactionResponse")))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
let xml = String::from_utf8(writer.into_inner().into_inner())
.map_err(|e| ServiceError::Xml(e.to_string()))?;
Ok(([(header::CONTENT_TYPE, "application/xml")], xml).into_response())
}
fn write_text_element(
writer: &mut quick_xml::Writer<Cursor<Vec<u8>>>,
tag: &str,
text: &str,
) -> ServiceResult<()> {
writer
.write_event(Event::Start(BytesStart::new(tag)))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
writer
.write_event(Event::Text(BytesText::new(text)))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
writer
.write_event(Event::End(BytesEnd::new(tag)))
.map_err(|e| ServiceError::Xml(e.to_string()))?;
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_transaction_response_generation() -> Result<(), Box<dyn std::error::Error>> {
let response = TransactionResponse {
total_inserted: 5,
total_updated: 3,
total_deleted: 2,
total_replaced: 1,
inserted_fids: vec!["layer.123".to_string(), "layer.456".to_string()],
};
let result = generate_transaction_response(&response)?;
let (parts, _) = result.into_parts();
assert_eq!(
parts
.headers
.get(header::CONTENT_TYPE)
.and_then(|h| h.to_str().ok()),
Some("application/xml")
);
Ok(())
}
}