Skip to main content

pdf_ast/performance/
enhanced_lazy.rs

1use crate::ast::NodeId;
2use crate::parser::content_stream::ContentOperator;
3use crate::types::{ObjectId, PdfDictionary};
4use bytes::Bytes;
5use parking_lot::Mutex;
6use std::collections::{HashMap, VecDeque};
7use std::sync::{Arc, RwLock};
8use std::time::{Duration, Instant};
9
10/// Enhanced lazy loading with comprehensive limits
11#[derive(Debug, Clone)]
12pub struct LazyLimits {
13    /// Maximum pages to load at once
14    pub max_pages: Option<usize>,
15
16    /// Maximum bytes per operation
17    pub max_bytes_per_load: usize,
18
19    /// Maximum operators per content stream
20    pub max_operators: Option<usize>,
21
22    /// Maximum images to decode simultaneously
23    pub max_concurrent_images: usize,
24
25    /// Maximum depth for nested objects
26    pub max_object_depth: usize,
27
28    /// Timeout for individual load operations
29    pub load_timeout: Duration,
30
31    /// Memory pressure threshold (0.0-1.0)
32    pub memory_pressure_threshold: f64,
33
34    /// Enable aggressive caching
35    pub aggressive_caching: bool,
36
37    /// Priority levels for different content types
38    pub priority_levels: ContentPriorities,
39}
40
/// Queue priority per content type. A larger value is served earlier by the
/// load queue.
#[derive(Debug, Clone)]
pub struct ContentPriorities {
    /// Priority for text; also used for page-load requests.
    pub text: u8,
    /// Priority for image requests.
    pub images: u8,
    /// Priority for vector graphics; also used for content-stream requests.
    pub vector_graphics: u8,
    /// Priority for form content.
    pub forms: u8,
    /// Priority for annotations.
    pub annotations: u8,
    /// Priority for metadata.
    pub metadata: u8,
}
50
51impl Default for LazyLimits {
52    fn default() -> Self {
53        Self {
54            max_pages: Some(10),
55            max_bytes_per_load: 50 * 1024 * 1024, // 50MB
56            max_operators: Some(100_000),
57            max_concurrent_images: 4,
58            max_object_depth: 50,
59            load_timeout: Duration::from_secs(30),
60            memory_pressure_threshold: 0.8,
61            aggressive_caching: false,
62            priority_levels: ContentPriorities::default(),
63        }
64    }
65}
66
67impl Default for ContentPriorities {
68    fn default() -> Self {
69        Self {
70            text: 10, // Highest priority
71            images: 7,
72            vector_graphics: 8,
73            forms: 6,
74            annotations: 5,
75            metadata: 3, // Lowest priority
76        }
77    }
78}
79
80/// Enhanced lazy loader with comprehensive resource management
81pub struct EnhancedLazyLoader {
82    limits: LazyLimits,
83
84    /// Currently loaded pages
85    loaded_pages: Arc<RwLock<HashMap<u32, Arc<LazyPage>>>>,
86
87    /// Load queue with priorities
88    load_queue: Arc<Mutex<VecDeque<LoadRequest>>>,
89
90    /// Memory usage tracker
91    memory_usage: Arc<RwLock<MemoryUsage>>,
92
93    /// Performance metrics
94    metrics: Arc<RwLock<LoaderMetrics>>,
95
96    /// Content stream cache
97    stream_cache: Arc<RwLock<HashMap<ObjectId, Arc<LazyContentStream>>>>,
98
99    /// Image cache with LRU eviction
100    image_cache: Arc<RwLock<LruCache<ObjectId, Arc<LazyImage>>>>,
101}
102
103#[derive(Debug, Clone)]
104pub struct LoadRequest {
105    pub request_type: LoadRequestType,
106    pub priority: u8,
107    pub node_id: NodeId,
108    pub object_id: Option<ObjectId>,
109    pub requested_at: Instant,
110    pub timeout: Duration,
111}
112
113#[derive(Debug, Clone)]
114pub enum LoadRequestType {
115    Page {
116        page_number: u32,
117    },
118    ContentStream {
119        stream_id: ObjectId,
120        operator_limit: Option<usize>,
121    },
122    Image {
123        image_id: ObjectId,
124        decode_params: ImageDecodeParams,
125    },
126    Font {
127        font_id: ObjectId,
128        subset_only: bool,
129    },
130    Annotation {
131        annotation_id: ObjectId,
132    },
133    FormField {
134        field_id: ObjectId,
135    },
136    Metadata {
137        metadata_type: String,
138    },
139}
140
/// Caller-supplied options for decoding an image.
#[derive(Debug, Clone)]
pub struct ImageDecodeParams {
    /// Optional cap on decoded width.
    pub max_width: Option<u32>,
    /// Optional cap on decoded height.
    pub max_height: Option<u32>,
    /// Requested decode quality, documented as ranging from 0.0 to 1.0.
    pub decode_quality: f32,
    /// Optional name of a colour space to convert to.
    pub color_space_conversion: Option<String>,
}
148
149#[derive(Debug)]
150pub struct LazyPage {
151    pub page_number: u32,
152    pub page_dict: Arc<RwLock<Option<PdfDictionary>>>,
153    pub content_streams: Vec<ObjectId>,
154    pub resources: Arc<RwLock<Option<PdfDictionary>>>,
155    pub annotations: Vec<ObjectId>,
156    pub load_state: Arc<RwLock<PageLoadState>>,
157    pub load_priority: u8,
158}
159
160#[derive(Debug, Clone)]
161pub enum PageLoadState {
162    NotLoaded,
163    Loading { started_at: Instant },
164    PartiallyLoaded { components: LoadedComponents },
165    FullyLoaded { loaded_at: Instant },
166    LoadError { error: String },
167}
168
/// Per-component flags recording which parts of a page have been loaded.
#[derive(Debug, Clone)]
pub struct LoadedComponents {
    /// Basic page information is available.
    pub basic_info: bool,
    /// Content streams are available.
    pub content_streams: bool,
    /// Resources are available.
    pub resources: bool,
    /// Annotations are available.
    pub annotations: bool,
    /// Images are available.
    pub images: bool,
    /// Fonts are available.
    pub fonts: bool,
}
178
179pub struct LazyContentStream {
180    pub stream_id: ObjectId,
181    pub operators: Arc<RwLock<Option<Vec<ContentOperator>>>>,
182    pub operator_count: usize,
183    pub text_content: Arc<RwLock<Option<String>>>,
184    pub graphics_state_stack_depth: u32,
185    pub parsing_state: Arc<RwLock<StreamParsingState>>,
186}
187
/// Parse lifecycle of a [`LazyContentStream`].
#[derive(Debug, Clone)]
pub enum StreamParsingState {
    /// Parsing has not started.
    NotParsed,
    /// Parsing is under way.
    Parsing {
        /// Progress so far, as a percentage.
        progress_pct: f32,
    },
    /// Parsing stopped before reaching the end of the stream.
    ParsedPartial {
        /// Number of operators parsed before stopping.
        operators_parsed: usize,
        /// Why parsing stopped early.
        stopped_reason: String,
    },
    /// The whole stream was parsed.
    ParsedComplete {
        /// When parsing finished.
        parsed_at: Instant,
    },
    /// Parsing failed.
    ParseError {
        /// Description of the failure.
        error: String,
    },
}
205
206pub struct LazyImage {
207    pub image_id: ObjectId,
208    pub metadata: ImageMetadata,
209    pub raw_data: Arc<RwLock<Option<Bytes>>>,
210    pub decoded_data: Arc<RwLock<Option<DecodedImage>>>,
211    pub decode_state: Arc<RwLock<ImageDecodeState>>,
212}
213
/// Descriptive properties of an image.
#[derive(Debug, Clone)]
pub struct ImageMetadata {
    /// Image width.
    pub width: u32,
    /// Image height.
    pub height: u32,
    /// Bits per colour component.
    pub bits_per_component: u8,
    /// Name of the image's colour space.
    pub color_space: String,
    /// Name of the compression applied to the image data, if any.
    pub compression: Option<String>,
    /// Size of the image data in bytes.
    pub size_bytes: usize,
    /// Whether the image is a mask.
    pub is_mask: bool,
    /// Whether the image is an inline image.
    pub is_inline: bool,
}
225
/// Result of decoding an image.
pub struct DecodedImage {
    /// Decoded pixel bytes.
    pub pixels: Vec<u8>,
    /// Width of the decoded image.
    pub width: u32,
    /// Height of the decoded image.
    pub height: u32,
    /// Name of the colour space of `pixels`.
    pub color_space: String,
    /// When the decode completed.
    pub decoded_at: Instant,
}
233
/// Decode lifecycle of a [`LazyImage`].
#[derive(Debug, Clone)]
pub enum ImageDecodeState {
    /// Decoding has not started.
    NotDecoded,
    /// Decoding is under way.
    Decoding {
        /// Progress so far, as a percentage.
        progress_pct: f32,
    },
    /// Decoding finished.
    Decoded {
        /// Quality the image was decoded at.
        quality: f32,
    },
    /// Decoding failed.
    DecodeError {
        /// Description of the failure.
        error: String,
    },
    /// Decoding was deliberately not performed.
    Skipped {
        /// Why the image was skipped.
        reason: String,
    },
}
242
/// Memory consumption tracked by the loader, in megabytes per category.
///
/// `Default` yields an all-zero record and `Clone` gives a cheap snapshot, so
/// callers no longer need to build or copy the struct field by field.
#[derive(Debug, Clone, Default, PartialEq)]
pub struct MemoryUsage {
    /// Memory attributed to pages.
    pub pages_mb: f64,
    /// Memory attributed to content streams.
    pub content_streams_mb: f64,
    /// Memory attributed to images.
    pub images_mb: f64,
    /// Memory attributed to fonts.
    pub fonts_mb: f64,
    /// Memory attributed to metadata.
    pub metadata_mb: f64,
    /// Overall usage; this is the figure the memory-pressure check reads.
    pub total_mb: f64,
    /// Peak usage.
    pub peak_mb: f64,
}
253
/// Counters describing loader activity.
///
/// `Clone` and `PartialEq` are derived so a snapshot can be taken and compared
/// without copying every field by hand.
#[derive(Debug, Default, Clone, PartialEq)]
pub struct LoaderMetrics {
    /// Number of page loads processed.
    pub pages_loaded: u32,
    /// Number of content-stream parses processed.
    pub content_streams_parsed: u32,
    /// Number of image decodes processed.
    pub images_decoded: u32,
    /// Cache hit counter.
    // NOTE(review): not updated by any code in this file.
    pub cache_hits: u32,
    /// Cache miss counter.
    // NOTE(review): not updated by any code in this file.
    pub cache_misses: u32,
    /// Number of queued requests dropped because they exceeded their timeout.
    pub load_timeouts: u32,
    /// Number of times the memory-pressure check triggered a cleanup.
    pub memory_pressure_events: u32,
    /// Smoothed request duration in milliseconds. Each processed request sets it
    /// to the midpoint of the previous value and the new duration, so it is a
    /// recency-weighted figure rather than an arithmetic mean.
    pub average_load_time_ms: f64,
    /// Total bytes loaded.
    // NOTE(review): not updated by any code in this file.
    pub total_bytes_loaded: u64,
}
266
/// Small least-recently-used cache.
///
/// `order` lists keys from most recently used (front) to least recently used
/// (back); `data` holds the values. Recency updates scan `order` linearly, which
/// is intended for small capacities.
pub struct LruCache<K, V> {
    data: HashMap<K, V>,
    order: VecDeque<K>,
    capacity: usize,
}

impl<K: Clone + std::hash::Hash + Eq, V> LruCache<K, V> {
    /// Create a cache that holds at most `capacity` entries.
    ///
    /// A capacity of 0 produces a cache that never stores anything.
    pub fn new(capacity: usize) -> Self {
        Self {
            data: HashMap::with_capacity(capacity),
            order: VecDeque::with_capacity(capacity),
            capacity,
        }
    }

    /// Look up `key`, marking it as most recently used when present.
    pub fn get(&mut self, key: &K) -> Option<&V> {
        if !self.data.contains_key(key) {
            return None;
        }
        self.touch(key);
        self.data.get(key)
    }

    /// Insert or replace the value for `key`, marking it as most recently used.
    ///
    /// Returns the previous value when `key` was already present. When the cache
    /// is full and `key` is new, the least recently used entry is evicted first.
    /// With a capacity of 0 the value is dropped and `None` is returned.
    pub fn insert(&mut self, key: K, value: V) -> Option<V> {
        // A zero-capacity cache must not retain entries; without this guard the
        // eviction below would find nothing to evict and one entry would be kept.
        if self.capacity == 0 {
            return None;
        }

        if self.data.contains_key(&key) {
            self.touch(&key);
            return self.data.insert(key, value);
        }

        if self.data.len() >= self.capacity {
            // Evict least recently used
            if let Some(evicted_key) = self.order.pop_back() {
                self.data.remove(&evicted_key);
            }
        }
        self.order.push_front(key.clone());
        self.data.insert(key, value)
    }

    /// Move `key` to the front of the recency list.
    fn touch(&mut self, key: &K) {
        match self.order.iter().position(|k| k == key) {
            Some(index) => {
                if let Some(existing) = self.order.remove(index) {
                    self.order.push_front(existing);
                }
            }
            // Keep `order` in step with `data` even if the key was missing from it.
            None => self.order.push_front(key.clone()),
        }
    }
}
313
314impl EnhancedLazyLoader {
315    pub fn new(limits: LazyLimits) -> Self {
316        Self {
317            limits: limits.clone(),
318            loaded_pages: Arc::new(RwLock::new(HashMap::new())),
319            load_queue: Arc::new(Mutex::new(VecDeque::new())),
320            memory_usage: Arc::new(RwLock::new(MemoryUsage {
321                pages_mb: 0.0,
322                content_streams_mb: 0.0,
323                images_mb: 0.0,
324                fonts_mb: 0.0,
325                metadata_mb: 0.0,
326                total_mb: 0.0,
327                peak_mb: 0.0,
328            })),
329            metrics: Arc::new(RwLock::new(LoaderMetrics::default())),
330            stream_cache: Arc::new(RwLock::new(HashMap::new())),
331            image_cache: Arc::new(RwLock::new(LruCache::new(limits.max_concurrent_images * 2))),
332        }
333    }
334
335    /// Request loading of specific pages with limits
336    pub fn request_pages(
337        &self,
338        page_range: std::ops::Range<u32>,
339    ) -> Result<Vec<Arc<LazyPage>>, String> {
340        let requested_count = (page_range.end - page_range.start) as usize;
341
342        // Check page limits
343        if let Some(max_pages) = self.limits.max_pages {
344            if requested_count > max_pages {
345                return Err(format!(
346                    "Requested {} pages exceeds limit of {}",
347                    requested_count, max_pages
348                ));
349            }
350        }
351
352        let mut pages = Vec::new();
353        for page_num in page_range {
354            let page = self.get_or_create_page(page_num);
355            self.queue_page_load(page_num)?;
356            pages.push(page);
357        }
358
359        Ok(pages)
360    }
361
362    /// Request content stream parsing with operator limits
363    pub fn request_content_stream(
364        &self,
365        stream_id: ObjectId,
366        operator_limit: Option<usize>,
367    ) -> Result<Arc<LazyContentStream>, String> {
368        // Check if already cached
369        {
370            let cache = self.stream_cache.read().unwrap();
371            if let Some(stream) = cache.get(&stream_id) {
372                return Ok(stream.clone());
373            }
374        }
375
376        // Create lazy content stream
377        let stream = Arc::new(LazyContentStream {
378            stream_id,
379            operators: Arc::new(RwLock::new(None)),
380            operator_count: 0,
381            text_content: Arc::new(RwLock::new(None)),
382            graphics_state_stack_depth: 0,
383            parsing_state: Arc::new(RwLock::new(StreamParsingState::NotParsed)),
384        });
385
386        // Add to cache
387        {
388            let mut cache = self.stream_cache.write().unwrap();
389            cache.insert(stream_id, stream.clone());
390        }
391
392        // Queue for parsing
393        let request = LoadRequest {
394            request_type: LoadRequestType::ContentStream {
395                stream_id,
396                operator_limit: operator_limit.or(self.limits.max_operators),
397            },
398            priority: self.limits.priority_levels.vector_graphics,
399            node_id: NodeId(0), // Would need proper mapping
400            object_id: Some(stream_id),
401            requested_at: Instant::now(),
402            timeout: self.limits.load_timeout,
403        };
404
405        self.queue_request(request)?;
406        Ok(stream)
407    }
408
409    /// Request image loading with decode parameters
410    pub fn request_image(
411        &self,
412        image_id: ObjectId,
413        decode_params: ImageDecodeParams,
414    ) -> Result<Arc<LazyImage>, String> {
415        // Check concurrent image limit
416        let current_images = {
417            let cache = self.image_cache.read().unwrap();
418            cache.data.len()
419        };
420
421        if current_images >= self.limits.max_concurrent_images {
422            return Err(format!(
423                "Maximum concurrent images ({}) exceeded",
424                self.limits.max_concurrent_images
425            ));
426        }
427
428        // Create lazy image
429        let image = Arc::new(LazyImage {
430            image_id,
431            metadata: ImageMetadata {
432                width: 0, // Would be populated from image dictionary
433                height: 0,
434                bits_per_component: 8,
435                color_space: "RGB".to_string(),
436                compression: None,
437                size_bytes: 0,
438                is_mask: false,
439                is_inline: false,
440            },
441            raw_data: Arc::new(RwLock::new(None)),
442            decoded_data: Arc::new(RwLock::new(None)),
443            decode_state: Arc::new(RwLock::new(ImageDecodeState::NotDecoded)),
444        });
445
446        // Add to cache
447        {
448            let mut cache = self.image_cache.write().unwrap();
449            cache.insert(image_id, image.clone());
450        }
451
452        // Queue for loading
453        let request = LoadRequest {
454            request_type: LoadRequestType::Image {
455                image_id,
456                decode_params,
457            },
458            priority: self.limits.priority_levels.images,
459            node_id: NodeId(0),
460            object_id: Some(image_id),
461            requested_at: Instant::now(),
462            timeout: self.limits.load_timeout,
463        };
464
465        self.queue_request(request)?;
466        Ok(image)
467    }
468
469    fn get_or_create_page(&self, page_number: u32) -> Arc<LazyPage> {
470        {
471            let pages = self.loaded_pages.read().unwrap();
472            if let Some(page) = pages.get(&page_number) {
473                return page.clone();
474            }
475        }
476
477        let page = Arc::new(LazyPage {
478            page_number,
479            page_dict: Arc::new(RwLock::new(None)),
480            content_streams: Vec::new(),
481            resources: Arc::new(RwLock::new(None)),
482            annotations: Vec::new(),
483            load_state: Arc::new(RwLock::new(PageLoadState::NotLoaded)),
484            load_priority: self.limits.priority_levels.text, // Default to text priority
485        });
486
487        {
488            let mut pages = self.loaded_pages.write().unwrap();
489            pages.insert(page_number, page.clone());
490        }
491
492        page
493    }
494
495    fn queue_page_load(&self, page_number: u32) -> Result<(), String> {
496        let request = LoadRequest {
497            request_type: LoadRequestType::Page { page_number },
498            priority: self.limits.priority_levels.text,
499            node_id: NodeId(0),
500            object_id: None,
501            requested_at: Instant::now(),
502            timeout: self.limits.load_timeout,
503        };
504
505        self.queue_request(request)
506    }
507
508    fn queue_request(&self, request: LoadRequest) -> Result<(), String> {
509        let mut queue = self.load_queue.lock();
510
511        // Find insertion point for priority order
512        let mut insert_index = None;
513        for (i, existing) in queue.iter().enumerate() {
514            if request.priority > existing.priority {
515                insert_index = Some(i);
516                break;
517            }
518        }
519
520        // Insert at the appropriate position
521        if let Some(index) = insert_index {
522            queue.insert(index, request);
523        } else {
524            queue.push_back(request);
525        }
526
527        Ok(())
528    }
529
530    /// Check memory pressure and trigger cleanup if needed
531    pub fn check_memory_pressure(&self) -> Result<(), String> {
532        let usage = self.memory_usage.read().unwrap();
533        let pressure = usage.total_mb / (self.limits.max_bytes_per_load as f64 / (1024.0 * 1024.0));
534
535        if pressure > self.limits.memory_pressure_threshold {
536            drop(usage);
537            self.cleanup_memory()?;
538
539            let mut metrics = self.metrics.write().unwrap();
540            metrics.memory_pressure_events += 1;
541        }
542
543        Ok(())
544    }
545
546    fn cleanup_memory(&self) -> Result<(), String> {
547        // Clear least recently used content streams
548        {
549            let mut cache = self.stream_cache.write().unwrap();
550            if cache.len() > 10 {
551                // Keep only the 10 most recent
552                let keys_to_remove: Vec<_> = cache.keys().take(cache.len() - 10).cloned().collect();
553                for key in keys_to_remove {
554                    cache.remove(&key);
555                }
556            }
557        }
558
559        // Clear decoded images if memory pressure is high
560        {
561            let image_cache = self.image_cache.write().unwrap();
562            for (_, image) in image_cache.data.iter() {
563                let mut decoded = image.decoded_data.write().unwrap();
564                *decoded = None; // Clear decoded data, keep raw data
565            }
566        }
567
568        Ok(())
569    }
570
571    /// Get current memory usage
572    pub fn get_memory_usage(&self) -> MemoryUsage {
573        let usage = self.memory_usage.read().unwrap();
574        MemoryUsage {
575            pages_mb: usage.pages_mb,
576            content_streams_mb: usage.content_streams_mb,
577            images_mb: usage.images_mb,
578            fonts_mb: usage.fonts_mb,
579            metadata_mb: usage.metadata_mb,
580            total_mb: usage.total_mb,
581            peak_mb: usage.peak_mb,
582        }
583    }
584
585    /// Get current metrics
586    pub fn get_metrics(&self) -> LoaderMetrics {
587        let metrics = self.metrics.read().unwrap();
588        LoaderMetrics {
589            pages_loaded: metrics.pages_loaded,
590            content_streams_parsed: metrics.content_streams_parsed,
591            images_decoded: metrics.images_decoded,
592            cache_hits: metrics.cache_hits,
593            cache_misses: metrics.cache_misses,
594            load_timeouts: metrics.load_timeouts,
595            memory_pressure_events: metrics.memory_pressure_events,
596            average_load_time_ms: metrics.average_load_time_ms,
597            total_bytes_loaded: metrics.total_bytes_loaded,
598        }
599    }
600
601    /// Process the load queue
602    pub fn process_queue(&self, max_items: usize) -> Result<usize, String> {
603        let mut processed = 0;
604
605        while processed < max_items {
606            let request = {
607                let mut queue = self.load_queue.lock();
608                queue.pop_front()
609            };
610
611            let request = match request {
612                Some(req) => req,
613                None => break, // Queue empty
614            };
615
616            // Check timeout
617            if request.requested_at.elapsed() > request.timeout {
618                let mut metrics = self.metrics.write().unwrap();
619                metrics.load_timeouts += 1;
620                continue;
621            }
622
623            self.process_request(request)?;
624            processed += 1;
625        }
626
627        Ok(processed)
628    }
629
630    fn process_request(&self, request: LoadRequest) -> Result<(), String> {
631        let start_time = Instant::now();
632
633        match request.request_type {
634            LoadRequestType::Page { page_number } => {
635                self.load_page_data(page_number)?;
636            }
637            LoadRequestType::ContentStream {
638                stream_id,
639                operator_limit,
640            } => {
641                self.parse_content_stream(stream_id, operator_limit)?;
642            }
643            LoadRequestType::Image {
644                image_id,
645                decode_params,
646            } => {
647                self.decode_image(image_id, decode_params)?;
648            }
649            LoadRequestType::Font {
650                font_id,
651                subset_only,
652            } => {
653                self.load_font(font_id, subset_only)?;
654            }
655            LoadRequestType::Annotation { annotation_id } => {
656                self.load_annotation(annotation_id)?;
657            }
658            LoadRequestType::FormField { field_id } => {
659                self.load_form_field(field_id)?;
660            }
661            LoadRequestType::Metadata { metadata_type } => {
662                self.load_metadata(&metadata_type)?;
663            }
664        }
665
666        // Update metrics
667        let load_time = start_time.elapsed().as_millis() as f64;
668        let mut metrics = self.metrics.write().unwrap();
669        metrics.average_load_time_ms = (metrics.average_load_time_ms + load_time) / 2.0;
670
671        Ok(())
672    }
673
674    fn load_page_data(&self, _page_number: u32) -> Result<(), String> {
675        // Implementation would load page dictionary and basic structure
676        let mut metrics = self.metrics.write().unwrap();
677        metrics.pages_loaded += 1;
678        Ok(())
679    }
680
681    fn parse_content_stream(
682        &self,
683        _stream_id: ObjectId,
684        _operator_limit: Option<usize>,
685    ) -> Result<(), String> {
686        // Implementation would parse content stream with operator limits
687        let mut metrics = self.metrics.write().unwrap();
688        metrics.content_streams_parsed += 1;
689        Ok(())
690    }
691
692    fn decode_image(
693        &self,
694        _image_id: ObjectId,
695        _decode_params: ImageDecodeParams,
696    ) -> Result<(), String> {
697        // Implementation would decode image with specified parameters
698        let mut metrics = self.metrics.write().unwrap();
699        metrics.images_decoded += 1;
700        Ok(())
701    }
702
703    fn load_font(&self, _font_id: ObjectId, _subset_only: bool) -> Result<(), String> {
704        // Implementation would load font data
705        Ok(())
706    }
707
708    fn load_annotation(&self, _annotation_id: ObjectId) -> Result<(), String> {
709        // Implementation would load annotation data
710        Ok(())
711    }
712
713    fn load_form_field(&self, _field_id: ObjectId) -> Result<(), String> {
714        // Implementation would load form field data
715        Ok(())
716    }
717
718    fn load_metadata(&self, _metadata_type: &str) -> Result<(), String> {
719        // Implementation would load metadata
720        Ok(())
721    }
722}
723
724impl Default for EnhancedLazyLoader {
725    fn default() -> Self {
726        Self::new(LazyLimits::default())
727    }
728}