use crate::error::Result;
use crate::protocol::message::{ChangeType, FeaturePayload, Message, MessageType, Payload};
use parking_lot::RwLock;
use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Kind of change carried by a [`FeatureUpdate`].
///
/// Each variant converts to the protocol-level `ChangeType` variant of the
/// same name through the `From<FeatureUpdateType> for ChangeType` impl.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FeatureUpdateType {
/// Converts to `ChangeType::Created`.
Created,
/// Converts to `ChangeType::Updated`.
Updated,
/// Converts to `ChangeType::Deleted`.
Deleted,
}
/// One-to-one mapping from the manager's update kind to the protocol's
/// `ChangeType`; every variant maps to the variant of the same name.
impl From<FeatureUpdateType> for ChangeType {
    fn from(kind: FeatureUpdateType) -> Self {
        // Exhaustive on purpose: adding a variant to `FeatureUpdateType`
        // must force a decision here at compile time.
        match kind {
            FeatureUpdateType::Created => Self::Created,
            FeatureUpdateType::Updated => Self::Updated,
            FeatureUpdateType::Deleted => Self::Deleted,
        }
    }
}
/// A single change to one feature, tagged with the layer it belongs to.
///
/// Derives `Debug` and `Clone` so that values handed back by the manager can
/// be logged, compared in diagnostics and duplicated by callers.
#[derive(Debug, Clone)]
pub struct FeatureUpdate {
    /// Identifier of the feature this update refers to.
    pub id: String,
    /// Name of the layer; `FeatureUpdateManager` queues updates per layer
    /// under this key.
    pub layer: String,
    /// Whether the feature was created, updated or deleted.
    pub update_type: FeatureUpdateType,
    /// Feature body as JSON. Updates built with `FeatureUpdate::deleted`
    /// carry `serde_json::Value::Null` here.
    pub feature: serde_json::Value,
    /// Time the update was constructed, as returned by
    /// `chrono::Utc::now().timestamp_millis()` in `FeatureUpdate::new`.
    pub timestamp: i64,
}
impl FeatureUpdate {
    /// Builds an update of the given kind and stamps it with the current
    /// UTC time in milliseconds.
    pub fn new(
        id: String,
        layer: String,
        update_type: FeatureUpdateType,
        feature: serde_json::Value,
    ) -> Self {
        let timestamp = chrono::Utc::now().timestamp_millis();
        Self {
            timestamp,
            update_type,
            feature,
            layer,
            id,
        }
    }

    /// Shorthand for [`FeatureUpdate::new`] with `FeatureUpdateType::Created`.
    pub fn created(id: String, layer: String, feature: serde_json::Value) -> Self {
        let kind = FeatureUpdateType::Created;
        Self::new(id, layer, kind, feature)
    }

    /// Shorthand for [`FeatureUpdate::new`] with `FeatureUpdateType::Updated`.
    pub fn updated(id: String, layer: String, feature: serde_json::Value) -> Self {
        let kind = FeatureUpdateType::Updated;
        Self::new(id, layer, kind, feature)
    }

    /// Shorthand for [`FeatureUpdate::new`] with `FeatureUpdateType::Deleted`.
    ///
    /// A deletion carries no feature body, so `feature` is set to JSON `null`.
    pub fn deleted(id: String, layer: String) -> Self {
        let no_body = serde_json::Value::Null;
        Self::new(id, layer, FeatureUpdateType::Deleted, no_body)
    }

    /// Converts this update into a `MessageType::FeatureUpdate` message.
    ///
    /// The id, layer and feature body are cloned into a `FeaturePayload`
    /// wrapped in `Payload::FeatureData`; `timestamp` is not copied into the
    /// payload.
    pub fn to_message(&self) -> Message {
        let Self {
            id,
            layer,
            update_type,
            feature,
            ..
        } = self;
        let feature_payload = FeaturePayload {
            id: id.clone(),
            layer: layer.clone(),
            feature: feature.clone(),
            change_type: (*update_type).into(),
        };
        Message::new(
            MessageType::FeatureUpdate,
            Payload::FeatureData(feature_payload),
        )
    }
}
/// Holds pending [`FeatureUpdate`]s in one FIFO queue per layer and keeps
/// running counters about what has been added.
pub struct FeatureUpdateManager {
/// Pending updates keyed by layer name (`add_update` uses
/// `FeatureUpdate::layer` as the key).
updates: Arc<RwLock<HashMap<String, VecDeque<FeatureUpdate>>>>,
/// Per-layer queue length at which `add_update` pops the oldest entry
/// before pushing a new one.
max_queue_size: usize,
/// Atomic counters reported by `stats`.
stats: Arc<FeatureUpdateStats>,
}
/// Internal atomic counters maintained by `FeatureUpdateManager::add_update`.
struct FeatureUpdateStats {
/// Incremented once for every `add_update` call.
total_updates: AtomicU64,
/// Incremented for updates of type `FeatureUpdateType::Created`.
created: AtomicU64,
/// Incremented for updates of type `FeatureUpdateType::Updated`.
updated: AtomicU64,
/// Incremented for updates of type `FeatureUpdateType::Deleted`.
deleted: AtomicU64,
/// Incremented on `add_update`'s queue-full path, where the oldest queued
/// entry is popped to make room for the new one.
dropped_updates: AtomicU64,
}
impl FeatureUpdateManager {
pub fn new(max_queue_size: usize) -> Self {
Self {
updates: Arc::new(RwLock::new(HashMap::new())),
max_queue_size,
stats: Arc::new(FeatureUpdateStats {
total_updates: AtomicU64::new(0),
created: AtomicU64::new(0),
updated: AtomicU64::new(0),
deleted: AtomicU64::new(0),
dropped_updates: AtomicU64::new(0),
}),
}
}
pub fn add_update(&self, update: FeatureUpdate) -> Result<()> {
self.stats.total_updates.fetch_add(1, Ordering::Relaxed);
match update.update_type {
FeatureUpdateType::Created => {
self.stats.created.fetch_add(1, Ordering::Relaxed);
}
FeatureUpdateType::Updated => {
self.stats.updated.fetch_add(1, Ordering::Relaxed);
}
FeatureUpdateType::Deleted => {
self.stats.deleted.fetch_add(1, Ordering::Relaxed);
}
}
let mut updates = self.updates.write();
let queue = updates.entry(update.layer.clone()).or_default();
if queue.len() >= self.max_queue_size {
queue.pop_front();
self.stats.dropped_updates.fetch_add(1, Ordering::Relaxed);
}
queue.push_back(update);
Ok(())
}
pub fn get_updates(&self, layer: &str) -> Vec<FeatureUpdate> {
let mut updates = self.updates.write();
if let Some(queue) = updates.get_mut(layer) {
queue.drain(..).collect()
} else {
Vec::new()
}
}
pub fn get_all_updates(&self) -> HashMap<String, Vec<FeatureUpdate>> {
let mut updates = self.updates.write();
let mut result = HashMap::new();
for (layer, queue) in updates.iter_mut() {
result.insert(layer.clone(), queue.drain(..).collect());
}
result
}
pub fn clear_layer(&self, layer: &str) {
let mut updates = self.updates.write();
updates.remove(layer);
}
pub fn clear_all(&self) {
let mut updates = self.updates.write();
updates.clear();
}
pub fn pending_count(&self) -> usize {
let updates = self.updates.read();
updates.values().map(|q| q.len()).sum()
}
pub fn layers_with_updates(&self) -> Vec<String> {
let updates = self.updates.read();
updates
.iter()
.filter(|(_, q)| !q.is_empty())
.map(|(k, _)| k.clone())
.collect()
}
pub async fn stats(&self) -> FeatureUpdateManagerStats {
FeatureUpdateManagerStats {
total_updates: self.stats.total_updates.load(Ordering::Relaxed),
created: self.stats.created.load(Ordering::Relaxed),
updated: self.stats.updated.load(Ordering::Relaxed),
deleted: self.stats.deleted.load(Ordering::Relaxed),
dropped_updates: self.stats.dropped_updates.load(Ordering::Relaxed),
pending_updates: self.pending_count(),
}
}
}
/// Point-in-time copy of the manager's counters, as returned by
/// `FeatureUpdateManager::stats`.
#[derive(Debug, Clone)]
pub struct FeatureUpdateManagerStats {
/// Value of the manager's `total_updates` counter.
pub total_updates: u64,
/// Value of the manager's `created` counter.
pub created: u64,
/// Value of the manager's `updated` counter.
pub updated: u64,
/// Value of the manager's `deleted` counter.
pub deleted: u64,
/// Value of the manager's `dropped_updates` counter.
pub dropped_updates: u64,
/// Result of `FeatureUpdateManager::pending_count` when the snapshot was taken.
pub pending_updates: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Minimal JSON feature body shared by the manager tests.
    fn sample_feature() -> serde_json::Value {
        serde_json::json!({"type": "Feature"})
    }

    #[test]
    fn test_feature_update() {
        let feature = serde_json::json!({
            "type": "Feature",
            "geometry": {"type": "Point", "coordinates": [0.0, 0.0]},
            "properties": {}
        });
        let update = FeatureUpdate::created("f1".to_string(), "layer1".to_string(), feature);
        assert_eq!(update.id, "f1");
        assert_eq!(update.layer, "layer1");
        assert_eq!(update.update_type, FeatureUpdateType::Created);
    }

    #[test]
    fn test_feature_update_deleted() {
        let update = FeatureUpdate::deleted("f1".to_string(), "layer1".to_string());
        assert_eq!(update.update_type, FeatureUpdateType::Deleted);
        assert_eq!(update.feature, serde_json::Value::Null);
    }

    #[test]
    fn test_feature_update_manager() -> Result<()> {
        let manager = FeatureUpdateManager::new(10);
        let update =
            FeatureUpdate::created("f1".to_string(), "layer1".to_string(), sample_feature());
        manager.add_update(update)?;
        assert_eq!(manager.pending_count(), 1);
        let updates = manager.get_updates("layer1");
        assert_eq!(updates.len(), 1);
        assert_eq!(manager.pending_count(), 0);
        Ok(())
    }

    #[test]
    fn test_feature_update_layers() -> Result<()> {
        let manager = FeatureUpdateManager::new(10);
        let update1 =
            FeatureUpdate::created("f1".to_string(), "layer1".to_string(), sample_feature());
        let update2 =
            FeatureUpdate::created("f2".to_string(), "layer2".to_string(), sample_feature());
        manager.add_update(update1)?;
        manager.add_update(update2)?;
        let layers = manager.layers_with_updates();
        assert_eq!(layers.len(), 2);
        assert!(layers.contains(&"layer1".to_string()));
        assert!(layers.contains(&"layer2".to_string()));
        Ok(())
    }

    #[tokio::test]
    async fn test_feature_update_stats() -> Result<()> {
        let manager = FeatureUpdateManager::new(10);
        let created =
            FeatureUpdate::created("f1".to_string(), "layer1".to_string(), sample_feature());
        let updated =
            FeatureUpdate::updated("f2".to_string(), "layer1".to_string(), sample_feature());
        let deleted = FeatureUpdate::deleted("f3".to_string(), "layer1".to_string());
        manager.add_update(created)?;
        manager.add_update(updated)?;
        manager.add_update(deleted)?;
        let stats = manager.stats().await;
        assert_eq!(stats.total_updates, 3);
        assert_eq!(stats.created, 1);
        assert_eq!(stats.updated, 1);
        assert_eq!(stats.deleted, 1);
        Ok(())
    }

    /// A full queue evicts its oldest entry, keeps FIFO order, and records
    /// exactly one drop per eviction.
    #[tokio::test]
    async fn test_feature_update_eviction() -> Result<()> {
        let manager = FeatureUpdateManager::new(2);
        for id in ["f1", "f2", "f3"] {
            manager.add_update(FeatureUpdate::updated(
                id.to_string(),
                "layer1".to_string(),
                sample_feature(),
            ))?;
        }
        let stats = manager.stats().await;
        assert_eq!(stats.total_updates, 3);
        assert_eq!(stats.dropped_updates, 1);
        assert_eq!(stats.pending_updates, 2);

        let updates = manager.get_updates("layer1");
        let ids: Vec<&str> = updates.iter().map(|u| u.id.as_str()).collect();
        assert_eq!(ids, ["f2", "f3"]);
        Ok(())
    }

    /// `get_all_updates` drains every layer and leaves nothing pending.
    #[test]
    fn test_get_all_updates_drains_every_layer() -> Result<()> {
        let manager = FeatureUpdateManager::new(10);
        manager.add_update(FeatureUpdate::created(
            "f1".to_string(),
            "layer1".to_string(),
            sample_feature(),
        ))?;
        manager.add_update(FeatureUpdate::deleted(
            "f2".to_string(),
            "layer2".to_string(),
        ))?;

        let all = manager.get_all_updates();
        assert_eq!(all.len(), 2);
        assert_eq!(all["layer1"].len(), 1);
        assert_eq!(all["layer2"].len(), 1);
        assert_eq!(manager.pending_count(), 0);
        assert!(manager.layers_with_updates().is_empty());
        assert!(manager.get_updates("unknown").is_empty());
        Ok(())
    }

    /// `clear_layer` removes one layer's queue; `clear_all` removes the rest.
    #[test]
    fn test_clear_layer_and_clear_all() -> Result<()> {
        let manager = FeatureUpdateManager::new(10);
        manager.add_update(FeatureUpdate::created(
            "f1".to_string(),
            "layer1".to_string(),
            sample_feature(),
        ))?;
        manager.add_update(FeatureUpdate::created(
            "f2".to_string(),
            "layer2".to_string(),
            sample_feature(),
        ))?;

        manager.clear_layer("layer1");
        assert_eq!(manager.pending_count(), 1);
        assert_eq!(manager.layers_with_updates(), vec!["layer2".to_string()]);

        manager.clear_all();
        assert_eq!(manager.pending_count(), 0);
        assert!(manager.layers_with_updates().is_empty());
        Ok(())
    }
}