use crate::core::{StreamElement, StreamMessage};
use crate::error::{Result, StreamingError};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{DateTime, Utc};
use oxigdal_core::{
buffer::RasterBuffer,
types::{BoundingBox, GeoTransform, RasterDataType, RasterMetadata},
};
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::{mpsc, RwLock};
use tracing::{debug, info, warn};
/// Tuning knobs for chunked raster streaming.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RasterStreamConfig {
/// Chunk dimensions in pixels as (width, height).
pub chunk_size: (usize, usize),
/// Overlap between adjacent chunks, in pixels.
pub overlap: usize,
/// Capacity of the internal chunk channel.
pub buffer_size: usize,
/// Whether chunk payloads are compressed.
pub compression: bool,
/// Compression level used when `compression` is enabled.
pub compression_level: u8,
/// Upper bound on buffered chunk memory, in bytes.
pub max_memory_bytes: usize,
/// Number of chunks to prefetch ahead of the read cursor.
pub prefetch_count: usize,
/// Whether chunks are processed in parallel.
pub parallel: bool,
/// Worker count used when `parallel` is enabled.
pub num_workers: usize,
}
impl Default for RasterStreamConfig {
    /// Sensible defaults: 512x512 chunks with no overlap, a 100-chunk
    /// buffer, compression disabled (level 6 if later enabled), a 1 GiB
    /// memory cap, prefetch depth 2, and one worker per logical CPU.
    fn default() -> Self {
        Self {
            num_workers: num_cpus::get(),
            parallel: true,
            prefetch_count: 2,
            max_memory_bytes: 1024 * 1024 * 1024, // 1 GiB
            compression_level: 6,
            compression: false,
            buffer_size: 100,
            overlap: 0,
            chunk_size: (512, 512),
        }
    }
}
impl RasterStreamConfig {
    /// Shorthand for `RasterStreamConfig::default()`.
    pub fn new() -> Self {
        Self::default()
    }
    /// Sets the chunk dimensions in pixels.
    pub fn with_chunk_size(self, width: usize, height: usize) -> Self {
        Self {
            chunk_size: (width, height),
            ..self
        }
    }
    /// Sets the overlap between adjacent chunks, in pixels.
    pub fn with_overlap(self, overlap: usize) -> Self {
        Self { overlap, ..self }
    }
    /// Enables compression at the given level.
    pub fn with_compression(self, level: u8) -> Self {
        Self {
            compression: true,
            compression_level: level,
            ..self
        }
    }
    /// Sets the buffered-memory cap in bytes.
    pub fn with_max_memory(self, bytes: usize) -> Self {
        Self {
            max_memory_bytes: bytes,
            ..self
        }
    }
    /// Sets how many chunks to prefetch ahead of the cursor.
    pub fn with_prefetch(self, count: usize) -> Self {
        Self {
            prefetch_count: count,
            ..self
        }
    }
    /// Sets parallel processing on/off and the worker count.
    pub fn with_parallel(self, parallel: bool, num_workers: usize) -> Self {
        Self {
            parallel,
            num_workers,
            ..self
        }
    }
}
/// A single tile of raster data cut from a larger raster, together with its
/// georeferencing and its position in the chunk grid.
#[derive(Debug, Clone)]
pub struct RasterChunk {
/// Pixel data for this chunk.
pub buffer: RasterBuffer,
/// Geographic extent of the chunk.
pub bbox: BoundingBox,
/// Geotransform mapping chunk pixel coordinates to world coordinates.
pub geotransform: GeoTransform,
/// (row, col) position of this chunk in the chunk grid.
pub indices: (usize, usize),
/// Creation time of the chunk (UTC).
pub timestamp: DateTime<Utc>,
/// Auxiliary per-chunk metadata (bands, compression, checksum, ...).
pub metadata: ChunkMetadata,
}
impl RasterChunk {
/// Builds a chunk from its pixel data and georeferencing; the timestamp
/// is set to "now" and metadata starts empty.
pub fn new(
buffer: RasterBuffer,
bbox: BoundingBox,
geotransform: GeoTransform,
indices: (usize, usize),
) -> Self {
Self {
buffer,
bbox,
geotransform,
indices,
timestamp: Utc::now(),
metadata: ChunkMetadata::default(),
}
}
/// Size of the pixel buffer in bytes.
pub fn size_bytes(&self) -> usize {
self.buffer.size_bytes()
}
/// Whether this chunk's extent intersects `other`'s.
pub fn overlaps_with(&self, other: &RasterChunk) -> bool {
self.bbox.intersects(&other.bbox)
}
/// Intersection of the two chunk extents, if any.
pub fn overlap_region(&self, other: &RasterChunk) -> Option<BoundingBox> {
self.bbox.intersection(&other.bbox)
}
/// Serializes this chunk into a `StreamElement` via bincode.
///
/// NOTE(review): `bincode::encode_to_vec` (bincode 2.x API) requires
/// `Self: bincode::Encode`, but `RasterChunk` only derives `Debug`/`Clone`
/// above — confirm `Encode`/`Decode` impls exist elsewhere.
pub fn to_stream_element(&self) -> Result<StreamElement> {
let data = bincode::encode_to_vec(self, bincode::config::standard())
.map_err(|e| StreamingError::SerializationError(e.to_string()))?;
Ok(StreamElement::new(data, self.timestamp))
}
/// Deserializes a chunk from a `StreamElement` produced by
/// `to_stream_element`; trailing bytes after the chunk are ignored.
pub fn from_stream_element(element: &StreamElement) -> Result<Self> {
let (chunk, _): (Self, _) = bincode::decode_from_slice(&element.data, bincode::config::standard())
.map_err(|e| StreamingError::DeserializationError(e.to_string()))?;
Ok(chunk)
}
}
/// Auxiliary, serializable metadata attached to a `RasterChunk`.
#[derive(Debug, Clone, Default, Serialize, Deserialize)]
pub struct ChunkMetadata {
/// Band indices contained in the chunk.
pub bands: Vec<usize>,
/// Compression codec name, when the payload is compressed.
pub compression: Option<String>,
/// Uncompressed payload size in bytes, when known.
pub original_size: Option<usize>,
/// Compressed payload size in bytes, when known.
pub compressed_size: Option<usize>,
/// Integrity checksum of the payload, when computed.
pub checksum: Option<String>,
/// Free-form key/value attributes.
pub attributes: std::collections::HashMap<String, String>,
}
/// Pull-based access to a raster as a stream of chunks.
#[async_trait]
pub trait RasterStreaming: Send + Sync {
/// Returns the next chunk, or `None` when the stream is exhausted.
async fn next_chunk(&mut self) -> Result<Option<RasterChunk>>;
/// Returns up to `count` chunks (fewer when the stream ends first).
async fn next_chunks(&mut self, count: usize) -> Result<Vec<RasterChunk>>;
/// Repositions the stream at chunk (`row`, `col`).
async fn seek_to_chunk(&mut self, row: usize, col: usize) -> Result<()>;
/// Total chunk-grid size as (rows, cols).
fn total_chunks(&self) -> (usize, usize);
/// Current position in the grid as (row, col).
fn current_position(&self) -> (usize, usize);
/// Whether more chunks remain to be read.
fn has_more_chunks(&self) -> bool;
}
/// Streams a raster chunk-by-chunk with bounded, tracked memory usage.
pub struct RasterStream {
/// Streaming configuration (chunk size, overlap, memory cap, ...).
config: RasterStreamConfig,
/// Metadata of the source raster (dimensions, geotransform, ...).
metadata: RasterMetadata,
/// Chunk-grid size as (rows, cols).
total_chunks: (usize, usize),
/// Read cursor (row, col), shared so it can be queried concurrently.
current_position: Arc<RwLock<(usize, usize)>>,
/// Channel delivering chunks from the producer side.
receiver: mpsc::Receiver<RasterChunk>,
/// Optional channel for hinting the prefetcher after a seek.
prefetch_sender: Option<mpsc::Sender<(usize, usize)>>,
/// Bytes of buffered chunk data, bounded by `config.max_memory_bytes`.
memory_usage: Arc<RwLock<usize>>,
}
impl RasterStream {
    /// Creates a stream over the raster described by `metadata`, chunked
    /// according to `config`.
    ///
    /// # Errors
    /// Returns `StreamingError::InvalidOperation` when `config.overlap` is
    /// not strictly smaller than both chunk dimensions; a larger overlap
    /// would make the chunk-grid step zero (division by zero in
    /// `calculate_chunks`).
    pub fn new(
        config: RasterStreamConfig,
        metadata: RasterMetadata,
    ) -> Result<Self> {
        // Validate up front: calculate_chunks divides by
        // (chunk_size - overlap), which must stay positive.
        if config.overlap >= config.chunk_size.0 || config.overlap >= config.chunk_size.1 {
            return Err(StreamingError::InvalidOperation(format!(
                "Overlap {} must be smaller than chunk size ({}, {})",
                config.overlap, config.chunk_size.0, config.chunk_size.1
            )));
        }
        let total_chunks = Self::calculate_chunks(
            metadata.width,
            metadata.height,
            config.chunk_size.0,
            config.chunk_size.1,
            config.overlap,
        );
        // NOTE(review): the sender half is dropped at the end of this
        // function, so `recv` on this channel yields `None` until a producer
        // is wired up elsewhere — confirm who feeds this channel.
        let (_sender, receiver) = mpsc::channel(config.buffer_size);
        info!(
            "Created raster stream with {} x {} chunks",
            total_chunks.0, total_chunks.1
        );
        Ok(Self {
            config,
            metadata,
            total_chunks,
            current_position: Arc::new(RwLock::new((0, 0))),
            receiver,
            prefetch_sender: None,
            memory_usage: Arc::new(RwLock::new(0)),
        })
    }
    /// Number of (rows, cols) of chunks needed to cover a `width` x `height`
    /// raster. Adjacent chunks advance by `chunk - overlap` pixels and a
    /// partial chunk at the right/bottom edge counts (ceiling division).
    fn calculate_chunks(
        width: usize,
        height: usize,
        chunk_width: usize,
        chunk_height: usize,
        overlap: usize,
    ) -> (usize, usize) {
        // Callers must guarantee a positive step; `new` validates this.
        debug_assert!(
            overlap < chunk_width && overlap < chunk_height,
            "overlap must be smaller than both chunk dimensions"
        );
        let step_x = chunk_width - overlap;
        let step_y = chunk_height - overlap;
        let num_cols = (width + step_x - 1) / step_x;
        let num_rows = (height + step_y - 1) / step_y;
        (num_rows, num_cols)
    }
    /// Geographic bounding box of chunk (`row`, `col`).
    ///
    /// # Errors
    /// - `InvalidOperation` when the indices are outside the chunk grid.
    /// - `InvalidState` when the raster has no geotransform.
    /// - `Core` when `BoundingBox::new` rejects the computed corners.
    pub fn chunk_bbox(&self, row: usize, col: usize) -> Result<BoundingBox> {
        if row >= self.total_chunks.0 || col >= self.total_chunks.1 {
            return Err(StreamingError::InvalidOperation(
                format!("Chunk ({}, {}) out of bounds", row, col)
            ));
        }
        let (chunk_width, chunk_height) = self.config.chunk_size;
        let overlap = self.config.overlap;
        // Pixel window of this chunk, clamped to the raster extent.
        let x_start = col * (chunk_width - overlap);
        let y_start = row * (chunk_height - overlap);
        let x_end = (x_start + chunk_width).min(self.metadata.width);
        let y_end = (y_start + chunk_height).min(self.metadata.height);
        let gt = self.metadata.geotransform.as_ref()
            .ok_or_else(|| StreamingError::InvalidState("No geotransform available".to_string()))?;
        // Project both corners, then order them explicitly: pixel_height is
        // negative in the common north-up convention, but nothing forces
        // that sign, so take min/max instead of assuming it.
        let x1 = gt.origin_x + (x_start as f64) * gt.pixel_width;
        let x2 = gt.origin_x + (x_end as f64) * gt.pixel_width;
        let y1 = gt.origin_y + (y_start as f64) * gt.pixel_height;
        let y2 = gt.origin_y + (y_end as f64) * gt.pixel_height;
        BoundingBox::new(x1.min(x2), y1.min(y2), x1.max(x2), y1.max(y2))
            .map_err(StreamingError::Core)
    }
    /// Geotransform for chunk (`row`, `col`): same pixel size and rotation
    /// as the parent raster, with the origin shifted to the chunk's
    /// top-left pixel.
    ///
    /// NOTE(review): unlike `chunk_bbox`, this does not bounds-check
    /// (`row`, `col`) — confirm whether that is intentional.
    ///
    /// # Errors
    /// `InvalidState` when the raster has no geotransform.
    pub fn chunk_geotransform(&self, row: usize, col: usize) -> Result<GeoTransform> {
        let gt = self.metadata.geotransform.as_ref()
            .ok_or_else(|| StreamingError::InvalidState("No geotransform available".to_string()))?;
        let (chunk_width, chunk_height) = self.config.chunk_size;
        let overlap = self.config.overlap;
        let x_start = col * (chunk_width - overlap);
        let y_start = row * (chunk_height - overlap);
        Ok(GeoTransform {
            origin_x: gt.origin_x + (x_start as f64) * gt.pixel_width,
            origin_y: gt.origin_y + (y_start as f64) * gt.pixel_height,
            pixel_width: gt.pixel_width,
            pixel_height: gt.pixel_height,
            rotation_x: gt.rotation_x,
            rotation_y: gt.rotation_y,
        })
    }
    /// Snapshot of current buffered-chunk memory usage.
    pub async fn memory_stats(&self) -> MemoryStats {
        let current = *self.memory_usage.read().await;
        let max = self.config.max_memory_bytes;
        MemoryStats {
            current_bytes: current,
            max_bytes: max,
            // Avoid NaN/inf when the limit is configured as zero.
            utilization: if max == 0 {
                0.0
            } else {
                (current as f64) / (max as f64)
            },
        }
    }
    /// Adjusts tracked memory usage by `delta` bytes.
    ///
    /// # Errors
    /// `StreamingError::Other` when growing past `max_memory_bytes`;
    /// shrinking saturates at zero instead of underflowing.
    async fn update_memory(&self, delta: isize) -> Result<()> {
        let mut usage = self.memory_usage.write().await;
        if delta > 0 {
            let new_usage = *usage + delta as usize;
            if new_usage > self.config.max_memory_bytes {
                return Err(StreamingError::Other(
                    "Memory limit exceeded".to_string()
                ));
            }
            *usage = new_usage;
        } else {
            // unsigned_abs handles isize::MIN, where `-delta` would overflow.
            *usage = usage.saturating_sub(delta.unsigned_abs());
        }
        Ok(())
    }
}
#[async_trait]
impl RasterStreaming for RasterStream {
    /// Receives the next chunk from the internal channel and advances the
    /// (row, col) cursor in row-major order; `None` once the channel closes.
    async fn next_chunk(&mut self) -> Result<Option<RasterChunk>> {
        match self.receiver.recv().await {
            Some(chunk) => {
                // Advance the cursor: next column, wrapping to the next row.
                let mut pos = self.current_position.write().await;
                pos.1 += 1;
                if pos.1 >= self.total_chunks.1 {
                    pos.1 = 0;
                    pos.0 += 1;
                }
                Ok(Some(chunk))
            }
            None => Ok(None),
        }
    }
    /// Receives up to `count` chunks, stopping early when the stream ends.
    async fn next_chunks(&mut self, count: usize) -> Result<Vec<RasterChunk>> {
        let mut chunks = Vec::with_capacity(count);
        for _ in 0..count {
            match self.next_chunk().await? {
                Some(chunk) => chunks.push(chunk),
                None => break,
            }
        }
        Ok(chunks)
    }
    /// Moves the cursor to chunk (`row`, `col`) and, when a prefetcher is
    /// attached, hints it toward the new position.
    ///
    /// # Errors
    /// `InvalidOperation` when the target chunk is outside the grid.
    async fn seek_to_chunk(&mut self, row: usize, col: usize) -> Result<()> {
        if row >= self.total_chunks.0 || col >= self.total_chunks.1 {
            return Err(StreamingError::InvalidOperation(
                format!("Chunk ({}, {}) out of bounds", row, col)
            ));
        }
        let mut pos = self.current_position.write().await;
        *pos = (row, col);
        // Prefetching is best-effort: a full queue just skips the hint.
        if let Some(sender) = &self.prefetch_sender {
            if sender.try_send((row, col)).is_err() {
                warn!("Failed to send prefetch request");
            }
        }
        Ok(())
    }
    /// Total grid size as (rows, cols).
    fn total_chunks(&self) -> (usize, usize) {
        self.total_chunks
    }
    /// Current cursor as (row, col).
    ///
    /// NOTE(review): falls back to (0, 0) when the position lock is held,
    /// which briefly misreports the position (and makes `has_more_chunks`
    /// report true) — confirm this trade-off is acceptable.
    fn current_position(&self) -> (usize, usize) {
        match self.current_position.try_read() {
            Ok(pos) => *pos,
            Err(_) => (0, 0),
        }
    }
    /// Whether the cursor has not yet passed the last row of the grid.
    fn has_more_chunks(&self) -> bool {
        let pos = self.current_position();
        pos.0 < self.total_chunks.0
    }
}
/// Snapshot of the stream's buffered-memory accounting.
#[derive(Debug, Clone)]
pub struct MemoryStats {
/// Bytes currently tracked as in use.
pub current_bytes: usize,
/// Configured memory cap in bytes.
pub max_bytes: usize,
/// Ratio of `current_bytes` to `max_bytes`.
pub utilization: f64,
}
/// Running statistics over processed raster chunks.
#[derive(Debug, Clone, Default)]
pub struct ChunkStats {
    /// Chunks processed successfully.
    pub chunks_processed: usize,
    /// Chunks that failed processing.
    pub chunks_failed: usize,
    /// Total bytes across successful chunks.
    pub bytes_processed: usize,
    /// Total processing time across successful chunks, in milliseconds.
    pub processing_time_ms: u64,
    /// Running mean processing time per successful chunk, in milliseconds.
    pub avg_chunk_time_ms: f64,
}
impl ChunkStats {
    /// Creates an empty statistics accumulator.
    pub fn new() -> Self {
        Self::default()
    }
    /// Records one successfully processed chunk of `size_bytes` that took
    /// `time_ms` milliseconds, updating totals and the running average.
    pub fn record_chunk(&mut self, size_bytes: usize, time_ms: u64) {
        self.chunks_processed += 1;
        self.bytes_processed += size_bytes;
        self.processing_time_ms += time_ms;
        self.avg_chunk_time_ms =
            self.processing_time_ms as f64 / self.chunks_processed as f64;
    }
    /// Records one failed chunk.
    pub fn record_failure(&mut self) {
        self.chunks_failed += 1;
    }
    /// Mean throughput in MiB per second over all recorded chunks; 0.0
    /// before any processing time has been recorded.
    pub fn throughput_mbps(&self) -> f64 {
        match self.processing_time_ms {
            0 => 0.0,
            ms => {
                let mib = self.bytes_processed as f64 / (1024.0 * 1024.0);
                mib / (ms as f64 / 1000.0)
            }
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
// Chunk-grid sizing: exact fit, ragged edges, and overlap all round up.
#[test]
fn test_chunk_calculation() {
// 1024 / 256 divides exactly: 4 chunks per axis.
let chunks = RasterStream::calculate_chunks(1024, 1024, 256, 256, 0);
assert_eq!(chunks, (4, 4));
// 1000 / 256 has a remainder; the partial edge chunk still counts.
let chunks = RasterStream::calculate_chunks(1000, 1000, 256, 256, 0);
assert_eq!(chunks, (4, 4));
// Overlap of 16 shrinks the step to 240, so 1024 needs 5 chunks per axis.
let chunks = RasterStream::calculate_chunks(1024, 1024, 256, 256, 16);
assert_eq!(chunks, (5, 5));
}
// Builder methods accumulate into the expected configuration, and
// with_compression also flips the compression flag on.
#[test]
fn test_config_builder() {
let config = RasterStreamConfig::new()
.with_chunk_size(1024, 1024)
.with_overlap(32)
.with_compression(9)
.with_prefetch(4);
assert_eq!(config.chunk_size, (1024, 1024));
assert_eq!(config.overlap, 32);
assert_eq!(config.compression, true);
assert_eq!(config.compression_level, 9);
assert_eq!(config.prefetch_count, 4);
}
// Totals, running average, and throughput after two recorded chunks.
#[test]
fn test_chunk_stats() {
let mut stats = ChunkStats::new();
stats.record_chunk(1024 * 1024, 100);
stats.record_chunk(1024 * 1024, 150);
assert_eq!(stats.chunks_processed, 2);
assert_eq!(stats.bytes_processed, 2 * 1024 * 1024);
// (100 + 150) / 2 chunks = 125 ms average.
assert_eq!(stats.avg_chunk_time_ms, 125.0);
assert!(stats.throughput_mbps() > 0.0);
}
}