use crate::error::{Error, Result};
use crate::protocol::message::{Message, MessageType, Payload, TilePayload};
use parking_lot::RwLock;
use std::collections::{HashMap, VecDeque};
use std::fmt;
use std::sync::Arc;
use std::sync::atomic::{AtomicU64, Ordering};
/// Kind of change a [`TileUpdate`] carries.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
pub enum TileUpdateType {
    /// Set by `TileUpdate::full`: the update carries data and no delta.
    Full,
    /// Set by `TileUpdate::delta`: the update carries data plus a delta buffer.
    Delta,
    /// Set by `TileUpdate::invalidate`: the update carries no data at all.
    Invalidate,
}
/// Address of a single tile, written `z/x/y` in its textual form.
///
/// The type is `Copy` and hashable, so it can be passed by value and used
/// directly as a map key.
#[derive(Clone, Copy, Debug, Eq, Hash, PartialEq)]
pub struct TileCoord {
    /// First component of the `z/x/y` form (presumably the zoom level).
    pub z: u8,
    /// Second component of the `z/x/y` form.
    pub x: u32,
    /// Third component of the `z/x/y` form.
    pub y: u32,
}
impl TileCoord {
pub fn new(z: u8, x: u32, y: u32) -> Self {
Self { z, x, y }
}
pub fn from_string(s: &str) -> Result<Self> {
let parts: Vec<&str> = s.split('/').collect();
if parts.len() != 3 {
return Err(Error::Protocol(format!("Invalid tile coord format: {}", s)));
}
let z = parts[0]
.parse()
.map_err(|_| Error::Protocol("Invalid z coordinate".to_string()))?;
let x = parts[1]
.parse()
.map_err(|_| Error::Protocol("Invalid x coordinate".to_string()))?;
let y = parts[2]
.parse()
.map_err(|_| Error::Protocol("Invalid y coordinate".to_string()))?;
Ok(Self { z, x, y })
}
}
/// Formats the coordinate as `z/x/y`, the same form `TileCoord::from_string` parses.
impl fmt::Display for TileCoord {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self { z, x, y } = self;
        write!(f, "{}/{}/{}", z, x, y)
    }
}
/// One pending change to a tile, as queued by `TileUpdateManager`.
///
/// `Debug` and `Clone` are derived so updates can be logged, compared in
/// assertions, and duplicated when one update has to be handed to several
/// consumers.
#[derive(Debug, Clone)]
pub struct TileUpdate {
    /// Tile this update applies to.
    pub coord: TileCoord,
    /// Which kind of change this is.
    pub update_type: TileUpdateType,
    /// Tile payload bytes; empty for updates built with `TileUpdate::invalidate`.
    pub data: Vec<u8>,
    /// Payload format label (the tests in this file use `"png"`); empty for
    /// invalidations.
    pub format: String,
    /// Delta buffer; `Some` only for updates built with `TileUpdate::delta`.
    pub delta: Option<Vec<u8>>,
    /// Creation time in milliseconds since the Unix epoch (UTC), taken when
    /// the update was constructed.
    pub timestamp: i64,
}
impl TileUpdate {
pub fn full(coord: TileCoord, data: Vec<u8>, format: String) -> Self {
Self {
coord,
update_type: TileUpdateType::Full,
data,
format,
delta: None,
timestamp: chrono::Utc::now().timestamp_millis(),
}
}
pub fn delta(coord: TileCoord, data: Vec<u8>, delta: Vec<u8>, format: String) -> Self {
Self {
coord,
update_type: TileUpdateType::Delta,
data,
format,
delta: Some(delta),
timestamp: chrono::Utc::now().timestamp_millis(),
}
}
pub fn invalidate(coord: TileCoord) -> Self {
Self {
coord,
update_type: TileUpdateType::Invalidate,
data: Vec::new(),
format: String::new(),
delta: None,
timestamp: chrono::Utc::now().timestamp_millis(),
}
}
pub fn to_message(&self) -> Message {
let payload = Payload::TileData(TilePayload {
z: self.coord.z,
x: self.coord.x,
y: self.coord.y,
data: self.data.clone(),
format: self.format.clone(),
delta: self.delta.clone(),
});
Message::new(MessageType::TileUpdate, payload)
}
}
/// Queues pending [`TileUpdate`]s per tile and keeps running counters
/// about them.
pub struct TileUpdateManager {
    /// Pending updates keyed by tile, oldest first, behind a read/write lock.
    updates: Arc<RwLock<HashMap<TileCoord, VecDeque<TileUpdate>>>>,
    /// Queue length at which adding an update evicts the oldest one for that tile.
    max_queue_size: usize,
    /// Atomic counters updated as updates are added and evicted.
    stats: Arc<TileUpdateStats>,
}
/// Lock-free counters backing `TileUpdateManager::stats`.
struct TileUpdateStats {
    /// Every update passed to `add_update`, regardless of kind.
    total_updates: AtomicU64,
    /// Updates whose type was `TileUpdateType::Full`.
    full_updates: AtomicU64,
    /// Updates whose type was `TileUpdateType::Delta`.
    delta_updates: AtomicU64,
    /// Updates whose type was `TileUpdateType::Invalidate`.
    invalidations: AtomicU64,
    /// Updates evicted from a full per-tile queue to make room for a newer one.
    dropped_updates: AtomicU64,
}
impl TileUpdateManager {
    /// Creates an empty manager whose per-tile queues start evicting their
    /// oldest entry once they hold `max_queue_size` updates.
    ///
    /// A queue always accepts the update being added, so a `max_queue_size`
    /// of `0` behaves like `1`.
    pub fn new(max_queue_size: usize) -> Self {
        Self {
            updates: Arc::new(RwLock::new(HashMap::new())),
            max_queue_size,
            stats: Arc::new(TileUpdateStats {
                total_updates: AtomicU64::new(0),
                full_updates: AtomicU64::new(0),
                delta_updates: AtomicU64::new(0),
                invalidations: AtomicU64::new(0),
                dropped_updates: AtomicU64::new(0),
            }),
        }
    }

    /// Appends `update` to the queue of its tile.
    ///
    /// The total counter and the counter for the update's kind are bumped
    /// first. If the tile's queue is already at capacity, the oldest queued
    /// update is evicted and counted as dropped.
    ///
    /// # Errors
    ///
    /// Currently always returns `Ok(())`.
    pub fn add_update(&self, update: TileUpdate) -> Result<()> {
        self.stats.total_updates.fetch_add(1, Ordering::Relaxed);
        let kind_counter = match update.update_type {
            TileUpdateType::Full => &self.stats.full_updates,
            TileUpdateType::Delta => &self.stats.delta_updates,
            TileUpdateType::Invalidate => &self.stats.invalidations,
        };
        kind_counter.fetch_add(1, Ordering::Relaxed);

        let mut updates = self.updates.write();
        let queue = updates.entry(update.coord).or_default();
        // Count a drop only when an update was really evicted: with a
        // capacity of zero the queue can be "full" while still empty.
        if queue.len() >= self.max_queue_size && queue.pop_front().is_some() {
            self.stats.dropped_updates.fetch_add(1, Ordering::Relaxed);
        }
        queue.push_back(update);
        Ok(())
    }

    /// Removes and returns every pending update for `coord`, oldest first.
    ///
    /// The tile's queue entry is removed together with its contents, so
    /// tiles that are drained and never updated again do not leave empty
    /// queues behind in the map. Returns an empty vector when nothing is
    /// pending for the tile.
    pub fn get_updates(&self, coord: &TileCoord) -> Vec<TileUpdate> {
        // The write guard is a temporary of this statement; the lock is
        // released before the queue is converted.
        let drained = self.updates.write().remove(coord);
        drained.map(Vec::from).unwrap_or_default()
    }

    /// Removes and returns all pending updates, grouped by tile and ordered
    /// oldest first within each tile.
    ///
    /// Only tiles that actually have pending updates appear in the result,
    /// and the manager holds no queues afterwards.
    pub fn get_all_updates(&self) -> HashMap<TileCoord, Vec<TileUpdate>> {
        // Swap the whole map out under the lock, then convert without it.
        let drained = std::mem::take(&mut *self.updates.write());
        drained
            .into_iter()
            .map(|(coord, queue)| (coord, Vec::from(queue)))
            .collect()
    }

    /// Discards every pending update for `coord` without returning them.
    pub fn clear_tile(&self, coord: &TileCoord) {
        let mut updates = self.updates.write();
        updates.remove(coord);
    }

    /// Discards every pending update for every tile. Counters are not reset.
    pub fn clear_all(&self) {
        let mut updates = self.updates.write();
        updates.clear();
    }

    /// Total number of queued updates across all tiles.
    pub fn pending_count(&self) -> usize {
        let updates = self.updates.read();
        updates.values().map(|q| q.len()).sum()
    }

    /// Returns a snapshot of the counters plus the current pending count.
    ///
    /// Each counter is loaded separately with relaxed ordering, so the
    /// snapshot is not guaranteed to be consistent across fields while
    /// other threads are adding updates. The body never awaits; the method
    /// stays `async` so existing `.await` call sites keep compiling.
    pub async fn stats(&self) -> TileUpdateManagerStats {
        TileUpdateManagerStats {
            total_updates: self.stats.total_updates.load(Ordering::Relaxed),
            full_updates: self.stats.full_updates.load(Ordering::Relaxed),
            delta_updates: self.stats.delta_updates.load(Ordering::Relaxed),
            invalidations: self.stats.invalidations.load(Ordering::Relaxed),
            dropped_updates: self.stats.dropped_updates.load(Ordering::Relaxed),
            pending_updates: self.pending_count(),
        }
    }
}
/// Point-in-time copy of a `TileUpdateManager`'s counters, as returned by
/// `TileUpdateManager::stats`.
#[derive(Clone, Debug)]
pub struct TileUpdateManagerStats {
    /// Updates passed to `add_update` so far.
    pub total_updates: u64,
    /// How many of those were full updates.
    pub full_updates: u64,
    /// How many of those were delta updates.
    pub delta_updates: u64,
    /// How many of those were invalidations.
    pub invalidations: u64,
    /// Updates evicted from a full per-tile queue.
    pub dropped_updates: u64,
    /// Updates queued across all tiles when the snapshot was taken.
    pub pending_updates: usize,
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Coordinate shared by every test in this module.
    fn sample_coord() -> TileCoord {
        TileCoord::new(10, 512, 384)
    }

    /// `new` stores each component in the matching field.
    #[test]
    fn test_tile_coord() {
        let TileCoord { z, x, y } = sample_coord();
        assert_eq!(z, 10);
        assert_eq!(x, 512);
        assert_eq!(y, 384);
    }

    /// `Display` and `from_string` round-trip through the `z/x/y` form.
    #[test]
    fn test_tile_coord_string() -> Result<()> {
        let original = sample_coord();
        let text = original.to_string();
        assert_eq!(text, "10/512/384");

        let round_tripped = TileCoord::from_string(&text)?;
        assert_eq!(round_tripped, original);
        Ok(())
    }

    /// `full` keeps the coordinate, data and format it was given.
    #[test]
    fn test_tile_update_full() {
        let target = sample_coord();
        let bytes = vec![1, 2, 3, 4];
        let built = TileUpdate::full(target, bytes.clone(), "png".to_string());

        assert_eq!(built.coord, target);
        assert_eq!(built.update_type, TileUpdateType::Full);
        assert_eq!(built.data, bytes);
        assert_eq!(built.format, "png");
    }

    /// `delta` marks the update as a delta and stores the delta buffer.
    #[test]
    fn test_tile_update_delta() {
        let bytes = vec![1, 2, 3, 4];
        let patch = vec![5, 6, 7, 8];
        let built = TileUpdate::delta(
            sample_coord(),
            bytes.clone(),
            patch.clone(),
            "png".to_string(),
        );

        assert_eq!(built.update_type, TileUpdateType::Delta);
        assert_eq!(built.delta, Some(patch));
    }

    /// An added update is pending until `get_updates` drains it.
    #[test]
    fn test_tile_update_manager() -> Result<()> {
        let queue = TileUpdateManager::new(10);
        let target = sample_coord();

        queue.add_update(TileUpdate::full(
            target,
            vec![1, 2, 3, 4],
            "png".to_string(),
        ))?;
        assert_eq!(queue.pending_count(), 1);

        let drained = queue.get_updates(&target);
        assert_eq!(drained.len(), 1);
        assert_eq!(queue.pending_count(), 0);
        Ok(())
    }

    /// Each kind of update bumps the total and its own counter.
    #[tokio::test]
    async fn test_tile_update_stats() -> Result<()> {
        let queue = TileUpdateManager::new(10);
        let target = sample_coord();

        let one_of_each = vec![
            TileUpdate::full(target, vec![1, 2, 3], "png".to_string()),
            TileUpdate::delta(target, vec![1, 2], vec![3, 4], "png".to_string()),
            TileUpdate::invalidate(target),
        ];
        for update in one_of_each {
            queue.add_update(update)?;
        }

        let snapshot = queue.stats().await;
        assert_eq!(snapshot.total_updates, 3);
        assert_eq!(snapshot.full_updates, 1);
        assert_eq!(snapshot.delta_updates, 1);
        assert_eq!(snapshot.invalidations, 1);
        Ok(())
    }
}