use serde::{Deserialize, Serialize};
use std::collections::{HashMap, VecDeque};
use wasm_bindgen::prelude::*;
use web_sys::{Worker, WorkerOptions, WorkerType};
use crate::error::{WasmError, WasmResult, WorkerError};
use crate::tile::TileCoord;
/// Suggested number of workers for a pool. Not referenced elsewhere in this section.
#[allow(dead_code)]
pub const DEFAULT_WORKER_POOL_SIZE: usize = 4;
/// Per-worker multiplier used by `WorkerPool::new` to derive the pool-wide cap on tracked jobs.
pub const MAX_PENDING_JOBS_PER_WORKER: usize = 10;
/// Timeout, in milliseconds, that `TileLoadCoordinator` passes for every job it submits.
#[allow(dead_code)]
pub const DEFAULT_JOB_TIMEOUT_MS: u64 = 30000;
/// Identifier of a job; `WorkerPool` hands these out from a monotonically increasing counter.
pub type JobId = u64;
/// Message describing one unit of work for a worker.
///
/// `WorkerPool::dispatch_jobs` serializes this value to a JSON string and posts it to a worker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerJobRequest {
/// Job identifier carried inside the serialized request.
pub job_id: JobId,
/// The requested operation together with its arguments.
pub request_type: WorkerRequestType,
}
/// The operations a job can request.
///
/// Serialized with serde's internally tagged representation: the variant name is stored in a
/// `type` field next to the variant's own fields. How a worker services each variant is defined
/// by the worker script, which is not part of this file.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WorkerRequestType {
/// Request a single tile (`coord`) of the resource at `url`.
LoadTile {
url: String,
coord: TileCoord,
},
/// Request several tiles (`coords`) of the resource at `url` in one job.
LoadTiles {
url: String,
coords: Vec<TileCoord>,
},
/// Prefetch request for several tiles; presumably answered with `PrefetchCompleted` —
/// TODO confirm against the worker script.
Prefetch {
url: String,
coords: Vec<TileCoord>,
},
/// Request metadata for the resource at `url`.
GetMetadata {
url: String,
},
}
/// Message reporting the outcome of a job; consumed by `WorkerPool::handle_response`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerJobResponse {
/// Identifier of the job this response belongs to; used to look the job up in the pool.
pub job_id: JobId,
/// The outcome payload (a result variant or `Error`).
pub response_type: WorkerResponseType,
}
/// The possible outcomes of a job.
///
/// Serialized with serde's internally tagged representation (variant name in a `type` field).
/// `WorkerPool::handle_response` treats `Error` as a failure and every other variant as success.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WorkerResponseType {
/// One tile and its raw bytes; the result `TileLoadCoordinator::load_tile` expects.
TileLoaded {
coord: TileCoord,
data: Vec<u8>,
},
/// Several tiles, each paired with its raw bytes; the result `load_tiles` expects.
TilesLoaded {
tiles: Vec<(TileCoord, Vec<u8>)>,
},
/// A prefetch finished; `count` is reported by the worker (presumably the number of tiles
/// prefetched — TODO confirm against the worker script).
PrefetchCompleted {
count: usize,
},
/// Metadata as a string; the format is not established in this file.
Metadata {
metadata: String,
},
/// The job failed; `message` describes the failure.
Error {
message: String,
},
}
/// Lifecycle state of a job tracked by `WorkerPool`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
/// Recorded but not yet handed to a worker; the initial state set by `PendingJob::new`.
Pending,
/// Handed to a worker by `WorkerPool::dispatch_jobs`.
InProgress,
/// A non-error response was received (set by `WorkerPool::handle_response`).
Completed,
/// Finished unsuccessfully, e.g. an error response was received or the job was cancelled.
Failed,
/// Exceeded its timeout while in progress (set by `WorkerPool::check_timeouts`).
TimedOut,
}
/// Bookkeeping record for one job known to the pool.
///
/// Timestamps are plain `f64` values supplied by the caller. The timing helpers multiply
/// timestamp differences by 1000 to obtain milliseconds, i.e. they treat timestamps as seconds.
#[derive(Debug, Clone)]
pub struct PendingJob {
/// Identifier under which the pool tracks this job.
pub job_id: JobId,
/// Id of the worker the job was dispatched to; `None` until dispatched.
pub worker_id: Option<u32>,
/// The request that is serialized and posted to the worker.
pub request: WorkerJobRequest,
/// Current lifecycle state.
pub status: JobStatus,
/// Timestamp passed to `PendingJob::new`.
pub submitted_at: f64,
/// Timestamp at which the job was dispatched; `None` while it is still queued.
pub started_at: Option<f64>,
/// Timestamp at which a response for the job was handled; `None` before that.
pub completed_at: Option<f64>,
/// Allowed running time in milliseconds, measured from `started_at`.
pub timeout_ms: u64,
}
impl PendingJob {
    /// Builds the record for a freshly submitted job.
    ///
    /// The job starts out `Pending`, unassigned, and with no start or completion time;
    /// `timestamp` becomes `submitted_at`.
    pub fn new(job_id: JobId, request: WorkerJobRequest, timestamp: f64, timeout_ms: u64) -> Self {
        Self {
            status: JobStatus::Pending,
            worker_id: None,
            started_at: None,
            completed_at: None,
            submitted_at: timestamp,
            job_id,
            request,
            timeout_ms,
        }
    }

    /// Returns `true` when the job has been started and strictly more than `timeout_ms`
    /// milliseconds lie between `started_at` and `current_time`.
    ///
    /// A job that has not been started never times out.
    pub fn is_timed_out(&self, current_time: f64) -> bool {
        match self.started_at {
            // Timestamp differences are scaled by 1000 to compare against milliseconds.
            Some(started) => (current_time - started) * 1000.0 > self.timeout_ms as f64,
            None => false,
        }
    }

    /// Milliseconds between `started_at` and `current_time`, or `0.0` if the job has not started.
    pub fn elapsed_ms(&self, current_time: f64) -> f64 {
        self.started_at
            .map_or(0.0, |started| (current_time - started) * 1000.0)
    }
}
/// A worker handle plus the pool's bookkeeping for it.
#[derive(Debug)]
pub struct WorkerInfo {
/// Pool-local identifier of the worker.
pub id: u32,
/// The `web_sys` worker handle that messages are posted to.
pub worker: Worker,
/// Job currently assigned to this worker; `None` means the worker is idle.
pub current_job: Option<JobId>,
/// Number of jobs that ended with a non-error response.
pub completed_jobs: u64,
/// Number of jobs that ended with an error response or timed out.
pub failed_jobs: u64,
/// Sum of the elapsed milliseconds of all jobs for which a response was handled.
pub total_processing_time_ms: f64,
}
impl WorkerInfo {
    /// Wraps `worker` under the given `id` with no assigned job and all counters at zero.
    pub fn new(id: u32, worker: Worker) -> Self {
        Self {
            current_job: None,
            completed_jobs: 0,
            failed_jobs: 0,
            total_processing_time_ms: 0.0,
            id,
            worker,
        }
    }

    /// `true` when no job is currently assigned to this worker.
    pub const fn is_idle(&self) -> bool {
        self.current_job.is_none()
    }

    /// Mean processing time per finished job (completed plus failed) in milliseconds.
    ///
    /// Returns `0.0` when no job has finished yet.
    pub fn average_processing_time_ms(&self) -> f64 {
        match self.completed_jobs + self.failed_jobs {
            0 => 0.0,
            finished => self.total_processing_time_ms / finished as f64,
        }
    }
}
/// A fixed set of web workers plus a FIFO queue of jobs waiting for an idle worker.
pub struct WorkerPool {
/// The workers owned by the pool, with per-worker bookkeeping.
workers: Vec<WorkerInfo>,
/// Ids of jobs that are waiting for an idle worker, oldest first.
job_queue: VecDeque<JobId>,
/// Every job the pool tracks, keyed by id; finished jobs stay here until `cleanup_jobs` runs.
pending_jobs: HashMap<JobId, PendingJob>,
/// Id that the next submitted job will receive.
next_job_id: JobId,
/// Number of workers requested at construction.
pool_size: usize,
/// Upper bound on `pending_jobs.len()` beyond which `submit_job` refuses new jobs.
max_pending_jobs: usize,
}
impl WorkerPool {
pub fn new(pool_size: usize) -> WasmResult<Self> {
let mut workers = Vec::with_capacity(pool_size);
for i in 0..pool_size {
let worker = Self::create_worker(i as u32)?;
workers.push(WorkerInfo::new(i as u32, worker));
}
Ok(Self {
workers,
job_queue: VecDeque::new(),
pending_jobs: HashMap::new(),
next_job_id: 0,
pool_size,
max_pending_jobs: pool_size * MAX_PENDING_JOBS_PER_WORKER,
})
}
fn create_worker(_id: u32) -> WasmResult<Worker> {
let options = WorkerOptions::new();
options.set_type(WorkerType::Module);
let worker = Worker::new_with_options("./cog-worker.js", &options).map_err(|e| {
WasmError::Worker(WorkerError::CreationFailed {
message: format!("Failed to create worker: {e:?}"),
})
})?;
Ok(worker)
}
pub fn submit_job(
&mut self,
request: WorkerJobRequest,
timestamp: f64,
timeout_ms: u64,
) -> WasmResult<JobId> {
if self.pending_jobs.len() >= self.max_pending_jobs {
return Err(WasmError::Worker(WorkerError::PoolExhausted {
pool_size: self.pool_size,
pending_jobs: self.pending_jobs.len(),
}));
}
let job_id = self.next_job_id;
self.next_job_id += 1;
let job = PendingJob::new(job_id, request, timestamp, timeout_ms);
self.pending_jobs.insert(job_id, job);
self.job_queue.push_back(job_id);
self.dispatch_jobs(timestamp)?;
Ok(job_id)
}
fn dispatch_jobs(&mut self, timestamp: f64) -> WasmResult<()> {
while let Some(job_id) = self.job_queue.pop_front() {
let worker_idx = self.workers.iter().position(|w| w.is_idle());
if let Some(idx) = worker_idx {
self.workers[idx].current_job = Some(job_id);
if let Some(job) = self.pending_jobs.get_mut(&job_id) {
job.worker_id = Some(self.workers[idx].id);
job.status = JobStatus::InProgress;
job.started_at = Some(timestamp);
let message = serde_json::to_string(&job.request).map_err(|e| {
WasmError::Worker(WorkerError::PostMessageFailed {
worker_id: self.workers[idx].id,
message: e.to_string(),
})
})?;
self.workers[idx]
.worker
.post_message(&JsValue::from_str(&message))
.map_err(|e| {
WasmError::Worker(WorkerError::PostMessageFailed {
worker_id: self.workers[idx].id,
message: format!("{e:?}"),
})
})?;
}
} else {
self.job_queue.push_front(job_id);
break;
}
}
Ok(())
}
pub fn handle_response(
&mut self,
worker_id: u32,
response: WorkerJobResponse,
timestamp: f64,
) -> WasmResult<()> {
let job_id = response.job_id;
let worker_idx = self
.workers
.iter()
.position(|w| w.id == worker_id)
.ok_or_else(|| {
WasmError::Worker(WorkerError::InvalidResponse {
expected: format!("worker {worker_id}"),
actual: "unknown worker".to_string(),
})
})?;
if let Some(job) = self.pending_jobs.get_mut(&job_id) {
let elapsed = if let Some(started) = job.started_at {
(timestamp - started) * 1000.0
} else {
0.0
};
match &response.response_type {
WorkerResponseType::Error { .. } => {
job.status = JobStatus::Failed;
self.workers[worker_idx].failed_jobs += 1;
}
_ => {
job.status = JobStatus::Completed;
self.workers[worker_idx].completed_jobs += 1;
}
}
job.completed_at = Some(timestamp);
self.workers[worker_idx].total_processing_time_ms += elapsed;
}
self.workers[worker_idx].current_job = None;
self.dispatch_jobs(timestamp)?;
Ok(())
}
pub fn check_timeouts(&mut self, current_time: f64) -> Vec<JobId> {
let mut timed_out = Vec::new();
for (job_id, job) in &mut self.pending_jobs {
if job.status == JobStatus::InProgress && job.is_timed_out(current_time) {
job.status = JobStatus::TimedOut;
timed_out.push(*job_id);
if let Some(worker_id) = job.worker_id {
if let Some(worker) = self.workers.iter_mut().find(|w| w.id == worker_id) {
worker.current_job = None;
worker.failed_jobs += 1;
}
}
}
}
timed_out
}
pub fn job_status(&self, job_id: JobId) -> Option<JobStatus> {
self.pending_jobs.get(&job_id).map(|j| j.status)
}
pub fn cancel_job(&mut self, job_id: JobId) -> WasmResult<()> {
if let Some(job) = self.pending_jobs.get_mut(&job_id) {
job.status = JobStatus::Failed;
if let Some(pos) = self.job_queue.iter().position(|&id| id == job_id) {
self.job_queue.remove(pos);
}
}
Ok(())
}
pub fn stats(&self) -> PoolStats {
let idle_workers = self.workers.iter().filter(|w| w.is_idle()).count();
let total_completed: u64 = self.workers.iter().map(|w| w.completed_jobs).sum();
let total_failed: u64 = self.workers.iter().map(|w| w.failed_jobs).sum();
PoolStats {
pool_size: self.pool_size,
idle_workers,
pending_jobs: self.job_queue.len(),
total_jobs: self.pending_jobs.len(),
completed_jobs: total_completed,
failed_jobs: total_failed,
}
}
pub fn cleanup_jobs(&mut self) {
self.pending_jobs.retain(|_, job| {
job.status == JobStatus::Pending || job.status == JobStatus::InProgress
});
}
pub fn shutdown(&mut self) {
for worker in &self.workers {
worker.worker.terminate();
}
self.workers.clear();
self.job_queue.clear();
self.pending_jobs.clear();
}
}
/// Point-in-time counters describing a `WorkerPool`, as produced by `WorkerPool::stats`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct PoolStats {
/// Number of workers the pool was created with.
pub pool_size: usize,
/// Number of workers without an assigned job.
pub idle_workers: usize,
/// Number of jobs waiting in the queue for an idle worker.
pub pending_jobs: usize,
/// Number of jobs the pool currently tracks, in any state.
pub total_jobs: usize,
/// Sum of the workers' completed-job counters.
pub completed_jobs: u64,
/// Sum of the workers' failed-job counters.
pub failed_jobs: u64,
}
impl PoolStats {
    /// Fraction of workers that are busy, in `0.0..=1.0`.
    ///
    /// Returns `0.0` for an empty pool. The fields are public and deserializable, so
    /// `idle_workers` may exceed `pool_size`; such inconsistent input is treated as "no worker
    /// busy" instead of underflowing the subtraction.
    pub fn utilization(&self) -> f64 {
        if self.pool_size == 0 {
            return 0.0;
        }
        let busy = self.pool_size.saturating_sub(self.idle_workers);
        busy as f64 / self.pool_size as f64
    }

    /// Fraction of finished jobs that completed successfully.
    ///
    /// Returns `1.0` when no job has finished yet.
    pub fn success_rate(&self) -> f64 {
        // Saturate rather than overflow on extreme (e.g. deserialized) counter values.
        match self.completed_jobs.saturating_add(self.failed_jobs) {
            0 => 1.0,
            finished => self.completed_jobs as f64 / finished as f64,
        }
    }
}
/// Couples a `WorkerPool` with one completion callback per submitted job.
///
/// NOTE(review): nothing in this section removes or invokes the stored callbacks; presumably
/// the response-handling side lives elsewhere or is still to be written — confirm.
#[allow(dead_code)]
pub struct TileLoadCoordinator {
/// The pool that jobs are submitted to.
pool: WorkerPool,
/// Callbacks keyed by the job id returned from `WorkerPool::submit_job`.
callbacks: HashMap<JobId, Box<dyn FnOnce(Result<WorkerJobResponse, WasmError>)>>,
}
#[allow(dead_code)]
impl TileLoadCoordinator {
pub fn new(pool_size: usize) -> WasmResult<Self> {
let pool = WorkerPool::new(pool_size)?;
Ok(Self {
pool,
callbacks: HashMap::new(),
})
}
pub fn load_tile<F>(
&mut self,
url: String,
coord: TileCoord,
timestamp: f64,
callback: F,
) -> WasmResult<JobId>
where
F: FnOnce(Result<Vec<u8>, WasmError>) + 'static,
{
let request = WorkerJobRequest {
job_id: 0, request_type: WorkerRequestType::LoadTile { url, coord },
};
let job_id = self
.pool
.submit_job(request, timestamp, DEFAULT_JOB_TIMEOUT_MS)?;
let wrapped = Box::new(
move |result: Result<WorkerJobResponse, WasmError>| match result {
Ok(response) => match response.response_type {
WorkerResponseType::TileLoaded { data, .. } => callback(Ok(data)),
WorkerResponseType::Error { message } => {
callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
expected: "TileLoaded".to_string(),
actual: message,
})))
}
_ => callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
expected: "TileLoaded".to_string(),
actual: format!("{:?}", response.response_type),
}))),
},
Err(e) => callback(Err(e)),
},
);
self.callbacks.insert(job_id, wrapped);
Ok(job_id)
}
pub fn load_tiles<F>(
&mut self,
url: String,
coords: Vec<TileCoord>,
timestamp: f64,
callback: F,
) -> WasmResult<JobId>
where
F: FnOnce(Result<Vec<(TileCoord, Vec<u8>)>, WasmError>) + 'static,
{
let request = WorkerJobRequest {
job_id: 0,
request_type: WorkerRequestType::LoadTiles { url, coords },
};
let job_id = self
.pool
.submit_job(request, timestamp, DEFAULT_JOB_TIMEOUT_MS)?;
let wrapped = Box::new(
move |result: Result<WorkerJobResponse, WasmError>| match result {
Ok(response) => match response.response_type {
WorkerResponseType::TilesLoaded { tiles } => callback(Ok(tiles)),
WorkerResponseType::Error { message } => {
callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
expected: "TilesLoaded".to_string(),
actual: message,
})))
}
_ => callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
expected: "TilesLoaded".to_string(),
actual: format!("{:?}", response.response_type),
}))),
},
Err(e) => callback(Err(e)),
},
);
self.callbacks.insert(job_id, wrapped);
Ok(job_id)
}
pub fn stats(&self) -> PoolStats {
self.pool.stats()
}
}
/// JavaScript-facing handle that only records the desired pool size; it creates no workers.
#[wasm_bindgen]
pub struct WasmWorkerPool {
    pool_size: usize,
}

#[wasm_bindgen]
impl WasmWorkerPool {
    /// Constructor exposed to JavaScript; stores `pool_size` as given.
    #[wasm_bindgen(constructor)]
    pub fn new(pool_size: usize) -> Self {
        WasmWorkerPool { pool_size }
    }

    /// The configured number of workers (exported to JavaScript as `poolSize`).
    #[wasm_bindgen(js_name = poolSize)]
    pub fn pool_size(&self) -> usize {
        let Self { pool_size } = self;
        *pool_size
    }

    /// Human-readable description of the configuration (exported to JavaScript as `getInfo`).
    #[wasm_bindgen(js_name = getInfo)]
    pub fn get_info(&self) -> String {
        let workers = self.pool_size;
        format!("Worker pool configured with {workers} workers. Worker creation requires a separate worker script.")
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Metadata request with job id 1, shared by the `PendingJob` tests.
    fn metadata_request() -> WorkerJobRequest {
        let request_type = WorkerRequestType::GetMetadata {
            url: "test.tif".to_string(),
        };
        WorkerJobRequest {
            job_id: 1,
            request_type,
        }
    }

    /// A new job keeps its id and starts out pending.
    #[test]
    fn test_pending_job() {
        let job = PendingJob::new(1, metadata_request(), 0.0, 1000);
        assert_eq!(job.job_id, 1);
        assert_eq!(job.status, JobStatus::Pending);
    }

    /// With a 1000 ms timeout, half a second is fine and two seconds is too long.
    #[test]
    fn test_job_timeout() {
        let mut job = PendingJob::new(1, metadata_request(), 0.0, 1000);
        job.started_at = Some(0.0);
        assert!(!job.is_timed_out(0.5));
        assert!(job.is_timed_out(2.0));
    }

    /// Two of four workers busy is 50 % utilization; 100 of 110 jobs succeeded.
    #[test]
    fn test_pool_stats() {
        let stats = PoolStats {
            pool_size: 4,
            idle_workers: 2,
            pending_jobs: 5,
            total_jobs: 10,
            completed_jobs: 100,
            failed_jobs: 10,
        };
        assert_eq!(stats.utilization(), 0.5);
        assert!((stats.success_rate() - 0.909).abs() < 0.01);
    }

    /// A request survives a JSON round trip.
    #[test]
    fn test_job_request_serialization() {
        let request_type = WorkerRequestType::LoadTile {
            url: "test.tif".to_string(),
            coord: TileCoord::new(5, 10, 20),
        };
        let request = WorkerJobRequest {
            job_id: 42,
            request_type,
        };
        let json = serde_json::to_string(&request).expect("Serialization failed");
        let parsed: WorkerJobRequest =
            serde_json::from_str(&json).expect("Deserialization failed");
        assert_eq!(parsed.job_id, 42);
    }

    /// The JavaScript-facing wrapper reports the size it was built with.
    #[test]
    fn test_wasm_worker_pool() {
        let pool = WasmWorkerPool::new(4);
        assert_eq!(pool.pool_size(), 4);
        let info = pool.get_info();
        assert!(info.contains("4 workers"));
    }
}