Skip to main content

oxigdal_wasm/
worker.rs

1//! Web Worker support for parallel tile loading
2//!
3//! This module provides Web Worker communication, parallel tile fetching,
4//! job queue management, worker pool coordination, and progress tracking for
5//! high-performance geospatial data visualization in the browser.
6//!
7//! # Overview
8//!
9//! The worker module implements a sophisticated worker pool system for parallel
10//! tile loading:
11//!
12//! - **Worker Pool**: Manages multiple Web Workers for concurrent tile loading
//! - **Job Queue**: FIFO queue; jobs are dispatched in submission order, so submit high-priority jobs first
14//! - **Load Balancing**: Distributes work evenly across available workers
15//! - **Timeout Handling**: Detects and recovers from stuck workers
16//! - **Progress Tracking**: Monitors tile loading progress for UI updates
17//! - **Error Recovery**: Handles worker failures gracefully
18//!
19//! # Architecture
20//!
21//! ```text
22//! ┌─────────────────┐
23//! │   Main Thread   │
24//! │  (Rust/WASM)    │
25//! └────────┬────────┘
26//!          │
27//!          │ Job Queue
28//!          │
29//! ┌────────▼────────────────────────────────┐
30//! │         Worker Pool Manager             │
31//! │  - Job scheduling                       │
32//! │  - Worker health monitoring             │
33//! │  - Load balancing                       │
34//! └───┬─────────┬─────────┬────────┬────────┘
35//!     │         │         │        │
36//! ┌───▼───┐ ┌──▼───┐ ┌───▼──┐ ┌───▼───┐
37//! │Worker │ │Worker│ │Worker│ │Worker │
38//! │  #1   │ │  #2  │ │  #3  │ │  #4   │
39//! └───────┘ └──────┘ └──────┘ └───────┘
40//!     │         │         │        │
41//!     └─────────┴─────────┴────────┘
42//!              Network
43//! ```
44//!
45//! # Web Worker Basics
46//!
47//! Web Workers run JavaScript/WASM code in separate threads, allowing:
48//! - True parallel execution (not just concurrent)
49//! - No blocking of main UI thread
50//! - Independent memory space per worker
51//! - Communication via postMessage
52//!
53//! Limitations:
54//! - No direct DOM access
55//! - Message passing overhead (serialization)
56//! - Limited to ~4-8 workers per page (browser dependent)
57//!
58//! # Job Lifecycle
59//!
60//! ```text
61//! Submit ─> Pending ─> In Progress ─> Completed
62//!                          │
63//!                          └────────> Failed/Timed Out
64//! ```
65//!
66//! 1. **Submit**: Job is added to the queue
67//! 2. **Pending**: Job waits for available worker
68//! 3. **In Progress**: Worker is processing the job
69//! 4. **Completed**: Job finished successfully
70//! 5. **Failed**: Job encountered an error
71//! 6. **Timed Out**: Job exceeded time limit
72//!
73//! # Performance Characteristics
74//!
75//! Overhead costs:
76//! - Worker creation: ~50-100ms per worker
77//! - Message passing: ~0.1-1ms per message
78//! - Job dispatch: ~0.1ms
79//! - Serialization: ~0.5ms per 256KB tile
80//!
81//! Optimal pool size:
82//! - CPU-bound work: Number of cores (typically 4-8)
83//! - Network-bound work: 2-4 workers
84//! - Mixed workload: 3-6 workers
85//!
86//! Throughput example (4 workers, good network):
87//! - Sequential: 4 tiles/sec
88//! - Parallel: 12-16 tiles/sec (3-4x speedup)
89//!
90//! # Example: Basic Worker Pool
91//!
92//! ```ignore
93//! use oxigdal_wasm::worker::{WorkerPool, WorkerJobRequest, WorkerRequestType};
94//! use oxigdal_wasm::tile::TileCoord;
95//!
96//! // Create pool with 4 workers
97//! let mut pool = WorkerPool::new(4).expect("Failed to create pool");
98//!
99//! // Submit a tile loading job
100//! let request = WorkerJobRequest {
101//!     job_id: 0,
102//!     request_type: WorkerRequestType::LoadTile {
103//!         url: "https://example.com/image.tif".to_string(),
104//!         coord: TileCoord::new(0, 0, 0),
105//!     },
106//! };
107//!
108//! let job_id = pool.submit_job(request, 0.0, 30000)
109//!     .expect("Failed to submit job");
110//!
111//! println!("Job {} submitted", job_id);
112//!
113//! // Check pool statistics
114//! let stats = pool.stats();
115//! println!("Pool utilization: {:.1}%", stats.utilization() * 100.0);
116//! ```
117//!
118//! # Example: Job Prioritization
119//!
120//! ```ignore
121//! use oxigdal_wasm::worker::{WorkerPool, WorkerJobRequest, WorkerRequestType};
122//! use oxigdal_wasm::tile::TileCoord;
123//!
124//! let mut pool = WorkerPool::new(4).expect("Create failed");
125//!
126//! // Submit high-priority jobs (visible tiles)
127//! for coord in visible_tiles {
128//!     let request = WorkerJobRequest {
129//!         job_id: 0,
130//!         request_type: WorkerRequestType::LoadTile {
131//!             url: url.clone(),
132//!             coord,
133//!         },
134//!     };
135//!     pool.submit_job(request, timestamp, 10000)?;
136//! }
137//!
138//! // Submit low-priority jobs (prefetch)
139//! for coord in prefetch_tiles {
140//!     let request = WorkerJobRequest {
141//!         job_id: 0,
142//!         request_type: WorkerRequestType::Prefetch {
143//!             url: url.clone(),
144//!             coords: vec![coord],
145//!         },
146//!     };
147//!     pool.submit_job(request, timestamp, 60000)?;
148//! }
149//! ```
150//!
151//! # Example: Monitoring and Health Checks
152//!
153//! ```ignore
154//! use oxigdal_wasm::worker::WorkerPool;
155//!
156//! let mut pool = WorkerPool::new(4).expect("Create failed");
157//!
158//! // Periodically check for timeouts
159//! loop {
//!     let current_time = js_sys::Date::now() / 1000.0; // pool timestamps are in seconds
161//!     let timed_out = pool.check_timeouts(current_time);
162//!
163//!     for job_id in timed_out {
164//!         println!("Job {} timed out!", job_id);
165//!         // Resubmit or report error
166//!     }
167//!
168//!     // Check pool health
169//!     let stats = pool.stats();
170//!     if stats.idle_workers == 0 && stats.pending_jobs > 10 {
171//!         println!("Warning: Pool is saturated!");
172//!         // Consider reducing load or increasing pool size
173//!     }
174//!
175//!     // Wait before next check
176//!     await sleep(1000);
177//! }
178//! ```
179//!
180//! # Best Practices
181//!
182//! ## Pool Sizing
183//! - Start with 3-4 workers for most applications
184//! - Increase to 6-8 for high-bandwidth connections
185//! - Decrease to 2-3 for mobile devices
186//! - Monitor CPU usage and adjust accordingly
187//!
188//! ## Job Management
189//! - Set appropriate timeouts (10-30 seconds typical)
190//! - Clean up completed jobs periodically
191//! - Prioritize visible content over prefetch
192//! - Batch small jobs to reduce overhead
193//!
194//! ## Error Handling
195//! - Always handle job failures
196//! - Implement retry logic for transient errors
197//! - Report persistent failures to user
198//! - Gracefully degrade on worker unavailability
199//!
200//! ## Memory Management
201//! - Limit concurrent jobs to prevent memory pressure
202//! - Transfer large ArrayBuffers (use Transferable)
203//! - Clean up job results after processing
204//! - Monitor overall memory usage
205//!
206//! # Troubleshooting
207//!
208//! ## Workers not starting
209//! - Check CORS headers on worker script
210//! - Verify worker script URL is correct
211//! - Check browser console for errors
212//! - Ensure WASM module is loaded in worker
213//!
214//! ## Poor performance
215//! - Reduce number of workers (might be too many)
216//! - Check network latency (workers waiting on network)
217//! - Profile worker code for bottlenecks
218//! - Verify efficient message passing (avoid large copies)
219//!
220//! ## Jobs timing out
221//! - Increase timeout duration
222//! - Check network reliability
223//! - Verify server response times
224//! - Implement retry logic for failed requests
225
226use serde::{Deserialize, Serialize};
227use std::collections::{HashMap, VecDeque};
228use wasm_bindgen::prelude::*;
229use web_sys::{Worker, WorkerOptions, WorkerType};
230
231use crate::error::{WasmError, WasmResult, WorkerError};
232use crate::tile::TileCoord;
233
/// Default number of workers in the pool.
#[allow(dead_code)]
pub const DEFAULT_WORKER_POOL_SIZE: usize = 4;

/// Maximum number of tracked jobs allowed per worker.
///
/// `WorkerPool::new` multiplies this by the pool size to obtain the pool-wide
/// limit enforced by `WorkerPool::submit_job`.
pub const MAX_PENDING_JOBS_PER_WORKER: usize = 10;

/// Default job timeout in milliseconds.
#[allow(dead_code)]
pub const DEFAULT_JOB_TIMEOUT_MS: u64 = 30000;

/// Unique job identifier, allocated sequentially by `WorkerPool::submit_job`.
pub type JobId = u64;
247
/// Request describing a single job for a worker.
///
/// The type derives `Serialize`/`Deserialize`; `WorkerPool::dispatch_jobs`
/// encodes it as a JSON string before posting it to the worker.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerJobRequest {
    /// Identifier of the job this request belongs to
    pub job_id: JobId,
    /// The operation the worker is asked to perform
    pub request_type: WorkerRequestType,
}
256
/// Operations a worker can be asked to perform.
///
/// Serialized with serde's internally tagged representation: the variant name
/// is written to a `"type"` field next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WorkerRequestType {
    /// Load a single tile
    LoadTile {
        /// URL of the COG
        url: String,
        /// Coordinate of the tile to load
        coord: TileCoord,
    },
    /// Load several tiles in one job
    LoadTiles {
        /// URL of the COG
        url: String,
        /// Coordinates of the tiles to load
        coords: Vec<TileCoord>,
    },
    /// Prefetch tiles
    // NOTE(review): how prefetching differs from `LoadTiles` is decided by the
    // worker script, which is not part of this file.
    Prefetch {
        /// URL of the COG
        url: String,
        /// Coordinates of the tiles to prefetch
        coords: Vec<TileCoord>,
    },
    /// Retrieve metadata
    GetMetadata {
        /// URL of the COG
        url: String,
    },
}
288
/// Response reported by a worker for one job.
///
/// `WorkerPool::handle_response` uses `job_id` to look up the job record that
/// the response belongs to.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerJobResponse {
    /// Identifier of the job being answered
    pub job_id: JobId,
    /// Outcome of the job
    pub response_type: WorkerResponseType,
}
297
/// Possible outcomes of a worker job.
///
/// Serialized with serde's internally tagged representation: the variant name
/// is written to a `"type"` field next to the variant's own fields.
/// `WorkerPool::handle_response` treats `Error` as a failed job and every other
/// variant as a completed one.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WorkerResponseType {
    /// A single tile was loaded successfully
    TileLoaded {
        /// Coordinate of the loaded tile
        coord: TileCoord,
        /// Tile data as raw bytes
        data: Vec<u8>,
    },
    /// Multiple tiles were loaded
    TilesLoaded {
        /// Loaded tiles as `(coordinate, bytes)` pairs
        tiles: Vec<(TileCoord, Vec<u8>)>,
    },
    /// Prefetch completed
    PrefetchCompleted {
        /// Number of tiles prefetched
        count: usize,
    },
    /// Metadata was retrieved
    Metadata {
        /// Metadata JSON
        metadata: String,
    },
    /// The job failed
    Error {
        /// Error message reported by the worker
        message: String,
    },
}
330
/// Lifecycle state of a job tracked by the worker pool.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Job is queued and not yet assigned to a worker
    Pending,
    /// Job has been handed to a worker
    InProgress,
    /// Job completed successfully
    Completed,
    /// Job failed (worker reported an error, or the job was cancelled)
    Failed,
    /// Job exceeded its timeout while in progress
    TimedOut,
}
345
/// Bookkeeping record for a job tracked by the worker pool.
///
/// Timestamps are plain `f64` values supplied by the caller. `is_timed_out` and
/// `elapsed_ms` multiply timestamp differences by 1000 to obtain milliseconds,
/// so timestamps are expected to be in seconds.
#[derive(Debug, Clone)]
pub struct PendingJob {
    /// Job ID
    pub job_id: JobId,
    /// Worker ID (if assigned)
    pub worker_id: Option<u32>,
    /// Job request
    pub request: WorkerJobRequest,
    /// Job status
    pub status: JobStatus,
    /// Submission timestamp
    pub submitted_at: f64,
    /// Start timestamp (when the job was handed to a worker)
    pub started_at: Option<f64>,
    /// Completion timestamp
    pub completed_at: Option<f64>,
    /// Timeout duration in milliseconds, measured from `started_at`
    pub timeout_ms: u64,
}
366
367impl PendingJob {
368    /// Creates a new pending job
369    pub fn new(job_id: JobId, request: WorkerJobRequest, timestamp: f64, timeout_ms: u64) -> Self {
370        Self {
371            job_id,
372            worker_id: None,
373            request,
374            status: JobStatus::Pending,
375            submitted_at: timestamp,
376            started_at: None,
377            completed_at: None,
378            timeout_ms,
379        }
380    }
381
382    /// Checks if the job has timed out
383    pub fn is_timed_out(&self, current_time: f64) -> bool {
384        if let Some(started) = self.started_at {
385            (current_time - started) * 1000.0 > self.timeout_ms as f64
386        } else {
387            false
388        }
389    }
390
391    /// Returns the elapsed time in milliseconds
392    pub fn elapsed_ms(&self, current_time: f64) -> f64 {
393        if let Some(started) = self.started_at {
394            (current_time - started) * 1000.0
395        } else {
396            0.0
397        }
398    }
399}
400
/// A worker handle together with the pool's bookkeeping for it.
#[derive(Debug)]
pub struct WorkerInfo {
    /// Worker ID
    pub id: u32,
    /// Worker instance
    pub worker: Worker,
    /// Current job (if any); `None` means the worker is idle
    pub current_job: Option<JobId>,
    /// Number of completed jobs
    pub completed_jobs: u64,
    /// Number of failed jobs
    pub failed_jobs: u64,
    /// Total processing time in milliseconds
    pub total_processing_time_ms: f64,
}
417
418impl WorkerInfo {
419    /// Creates a new worker info
420    pub fn new(id: u32, worker: Worker) -> Self {
421        Self {
422            id,
423            worker,
424            current_job: None,
425            completed_jobs: 0,
426            failed_jobs: 0,
427            total_processing_time_ms: 0.0,
428        }
429    }
430
431    /// Checks if the worker is idle
432    pub const fn is_idle(&self) -> bool {
433        self.current_job.is_none()
434    }
435
436    /// Returns the average processing time per job
437    pub fn average_processing_time_ms(&self) -> f64 {
438        let total_jobs = self.completed_jobs + self.failed_jobs;
439        if total_jobs == 0 {
440            0.0
441        } else {
442            self.total_processing_time_ms / total_jobs as f64
443        }
444    }
445}
446
/// Worker pool for parallel tile loading.
///
/// Holds the workers, a FIFO queue of job ids waiting for a worker, and a map
/// of job records keyed by job id.
pub struct WorkerPool {
    /// Workers in the pool
    workers: Vec<WorkerInfo>,
    /// Ids of jobs waiting for an idle worker, in submission order
    job_queue: VecDeque<JobId>,
    /// Records of all tracked jobs; finished jobs stay here until `cleanup_jobs` runs
    pending_jobs: HashMap<JobId, PendingJob>,
    /// Id that will be given to the next submitted job
    next_job_id: JobId,
    /// Number of workers requested at construction
    pool_size: usize,
    /// Maximum number of tracked jobs (`pool_size * MAX_PENDING_JOBS_PER_WORKER`)
    max_pending_jobs: usize,
}
462
463impl WorkerPool {
464    /// Creates a new worker pool
465    pub fn new(pool_size: usize) -> WasmResult<Self> {
466        let mut workers = Vec::with_capacity(pool_size);
467
468        for i in 0..pool_size {
469            let worker = Self::create_worker(i as u32)?;
470            workers.push(WorkerInfo::new(i as u32, worker));
471        }
472
473        Ok(Self {
474            workers,
475            job_queue: VecDeque::new(),
476            pending_jobs: HashMap::new(),
477            next_job_id: 0,
478            pool_size,
479            max_pending_jobs: pool_size * MAX_PENDING_JOBS_PER_WORKER,
480        })
481    }
482
483    /// Creates a new worker
484    fn create_worker(_id: u32) -> WasmResult<Worker> {
485        let options = WorkerOptions::new();
486        options.set_type(WorkerType::Module);
487
488        // Create worker from script URL
489        // The worker script should be served alongside the main application
490        let worker = Worker::new_with_options("./cog-worker.js", &options).map_err(|e| {
491            WasmError::Worker(WorkerError::CreationFailed {
492                message: format!("Failed to create worker: {e:?}"),
493            })
494        })?;
495
496        Ok(worker)
497    }
498
499    /// Submits a job to the pool
500    pub fn submit_job(
501        &mut self,
502        request: WorkerJobRequest,
503        timestamp: f64,
504        timeout_ms: u64,
505    ) -> WasmResult<JobId> {
506        if self.pending_jobs.len() >= self.max_pending_jobs {
507            return Err(WasmError::Worker(WorkerError::PoolExhausted {
508                pool_size: self.pool_size,
509                pending_jobs: self.pending_jobs.len(),
510            }));
511        }
512
513        let job_id = self.next_job_id;
514        self.next_job_id += 1;
515
516        let job = PendingJob::new(job_id, request, timestamp, timeout_ms);
517        self.pending_jobs.insert(job_id, job);
518        self.job_queue.push_back(job_id);
519
520        self.dispatch_jobs(timestamp)?;
521
522        Ok(job_id)
523    }
524
525    /// Dispatches pending jobs to idle workers
526    fn dispatch_jobs(&mut self, timestamp: f64) -> WasmResult<()> {
527        while let Some(job_id) = self.job_queue.pop_front() {
528            // Find an idle worker
529            let worker_idx = self.workers.iter().position(|w| w.is_idle());
530
531            if let Some(idx) = worker_idx {
532                // Assign job to worker
533                self.workers[idx].current_job = Some(job_id);
534
535                if let Some(job) = self.pending_jobs.get_mut(&job_id) {
536                    job.worker_id = Some(self.workers[idx].id);
537                    job.status = JobStatus::InProgress;
538                    job.started_at = Some(timestamp);
539
540                    // Post message to worker
541                    let message = serde_json::to_string(&job.request).map_err(|e| {
542                        WasmError::Worker(WorkerError::PostMessageFailed {
543                            worker_id: self.workers[idx].id,
544                            message: e.to_string(),
545                        })
546                    })?;
547
548                    self.workers[idx]
549                        .worker
550                        .post_message(&JsValue::from_str(&message))
551                        .map_err(|e| {
552                            WasmError::Worker(WorkerError::PostMessageFailed {
553                                worker_id: self.workers[idx].id,
554                                message: format!("{e:?}"),
555                            })
556                        })?;
557                }
558            } else {
559                // No idle workers, put job back in queue
560                self.job_queue.push_front(job_id);
561                break;
562            }
563        }
564
565        Ok(())
566    }
567
568    /// Handles a worker response
569    pub fn handle_response(
570        &mut self,
571        worker_id: u32,
572        response: WorkerJobResponse,
573        timestamp: f64,
574    ) -> WasmResult<()> {
575        let job_id = response.job_id;
576
577        // Find the worker
578        let worker_idx = self
579            .workers
580            .iter()
581            .position(|w| w.id == worker_id)
582            .ok_or_else(|| {
583                WasmError::Worker(WorkerError::InvalidResponse {
584                    expected: format!("worker {worker_id}"),
585                    actual: "unknown worker".to_string(),
586                })
587            })?;
588
589        // Update job status
590        if let Some(job) = self.pending_jobs.get_mut(&job_id) {
591            let elapsed = if let Some(started) = job.started_at {
592                (timestamp - started) * 1000.0
593            } else {
594                0.0
595            };
596
597            match &response.response_type {
598                WorkerResponseType::Error { .. } => {
599                    job.status = JobStatus::Failed;
600                    self.workers[worker_idx].failed_jobs += 1;
601                }
602                _ => {
603                    job.status = JobStatus::Completed;
604                    self.workers[worker_idx].completed_jobs += 1;
605                }
606            }
607
608            job.completed_at = Some(timestamp);
609            self.workers[worker_idx].total_processing_time_ms += elapsed;
610        }
611
612        // Mark worker as idle
613        self.workers[worker_idx].current_job = None;
614
615        // Dispatch next job
616        self.dispatch_jobs(timestamp)?;
617
618        Ok(())
619    }
620
621    /// Checks for timed out jobs
622    pub fn check_timeouts(&mut self, current_time: f64) -> Vec<JobId> {
623        let mut timed_out = Vec::new();
624
625        for (job_id, job) in &mut self.pending_jobs {
626            if job.status == JobStatus::InProgress && job.is_timed_out(current_time) {
627                job.status = JobStatus::TimedOut;
628                timed_out.push(*job_id);
629
630                // Mark worker as idle
631                if let Some(worker_id) = job.worker_id {
632                    if let Some(worker) = self.workers.iter_mut().find(|w| w.id == worker_id) {
633                        worker.current_job = None;
634                        worker.failed_jobs += 1;
635                    }
636                }
637            }
638        }
639
640        timed_out
641    }
642
643    /// Returns the job status
644    pub fn job_status(&self, job_id: JobId) -> Option<JobStatus> {
645        self.pending_jobs.get(&job_id).map(|j| j.status)
646    }
647
648    /// Cancels a job
649    pub fn cancel_job(&mut self, job_id: JobId) -> WasmResult<()> {
650        if let Some(job) = self.pending_jobs.get_mut(&job_id) {
651            job.status = JobStatus::Failed;
652
653            // Remove from queue if pending
654            if let Some(pos) = self.job_queue.iter().position(|&id| id == job_id) {
655                self.job_queue.remove(pos);
656            }
657
658            // If job is in progress, we can't easily stop the worker
659            // Just mark it as failed and the worker will be released when it responds
660        }
661
662        Ok(())
663    }
664
665    /// Returns pool statistics
666    pub fn stats(&self) -> PoolStats {
667        let idle_workers = self.workers.iter().filter(|w| w.is_idle()).count();
668        let total_completed: u64 = self.workers.iter().map(|w| w.completed_jobs).sum();
669        let total_failed: u64 = self.workers.iter().map(|w| w.failed_jobs).sum();
670
671        PoolStats {
672            pool_size: self.pool_size,
673            idle_workers,
674            pending_jobs: self.job_queue.len(),
675            total_jobs: self.pending_jobs.len(),
676            completed_jobs: total_completed,
677            failed_jobs: total_failed,
678        }
679    }
680
681    /// Clears completed and failed jobs
682    pub fn cleanup_jobs(&mut self) {
683        self.pending_jobs.retain(|_, job| {
684            job.status == JobStatus::Pending || job.status == JobStatus::InProgress
685        });
686    }
687
688    /// Shuts down the worker pool
689    pub fn shutdown(&mut self) {
690        // Terminate all workers
691        for worker in &self.workers {
692            worker.worker.terminate();
693        }
694        self.workers.clear();
695        self.job_queue.clear();
696        self.pending_jobs.clear();
697    }
698}
699
/// Snapshot of worker pool statistics, as produced by `WorkerPool::stats`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct PoolStats {
    /// Total pool size
    pub pool_size: usize,
    /// Number of idle workers
    pub idle_workers: usize,
    /// Number of jobs waiting in the queue
    pub pending_jobs: usize,
    /// Total number of tracked job records (queued, in progress, and finished
    /// ones that have not been cleaned up)
    pub total_jobs: usize,
    /// Number of completed jobs, summed over all workers
    pub completed_jobs: u64,
    /// Number of failed jobs, summed over all workers
    pub failed_jobs: u64,
}
716
717impl PoolStats {
718    /// Returns the pool utilization (fraction of busy workers)
719    pub fn utilization(&self) -> f64 {
720        if self.pool_size == 0 {
721            0.0
722        } else {
723            (self.pool_size - self.idle_workers) as f64 / self.pool_size as f64
724        }
725    }
726
727    /// Returns the success rate
728    pub fn success_rate(&self) -> f64 {
729        let total = self.completed_jobs + self.failed_jobs;
730        if total == 0 {
731            1.0
732        } else {
733            self.completed_jobs as f64 / total as f64
734        }
735    }
736}
737
/// Tile loading coordinator: a worker pool plus one completion callback per
/// submitted job.
#[allow(dead_code)]
pub struct TileLoadCoordinator {
    /// Worker pool that executes the jobs
    pool: WorkerPool,
    /// Callbacks keyed by job id; each takes the job's response or an error
    // NOTE(review): nothing visible in this file removes or invokes these
    // callbacks; presumably a response handler does so elsewhere.
    callbacks: HashMap<JobId, Box<dyn FnOnce(Result<WorkerJobResponse, WasmError>)>>,
}
746
747#[allow(dead_code)]
748impl TileLoadCoordinator {
749    /// Creates a new tile load coordinator
750    pub fn new(pool_size: usize) -> WasmResult<Self> {
751        let pool = WorkerPool::new(pool_size)?;
752        Ok(Self {
753            pool,
754            callbacks: HashMap::new(),
755        })
756    }
757
758    /// Loads a tile asynchronously
759    pub fn load_tile<F>(
760        &mut self,
761        url: String,
762        coord: TileCoord,
763        timestamp: f64,
764        callback: F,
765    ) -> WasmResult<JobId>
766    where
767        F: FnOnce(Result<Vec<u8>, WasmError>) + 'static,
768    {
769        let request = WorkerJobRequest {
770            job_id: 0, // Will be set by submit_job
771            request_type: WorkerRequestType::LoadTile { url, coord },
772        };
773
774        let job_id = self
775            .pool
776            .submit_job(request, timestamp, DEFAULT_JOB_TIMEOUT_MS)?;
777
778        // Wrap callback
779        let wrapped = Box::new(
780            move |result: Result<WorkerJobResponse, WasmError>| match result {
781                Ok(response) => match response.response_type {
782                    WorkerResponseType::TileLoaded { data, .. } => callback(Ok(data)),
783                    WorkerResponseType::Error { message } => {
784                        callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
785                            expected: "TileLoaded".to_string(),
786                            actual: message,
787                        })))
788                    }
789                    _ => callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
790                        expected: "TileLoaded".to_string(),
791                        actual: format!("{:?}", response.response_type),
792                    }))),
793                },
794                Err(e) => callback(Err(e)),
795            },
796        );
797
798        self.callbacks.insert(job_id, wrapped);
799
800        Ok(job_id)
801    }
802
803    /// Loads multiple tiles asynchronously
804    pub fn load_tiles<F>(
805        &mut self,
806        url: String,
807        coords: Vec<TileCoord>,
808        timestamp: f64,
809        callback: F,
810    ) -> WasmResult<JobId>
811    where
812        F: FnOnce(Result<Vec<(TileCoord, Vec<u8>)>, WasmError>) + 'static,
813    {
814        let request = WorkerJobRequest {
815            job_id: 0,
816            request_type: WorkerRequestType::LoadTiles { url, coords },
817        };
818
819        let job_id = self
820            .pool
821            .submit_job(request, timestamp, DEFAULT_JOB_TIMEOUT_MS)?;
822
823        let wrapped = Box::new(
824            move |result: Result<WorkerJobResponse, WasmError>| match result {
825                Ok(response) => match response.response_type {
826                    WorkerResponseType::TilesLoaded { tiles } => callback(Ok(tiles)),
827                    WorkerResponseType::Error { message } => {
828                        callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
829                            expected: "TilesLoaded".to_string(),
830                            actual: message,
831                        })))
832                    }
833                    _ => callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
834                        expected: "TilesLoaded".to_string(),
835                        actual: format!("{:?}", response.response_type),
836                    }))),
837                },
838                Err(e) => callback(Err(e)),
839            },
840        );
841
842        self.callbacks.insert(job_id, wrapped);
843
844        Ok(job_id)
845    }
846
847    /// Returns pool statistics
848    pub fn stats(&self) -> PoolStats {
849        self.pool.stats()
850    }
851}
852
/// WASM bindings for worker pool (for demonstration/testing).
///
/// Only stores the configured size; it does not create any workers.
#[wasm_bindgen]
pub struct WasmWorkerPool {
    /// Configured number of workers
    pool_size: usize,
}
858
859#[wasm_bindgen]
860impl WasmWorkerPool {
861    /// Creates a new worker pool
862    #[wasm_bindgen(constructor)]
863    pub fn new(pool_size: usize) -> Self {
864        Self { pool_size }
865    }
866
867    /// Returns the pool size
868    #[wasm_bindgen(js_name = poolSize)]
869    pub fn pool_size(&self) -> usize {
870        self.pool_size
871    }
872
873    /// Returns a message about worker support
874    #[wasm_bindgen(js_name = getInfo)]
875    pub fn get_info(&self) -> String {
876        format!(
877            "Worker pool configured with {} workers. Worker creation requires a separate worker script.",
878            self.pool_size
879        )
880    }
881}
882
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds the metadata request shared by the job tests.
    fn metadata_request() -> WorkerJobRequest {
        WorkerJobRequest {
            job_id: 1,
            request_type: WorkerRequestType::GetMetadata {
                url: String::from("test.tif"),
            },
        }
    }

    /// A freshly created job carries its id and starts out pending.
    #[test]
    fn test_pending_job() {
        let job = PendingJob::new(1, metadata_request(), 0.0, 1000);

        assert_eq!(job.job_id, 1);
        assert_eq!(job.status, JobStatus::Pending);
    }

    /// A started job times out once more than `timeout_ms` has elapsed.
    #[test]
    fn test_job_timeout() {
        let mut job = PendingJob::new(1, metadata_request(), 0.0, 1000);
        job.started_at = Some(0.0);

        // 500ms elapsed: still within the 1000ms budget.
        assert!(!job.is_timed_out(0.5));
        // 2000ms elapsed: over the budget.
        assert!(job.is_timed_out(2.0));
    }

    /// Utilization and success rate are derived from the snapshot counters.
    #[test]
    fn test_pool_stats() {
        let snapshot = PoolStats {
            pool_size: 4,
            idle_workers: 2,
            pending_jobs: 5,
            total_jobs: 10,
            completed_jobs: 100,
            failed_jobs: 10,
        };

        // 2 out of 4 workers busy.
        assert_eq!(snapshot.utilization(), 0.5);
        // 100 successes out of 110 finished jobs.
        assert!((snapshot.success_rate() - 0.909).abs() < 0.01);
    }

    /// A request survives a JSON round trip.
    #[test]
    fn test_job_request_serialization() {
        let original = WorkerJobRequest {
            job_id: 42,
            request_type: WorkerRequestType::LoadTile {
                url: String::from("test.tif"),
                coord: TileCoord::new(5, 10, 20),
            },
        };

        let encoded = serde_json::to_string(&original).expect("Serialization failed");
        let decoded: WorkerJobRequest =
            serde_json::from_str(&encoded).expect("Deserialization failed");

        assert_eq!(decoded.job_id, 42);
    }

    /// The WASM wrapper reports its configured size.
    #[test]
    fn test_wasm_worker_pool() {
        let wrapper = WasmWorkerPool::new(4);
        assert_eq!(wrapper.pool_size(), 4);

        let description = wrapper.get_info();
        assert!(description.contains("4 workers"));
    }
}