// oxigdal_wasm/worker.rs
1//! Web Worker support for parallel tile loading
2//!
3//! This module provides Web Worker communication, parallel tile fetching,
4//! job queue management, worker pool coordination, and progress tracking for
5//! high-performance geospatial data visualization in the browser.
6//!
7//! # Overview
8//!
9//! The worker module implements a sophisticated worker pool system for parallel
10//! tile loading:
11//!
12//! - **Worker Pool**: Manages multiple Web Workers for concurrent tile loading
13//! - **Job Queue**: FIFO queue with support for job prioritization
14//! - **Load Balancing**: Distributes work evenly across available workers
15//! - **Timeout Handling**: Detects and recovers from stuck workers
16//! - **Progress Tracking**: Monitors tile loading progress for UI updates
17//! - **Error Recovery**: Handles worker failures gracefully
18//!
19//! # Architecture
20//!
21//! ```text
22//! ┌─────────────────┐
23//! │ Main Thread │
24//! │ (Rust/WASM) │
25//! └────────┬────────┘
26//! │
27//! │ Job Queue
28//! │
29//! ┌────────▼────────────────────────────────┐
30//! │ Worker Pool Manager │
31//! │ - Job scheduling │
32//! │ - Worker health monitoring │
33//! │ - Load balancing │
34//! └───┬─────────┬─────────┬────────┬────────┘
35//! │ │ │ │
36//! ┌───▼───┐ ┌──▼───┐ ┌───▼──┐ ┌───▼───┐
37//! │Worker │ │Worker│ │Worker│ │Worker │
38//! │ #1 │ │ #2 │ │ #3 │ │ #4 │
39//! └───────┘ └──────┘ └──────┘ └───────┘
40//! │ │ │ │
41//! └─────────┴─────────┴────────┘
42//! Network
43//! ```
44//!
45//! # Web Worker Basics
46//!
47//! Web Workers run JavaScript/WASM code in separate threads, allowing:
48//! - True parallel execution (not just concurrent)
49//! - No blocking of main UI thread
50//! - Independent memory space per worker
51//! - Communication via postMessage
52//!
53//! Limitations:
54//! - No direct DOM access
55//! - Message passing overhead (serialization)
56//! - Limited to ~4-8 workers per page (browser dependent)
57//!
58//! # Job Lifecycle
59//!
60//! ```text
61//! Submit ─> Pending ─> In Progress ─> Completed
62//! │
63//! └────────> Failed/Timed Out
64//! ```
65//!
66//! 1. **Submit**: Job is added to the queue
67//! 2. **Pending**: Job waits for available worker
68//! 3. **In Progress**: Worker is processing the job
69//! 4. **Completed**: Job finished successfully
70//! 5. **Failed**: Job encountered an error
71//! 6. **Timed Out**: Job exceeded time limit
72//!
73//! # Performance Characteristics
74//!
75//! Overhead costs:
76//! - Worker creation: ~50-100ms per worker
77//! - Message passing: ~0.1-1ms per message
78//! - Job dispatch: ~0.1ms
79//! - Serialization: ~0.5ms per 256KB tile
80//!
81//! Optimal pool size:
82//! - CPU-bound work: Number of cores (typically 4-8)
83//! - Network-bound work: 2-4 workers
84//! - Mixed workload: 3-6 workers
85//!
86//! Throughput example (4 workers, good network):
87//! - Sequential: 4 tiles/sec
88//! - Parallel: 12-16 tiles/sec (3-4x speedup)
89//!
90//! # Example: Basic Worker Pool
91//!
92//! ```ignore
93//! use oxigdal_wasm::worker::{WorkerPool, WorkerJobRequest, WorkerRequestType};
94//! use oxigdal_wasm::tile::TileCoord;
95//!
96//! // Create pool with 4 workers
97//! let mut pool = WorkerPool::new(4).expect("Failed to create pool");
98//!
99//! // Submit a tile loading job
100//! let request = WorkerJobRequest {
101//! job_id: 0,
102//! request_type: WorkerRequestType::LoadTile {
103//! url: "https://example.com/image.tif".to_string(),
104//! coord: TileCoord::new(0, 0, 0),
105//! },
106//! };
107//!
108//! let job_id = pool.submit_job(request, 0.0, 30000)
109//! .expect("Failed to submit job");
110//!
111//! println!("Job {} submitted", job_id);
112//!
113//! // Check pool statistics
114//! let stats = pool.stats();
115//! println!("Pool utilization: {:.1}%", stats.utilization() * 100.0);
116//! ```
117//!
118//! # Example: Job Prioritization
119//!
120//! ```ignore
121//! use oxigdal_wasm::worker::{WorkerPool, WorkerJobRequest, WorkerRequestType};
122//! use oxigdal_wasm::tile::TileCoord;
123//!
124//! let mut pool = WorkerPool::new(4).expect("Create failed");
125//!
126//! // Submit high-priority jobs (visible tiles)
127//! for coord in visible_tiles {
128//! let request = WorkerJobRequest {
129//! job_id: 0,
130//! request_type: WorkerRequestType::LoadTile {
131//! url: url.clone(),
132//! coord,
133//! },
134//! };
135//! pool.submit_job(request, timestamp, 10000)?;
136//! }
137//!
138//! // Submit low-priority jobs (prefetch)
139//! for coord in prefetch_tiles {
140//! let request = WorkerJobRequest {
141//! job_id: 0,
142//! request_type: WorkerRequestType::Prefetch {
143//! url: url.clone(),
144//! coords: vec![coord],
145//! },
146//! };
147//! pool.submit_job(request, timestamp, 60000)?;
148//! }
149//! ```
150//!
151//! # Example: Monitoring and Health Checks
152//!
153//! ```ignore
154//! use oxigdal_wasm::worker::WorkerPool;
155//!
156//! let mut pool = WorkerPool::new(4).expect("Create failed");
157//!
158//! // Periodically check for timeouts
159//! loop {
160//! let current_time = js_sys::Date::now();
161//! let timed_out = pool.check_timeouts(current_time);
162//!
163//! for job_id in timed_out {
164//! println!("Job {} timed out!", job_id);
165//! // Resubmit or report error
166//! }
167//!
168//! // Check pool health
169//! let stats = pool.stats();
170//! if stats.idle_workers == 0 && stats.pending_jobs > 10 {
171//! println!("Warning: Pool is saturated!");
172//! // Consider reducing load or increasing pool size
173//! }
174//!
175//! // Wait before next check
//! sleep(1000).await;
177//! }
178//! ```
179//!
180//! # Best Practices
181//!
182//! ## Pool Sizing
183//! - Start with 3-4 workers for most applications
184//! - Increase to 6-8 for high-bandwidth connections
185//! - Decrease to 2-3 for mobile devices
186//! - Monitor CPU usage and adjust accordingly
187//!
188//! ## Job Management
189//! - Set appropriate timeouts (10-30 seconds typical)
190//! - Clean up completed jobs periodically
191//! - Prioritize visible content over prefetch
192//! - Batch small jobs to reduce overhead
193//!
194//! ## Error Handling
195//! - Always handle job failures
196//! - Implement retry logic for transient errors
197//! - Report persistent failures to user
198//! - Gracefully degrade on worker unavailability
199//!
200//! ## Memory Management
201//! - Limit concurrent jobs to prevent memory pressure
202//! - Transfer large ArrayBuffers (use Transferable)
203//! - Clean up job results after processing
204//! - Monitor overall memory usage
205//!
206//! # Troubleshooting
207//!
208//! ## Workers not starting
209//! - Check CORS headers on worker script
210//! - Verify worker script URL is correct
211//! - Check browser console for errors
212//! - Ensure WASM module is loaded in worker
213//!
214//! ## Poor performance
215//! - Reduce number of workers (might be too many)
216//! - Check network latency (workers waiting on network)
217//! - Profile worker code for bottlenecks
218//! - Verify efficient message passing (avoid large copies)
219//!
220//! ## Jobs timing out
221//! - Increase timeout duration
222//! - Check network reliability
223//! - Verify server response times
224//! - Implement retry logic for failed requests
225
226use serde::{Deserialize, Serialize};
227use std::collections::{HashMap, VecDeque};
228use wasm_bindgen::prelude::*;
229use web_sys::{Worker, WorkerOptions, WorkerType};
230
231use crate::error::{WasmError, WasmResult, WorkerError};
232use crate::tile::TileCoord;
233
/// Default number of workers in the pool (a starting point, not a hard cap).
#[allow(dead_code)]
pub const DEFAULT_WORKER_POOL_SIZE: usize = 4;

/// Maximum number of pending jobs per worker; the pool rejects submissions
/// once `pool_size * MAX_PENDING_JOBS_PER_WORKER` jobs are tracked.
pub const MAX_PENDING_JOBS_PER_WORKER: usize = 10;

/// Default job timeout in milliseconds (30 s).
#[allow(dead_code)]
pub const DEFAULT_JOB_TIMEOUT_MS: u64 = 30000;

/// Unique job identifier, assigned monotonically by the pool.
pub type JobId = u64;
247
/// Worker job request
///
/// Envelope serialized (as JSON) from the pool to a worker. The worker
/// echoes `job_id` back in its [`WorkerJobResponse`] so the pool can
/// correlate the reply with the pending job.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerJobRequest {
    /// Job ID used to correlate the worker's response.
    pub job_id: JobId,
    /// The operation the worker should perform.
    pub request_type: WorkerRequestType,
}
256
/// Worker request types
///
/// Internally tagged with a `"type"` field so the JS worker can switch on
/// the variant name after JSON deserialization.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WorkerRequestType {
    /// Load a single tile.
    LoadTile {
        /// URL of the COG (Cloud-Optimized GeoTIFF)
        url: String,
        /// Tile coordinate to fetch
        coord: TileCoord,
    },
    /// Load multiple tiles in one job.
    LoadTiles {
        /// URL of the COG
        url: String,
        /// Tile coordinates to fetch
        coords: Vec<TileCoord>,
    },
    /// Prefetch tiles that are not needed immediately (cache warming).
    Prefetch {
        /// URL of the COG
        url: String,
        /// Tile coordinates to prefetch
        coords: Vec<TileCoord>,
    },
    /// Retrieve dataset metadata.
    GetMetadata {
        /// URL of the COG
        url: String,
    },
}
288
/// Worker job response
///
/// Envelope sent back by a worker; `job_id` matches the originating
/// [`WorkerJobRequest`] so the pool can locate the corresponding
/// [`PendingJob`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct WorkerJobResponse {
    /// ID of the job this response answers.
    pub job_id: JobId,
    /// Outcome payload.
    pub response_type: WorkerResponseType,
}
297
/// Worker response types
///
/// Internally tagged with `"type"`, mirroring [`WorkerRequestType`]. Any
/// variant other than `Error` is treated as success by the pool.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "type")]
pub enum WorkerResponseType {
    /// Single tile loaded successfully.
    TileLoaded {
        /// Coordinate of the loaded tile
        coord: TileCoord,
        /// Raw tile bytes
        data: Vec<u8>,
    },
    /// Batch of tiles loaded.
    TilesLoaded {
        /// `(coordinate, bytes)` pair for each loaded tile
        tiles: Vec<(TileCoord, Vec<u8>)>,
    },
    /// Prefetch completed.
    PrefetchCompleted {
        /// Number of tiles prefetched
        count: usize,
    },
    /// Metadata retrieved.
    Metadata {
        /// Metadata as a JSON string
        metadata: String,
    },
    /// The job failed inside the worker.
    Error {
        /// Human-readable error message
        message: String,
    },
}
330
/// Job status
///
/// Lifecycle: `Pending` → `InProgress` → one of `Completed`, `Failed`, or
/// `TimedOut`. Cancellation via `WorkerPool::cancel_job` maps to `Failed`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum JobStatus {
    /// Queued, waiting for an idle worker
    Pending,
    /// Dispatched to a worker, awaiting its response
    InProgress,
    /// Worker returned a non-error response
    Completed,
    /// Worker returned an error, or the job was cancelled
    Failed,
    /// Exceeded its timeout while in progress (see `WorkerPool::check_timeouts`)
    TimedOut,
}
345
/// Pending job information
///
/// Bookkeeping record for one submitted job, from submission through
/// completion, failure, or timeout.
///
/// NOTE(review): `is_timed_out`/`elapsed_ms` multiply timestamp deltas by
/// 1000, implying timestamps are in *seconds*, while `js_sys::Date::now()`
/// (used in the module-level examples) returns milliseconds — confirm which
/// unit callers pass.
#[derive(Debug, Clone)]
pub struct PendingJob {
    /// Pool-assigned job ID
    pub job_id: JobId,
    /// ID of the worker processing this job, once dispatched
    pub worker_id: Option<u32>,
    /// The request that is (or will be) sent to the worker
    pub request: WorkerJobRequest,
    /// Current lifecycle state
    pub status: JobStatus,
    /// Timestamp at submission
    pub submitted_at: f64,
    /// Timestamp when a worker started processing (`None` while queued)
    pub started_at: Option<f64>,
    /// Timestamp when the job finished (success or failure)
    pub completed_at: Option<f64>,
    /// Timeout duration in milliseconds
    pub timeout_ms: u64,
}
366
367impl PendingJob {
368 /// Creates a new pending job
369 pub fn new(job_id: JobId, request: WorkerJobRequest, timestamp: f64, timeout_ms: u64) -> Self {
370 Self {
371 job_id,
372 worker_id: None,
373 request,
374 status: JobStatus::Pending,
375 submitted_at: timestamp,
376 started_at: None,
377 completed_at: None,
378 timeout_ms,
379 }
380 }
381
382 /// Checks if the job has timed out
383 pub fn is_timed_out(&self, current_time: f64) -> bool {
384 if let Some(started) = self.started_at {
385 (current_time - started) * 1000.0 > self.timeout_ms as f64
386 } else {
387 false
388 }
389 }
390
391 /// Returns the elapsed time in milliseconds
392 pub fn elapsed_ms(&self, current_time: f64) -> f64 {
393 if let Some(started) = self.started_at {
394 (current_time - started) * 1000.0
395 } else {
396 0.0
397 }
398 }
399}
400
/// Worker information
///
/// A Web Worker handle plus per-worker scheduling state and statistics.
#[derive(Debug)]
pub struct WorkerInfo {
    /// Stable worker ID, assigned at pool creation
    pub id: u32,
    /// Underlying `web_sys::Worker` handle
    pub worker: Worker,
    /// Job currently being processed, or `None` when idle
    pub current_job: Option<JobId>,
    /// Number of jobs this worker finished successfully
    pub completed_jobs: u64,
    /// Number of jobs that failed (or timed out) on this worker
    pub failed_jobs: u64,
    /// Cumulative processing time across finished jobs, in milliseconds
    pub total_processing_time_ms: f64,
}
417
418impl WorkerInfo {
419 /// Creates a new worker info
420 pub fn new(id: u32, worker: Worker) -> Self {
421 Self {
422 id,
423 worker,
424 current_job: None,
425 completed_jobs: 0,
426 failed_jobs: 0,
427 total_processing_time_ms: 0.0,
428 }
429 }
430
431 /// Checks if the worker is idle
432 pub const fn is_idle(&self) -> bool {
433 self.current_job.is_none()
434 }
435
436 /// Returns the average processing time per job
437 pub fn average_processing_time_ms(&self) -> f64 {
438 let total_jobs = self.completed_jobs + self.failed_jobs;
439 if total_jobs == 0 {
440 0.0
441 } else {
442 self.total_processing_time_ms / total_jobs as f64
443 }
444 }
445}
446
/// Worker pool for parallel tile loading
///
/// Owns the Web Workers, a FIFO queue of job IDs awaiting dispatch, and a
/// map of every tracked job (pending, in progress, and finished jobs until
/// `cleanup_jobs` removes them).
pub struct WorkerPool {
    /// Workers in the pool, each carrying its own stats
    workers: Vec<WorkerInfo>,
    /// FIFO queue of job IDs waiting for an idle worker
    job_queue: VecDeque<JobId>,
    /// All tracked jobs keyed by ID (finished jobs linger until cleanup)
    pending_jobs: HashMap<JobId, PendingJob>,
    /// Next job ID to hand out (monotonically increasing)
    next_job_id: JobId,
    /// Number of workers the pool was created with
    pool_size: usize,
    /// Submission cap: `pool_size * MAX_PENDING_JOBS_PER_WORKER`
    max_pending_jobs: usize,
}
462
463impl WorkerPool {
464 /// Creates a new worker pool
465 pub fn new(pool_size: usize) -> WasmResult<Self> {
466 let mut workers = Vec::with_capacity(pool_size);
467
468 for i in 0..pool_size {
469 let worker = Self::create_worker(i as u32)?;
470 workers.push(WorkerInfo::new(i as u32, worker));
471 }
472
473 Ok(Self {
474 workers,
475 job_queue: VecDeque::new(),
476 pending_jobs: HashMap::new(),
477 next_job_id: 0,
478 pool_size,
479 max_pending_jobs: pool_size * MAX_PENDING_JOBS_PER_WORKER,
480 })
481 }
482
483 /// Creates a new worker
484 fn create_worker(_id: u32) -> WasmResult<Worker> {
485 let options = WorkerOptions::new();
486 options.set_type(WorkerType::Module);
487
488 // Create worker from script URL
489 // The worker script should be served alongside the main application
490 let worker = Worker::new_with_options("./cog-worker.js", &options).map_err(|e| {
491 WasmError::Worker(WorkerError::CreationFailed {
492 message: format!("Failed to create worker: {e:?}"),
493 })
494 })?;
495
496 Ok(worker)
497 }
498
499 /// Submits a job to the pool
500 pub fn submit_job(
501 &mut self,
502 request: WorkerJobRequest,
503 timestamp: f64,
504 timeout_ms: u64,
505 ) -> WasmResult<JobId> {
506 if self.pending_jobs.len() >= self.max_pending_jobs {
507 return Err(WasmError::Worker(WorkerError::PoolExhausted {
508 pool_size: self.pool_size,
509 pending_jobs: self.pending_jobs.len(),
510 }));
511 }
512
513 let job_id = self.next_job_id;
514 self.next_job_id += 1;
515
516 let job = PendingJob::new(job_id, request, timestamp, timeout_ms);
517 self.pending_jobs.insert(job_id, job);
518 self.job_queue.push_back(job_id);
519
520 self.dispatch_jobs(timestamp)?;
521
522 Ok(job_id)
523 }
524
525 /// Dispatches pending jobs to idle workers
526 fn dispatch_jobs(&mut self, timestamp: f64) -> WasmResult<()> {
527 while let Some(job_id) = self.job_queue.pop_front() {
528 // Find an idle worker
529 let worker_idx = self.workers.iter().position(|w| w.is_idle());
530
531 if let Some(idx) = worker_idx {
532 // Assign job to worker
533 self.workers[idx].current_job = Some(job_id);
534
535 if let Some(job) = self.pending_jobs.get_mut(&job_id) {
536 job.worker_id = Some(self.workers[idx].id);
537 job.status = JobStatus::InProgress;
538 job.started_at = Some(timestamp);
539
540 // Post message to worker
541 let message = serde_json::to_string(&job.request).map_err(|e| {
542 WasmError::Worker(WorkerError::PostMessageFailed {
543 worker_id: self.workers[idx].id,
544 message: e.to_string(),
545 })
546 })?;
547
548 self.workers[idx]
549 .worker
550 .post_message(&JsValue::from_str(&message))
551 .map_err(|e| {
552 WasmError::Worker(WorkerError::PostMessageFailed {
553 worker_id: self.workers[idx].id,
554 message: format!("{e:?}"),
555 })
556 })?;
557 }
558 } else {
559 // No idle workers, put job back in queue
560 self.job_queue.push_front(job_id);
561 break;
562 }
563 }
564
565 Ok(())
566 }
567
568 /// Handles a worker response
569 pub fn handle_response(
570 &mut self,
571 worker_id: u32,
572 response: WorkerJobResponse,
573 timestamp: f64,
574 ) -> WasmResult<()> {
575 let job_id = response.job_id;
576
577 // Find the worker
578 let worker_idx = self
579 .workers
580 .iter()
581 .position(|w| w.id == worker_id)
582 .ok_or_else(|| {
583 WasmError::Worker(WorkerError::InvalidResponse {
584 expected: format!("worker {worker_id}"),
585 actual: "unknown worker".to_string(),
586 })
587 })?;
588
589 // Update job status
590 if let Some(job) = self.pending_jobs.get_mut(&job_id) {
591 let elapsed = if let Some(started) = job.started_at {
592 (timestamp - started) * 1000.0
593 } else {
594 0.0
595 };
596
597 match &response.response_type {
598 WorkerResponseType::Error { .. } => {
599 job.status = JobStatus::Failed;
600 self.workers[worker_idx].failed_jobs += 1;
601 }
602 _ => {
603 job.status = JobStatus::Completed;
604 self.workers[worker_idx].completed_jobs += 1;
605 }
606 }
607
608 job.completed_at = Some(timestamp);
609 self.workers[worker_idx].total_processing_time_ms += elapsed;
610 }
611
612 // Mark worker as idle
613 self.workers[worker_idx].current_job = None;
614
615 // Dispatch next job
616 self.dispatch_jobs(timestamp)?;
617
618 Ok(())
619 }
620
621 /// Checks for timed out jobs
622 pub fn check_timeouts(&mut self, current_time: f64) -> Vec<JobId> {
623 let mut timed_out = Vec::new();
624
625 for (job_id, job) in &mut self.pending_jobs {
626 if job.status == JobStatus::InProgress && job.is_timed_out(current_time) {
627 job.status = JobStatus::TimedOut;
628 timed_out.push(*job_id);
629
630 // Mark worker as idle
631 if let Some(worker_id) = job.worker_id {
632 if let Some(worker) = self.workers.iter_mut().find(|w| w.id == worker_id) {
633 worker.current_job = None;
634 worker.failed_jobs += 1;
635 }
636 }
637 }
638 }
639
640 timed_out
641 }
642
643 /// Returns the job status
644 pub fn job_status(&self, job_id: JobId) -> Option<JobStatus> {
645 self.pending_jobs.get(&job_id).map(|j| j.status)
646 }
647
648 /// Cancels a job
649 pub fn cancel_job(&mut self, job_id: JobId) -> WasmResult<()> {
650 if let Some(job) = self.pending_jobs.get_mut(&job_id) {
651 job.status = JobStatus::Failed;
652
653 // Remove from queue if pending
654 if let Some(pos) = self.job_queue.iter().position(|&id| id == job_id) {
655 self.job_queue.remove(pos);
656 }
657
658 // If job is in progress, we can't easily stop the worker
659 // Just mark it as failed and the worker will be released when it responds
660 }
661
662 Ok(())
663 }
664
665 /// Returns pool statistics
666 pub fn stats(&self) -> PoolStats {
667 let idle_workers = self.workers.iter().filter(|w| w.is_idle()).count();
668 let total_completed: u64 = self.workers.iter().map(|w| w.completed_jobs).sum();
669 let total_failed: u64 = self.workers.iter().map(|w| w.failed_jobs).sum();
670
671 PoolStats {
672 pool_size: self.pool_size,
673 idle_workers,
674 pending_jobs: self.job_queue.len(),
675 total_jobs: self.pending_jobs.len(),
676 completed_jobs: total_completed,
677 failed_jobs: total_failed,
678 }
679 }
680
681 /// Clears completed and failed jobs
682 pub fn cleanup_jobs(&mut self) {
683 self.pending_jobs.retain(|_, job| {
684 job.status == JobStatus::Pending || job.status == JobStatus::InProgress
685 });
686 }
687
688 /// Shuts down the worker pool
689 pub fn shutdown(&mut self) {
690 // Terminate all workers
691 for worker in &self.workers {
692 worker.worker.terminate();
693 }
694 self.workers.clear();
695 self.job_queue.clear();
696 self.pending_jobs.clear();
697 }
698}
699
/// Worker pool statistics
///
/// Point-in-time snapshot produced by `WorkerPool::stats`.
#[derive(Debug, Clone, Copy, Serialize, Deserialize)]
pub struct PoolStats {
    /// Total number of workers in the pool
    pub pool_size: usize,
    /// Workers currently without an assigned job
    pub idle_workers: usize,
    /// Jobs still queued (not yet dispatched to a worker)
    pub pending_jobs: usize,
    /// All tracked jobs, including in-progress and finished-but-not-cleaned jobs
    pub total_jobs: usize,
    /// Jobs completed successfully, summed across workers
    pub completed_jobs: u64,
    /// Jobs that failed or timed out, summed across workers
    pub failed_jobs: u64,
}
716
717impl PoolStats {
718 /// Returns the pool utilization (fraction of busy workers)
719 pub fn utilization(&self) -> f64 {
720 if self.pool_size == 0 {
721 0.0
722 } else {
723 (self.pool_size - self.idle_workers) as f64 / self.pool_size as f64
724 }
725 }
726
727 /// Returns the success rate
728 pub fn success_rate(&self) -> f64 {
729 let total = self.completed_jobs + self.failed_jobs;
730 if total == 0 {
731 1.0
732 } else {
733 self.completed_jobs as f64 / total as f64
734 }
735 }
736}
737
/// Tile loading coordinator
///
/// Pairs a `WorkerPool` with per-job completion callbacks so callers can
/// receive typed results instead of raw `WorkerJobResponse`s.
#[allow(dead_code)]
pub struct TileLoadCoordinator {
    /// Underlying worker pool that executes the jobs
    pool: WorkerPool,
    /// One-shot callbacks keyed by job ID, intended to be invoked when the
    /// matching response arrives
    callbacks: HashMap<JobId, Box<dyn FnOnce(Result<WorkerJobResponse, WasmError>)>>,
}
746
747#[allow(dead_code)]
748impl TileLoadCoordinator {
749 /// Creates a new tile load coordinator
750 pub fn new(pool_size: usize) -> WasmResult<Self> {
751 let pool = WorkerPool::new(pool_size)?;
752 Ok(Self {
753 pool,
754 callbacks: HashMap::new(),
755 })
756 }
757
758 /// Loads a tile asynchronously
759 pub fn load_tile<F>(
760 &mut self,
761 url: String,
762 coord: TileCoord,
763 timestamp: f64,
764 callback: F,
765 ) -> WasmResult<JobId>
766 where
767 F: FnOnce(Result<Vec<u8>, WasmError>) + 'static,
768 {
769 let request = WorkerJobRequest {
770 job_id: 0, // Will be set by submit_job
771 request_type: WorkerRequestType::LoadTile { url, coord },
772 };
773
774 let job_id = self
775 .pool
776 .submit_job(request, timestamp, DEFAULT_JOB_TIMEOUT_MS)?;
777
778 // Wrap callback
779 let wrapped = Box::new(
780 move |result: Result<WorkerJobResponse, WasmError>| match result {
781 Ok(response) => match response.response_type {
782 WorkerResponseType::TileLoaded { data, .. } => callback(Ok(data)),
783 WorkerResponseType::Error { message } => {
784 callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
785 expected: "TileLoaded".to_string(),
786 actual: message,
787 })))
788 }
789 _ => callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
790 expected: "TileLoaded".to_string(),
791 actual: format!("{:?}", response.response_type),
792 }))),
793 },
794 Err(e) => callback(Err(e)),
795 },
796 );
797
798 self.callbacks.insert(job_id, wrapped);
799
800 Ok(job_id)
801 }
802
803 /// Loads multiple tiles asynchronously
804 pub fn load_tiles<F>(
805 &mut self,
806 url: String,
807 coords: Vec<TileCoord>,
808 timestamp: f64,
809 callback: F,
810 ) -> WasmResult<JobId>
811 where
812 F: FnOnce(Result<Vec<(TileCoord, Vec<u8>)>, WasmError>) + 'static,
813 {
814 let request = WorkerJobRequest {
815 job_id: 0,
816 request_type: WorkerRequestType::LoadTiles { url, coords },
817 };
818
819 let job_id = self
820 .pool
821 .submit_job(request, timestamp, DEFAULT_JOB_TIMEOUT_MS)?;
822
823 let wrapped = Box::new(
824 move |result: Result<WorkerJobResponse, WasmError>| match result {
825 Ok(response) => match response.response_type {
826 WorkerResponseType::TilesLoaded { tiles } => callback(Ok(tiles)),
827 WorkerResponseType::Error { message } => {
828 callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
829 expected: "TilesLoaded".to_string(),
830 actual: message,
831 })))
832 }
833 _ => callback(Err(WasmError::Worker(WorkerError::InvalidResponse {
834 expected: "TilesLoaded".to_string(),
835 actual: format!("{:?}", response.response_type),
836 }))),
837 },
838 Err(e) => callback(Err(e)),
839 },
840 );
841
842 self.callbacks.insert(job_id, wrapped);
843
844 Ok(job_id)
845 }
846
847 /// Returns pool statistics
848 pub fn stats(&self) -> PoolStats {
849 self.pool.stats()
850 }
851}
852
/// WASM bindings for worker pool (for demonstration/testing)
///
/// Holds only the configured size; no real workers are spawned by this type.
#[wasm_bindgen]
pub struct WasmWorkerPool {
    /// Configured number of workers (informational only)
    pool_size: usize,
}
858
859#[wasm_bindgen]
860impl WasmWorkerPool {
861 /// Creates a new worker pool
862 #[wasm_bindgen(constructor)]
863 pub fn new(pool_size: usize) -> Self {
864 Self { pool_size }
865 }
866
867 /// Returns the pool size
868 #[wasm_bindgen(js_name = poolSize)]
869 pub fn pool_size(&self) -> usize {
870 self.pool_size
871 }
872
873 /// Returns a message about worker support
874 #[wasm_bindgen(js_name = getInfo)]
875 pub fn get_info(&self) -> String {
876 format!(
877 "Worker pool configured with {} workers. Worker creation requires a separate worker script.",
878 self.pool_size
879 )
880 }
881}
882
#[cfg(test)]
mod tests {
    use super::*;

    // A newly constructed job starts out Pending with the given ID.
    #[test]
    fn test_pending_job() {
        let request = WorkerJobRequest {
            job_id: 1,
            request_type: WorkerRequestType::GetMetadata {
                url: "test.tif".to_string(),
            },
        };

        let job = PendingJob::new(1, request, 0.0, 1000);
        assert_eq!(job.job_id, 1);
        assert_eq!(job.status, JobStatus::Pending);
    }

    // Timeout math: timestamps are treated as seconds, timeout as ms,
    // so a 1000 ms timeout trips between t=1.0 and t=2.0.
    #[test]
    fn test_job_timeout() {
        let request = WorkerJobRequest {
            job_id: 1,
            request_type: WorkerRequestType::GetMetadata {
                url: "test.tif".to_string(),
            },
        };

        let mut job = PendingJob::new(1, request, 0.0, 1000);
        job.started_at = Some(0.0);

        assert!(!job.is_timed_out(0.5)); // 500 ms elapsed < 1000 ms timeout
        assert!(job.is_timed_out(2.0)); // 2000 ms elapsed > 1000 ms timeout
    }

    // Sanity-checks the derived metrics utilization() and success_rate().
    #[test]
    fn test_pool_stats() {
        let stats = PoolStats {
            pool_size: 4,
            idle_workers: 2,
            pending_jobs: 5,
            total_jobs: 10,
            completed_jobs: 100,
            failed_jobs: 10,
        };

        assert_eq!(stats.utilization(), 0.5); // 2 out of 4 workers busy
        assert!((stats.success_rate() - 0.909).abs() < 0.01); // 100/110
    }

    // Round-trips a request through serde_json (the worker wire format).
    #[test]
    fn test_job_request_serialization() {
        let request = WorkerJobRequest {
            job_id: 42,
            request_type: WorkerRequestType::LoadTile {
                url: "test.tif".to_string(),
                coord: TileCoord::new(5, 10, 20),
            },
        };

        let json = serde_json::to_string(&request).expect("Serialization failed");
        let parsed: WorkerJobRequest = serde_json::from_str(&json).expect("Deserialization failed");

        assert_eq!(parsed.job_id, 42);
    }

    // The wasm_bindgen demo type just stores and reports its configured size.
    #[test]
    fn test_wasm_worker_pool() {
        let pool = WasmWorkerPool::new(4);
        assert_eq!(pool.pool_size(), 4);

        let info = pool.get_info();
        assert!(info.contains("4 workers"));
    }
}
956}