// superbook_pdf/reprocess.rs

//! Partial Reprocessing module
//!
//! Provides functionality to reprocess only failed pages from a previous run,
//! utilizing cached successful pages to speed up recovery.
//!
//! # Features
//!
//! - Track page processing status (Success/Failed/Pending)
//! - Persist state to JSON for recovery
//! - Reprocess only failed pages
//! - Merge cached and reprocessed pages
//!
//! # Example
//!
//! ```rust,no_run
//! use superbook_pdf::reprocess::{ReprocessState, ReprocessOptions, PageStatus};
//! use std::path::Path;
//!
//! // Load existing state
//! let mut state = ReprocessState::load(Path::new("output/.superbook-state.json")).unwrap();
//!
//! // Check failed pages
//! println!("Failed pages: {:?}", state.failed_pages());
//! println!("Completion: {:.1}%", state.completion_percent());
//! ```
26
27use serde::{Deserialize, Serialize};
28use std::path::{Path, PathBuf};
29use thiserror::Error;
30
// ============================================================
// Error Types
// ============================================================
34
35/// Reprocess error types
36#[derive(Debug, Error)]
37pub enum ReprocessError {
38    #[error("State file not found: {0}")]
39    StateNotFound(PathBuf),
40
41    #[error("Invalid state file: {0}")]
42    InvalidState(String),
43
44    #[error("Page index out of bounds: {0}")]
45    PageIndexOutOfBounds(usize),
46
47    #[error("No failed pages to reprocess")]
48    NoFailedPages,
49
50    #[error("IO error: {0}")]
51    IoError(#[from] std::io::Error),
52
53    #[error("JSON error: {0}")]
54    JsonError(#[from] serde_json::Error),
55}
56
57pub type Result<T> = std::result::Result<T, ReprocessError>;
58
// ============================================================
// Data Structures
// ============================================================
62
63/// Page processing status
64#[derive(Debug, Clone, PartialEq, Serialize, Deserialize, Default)]
65pub enum PageStatus {
66    /// Successfully processed
67    Success {
68        /// Path to cached processed image
69        cached_path: PathBuf,
70        /// Processing time in seconds
71        processing_time: f64,
72    },
73    /// Failed with error
74    Failed {
75        /// Error message
76        error: String,
77        /// Number of retry attempts
78        retry_count: u32,
79    },
80    /// Not yet processed
81    #[default]
82    Pending,
83}
84
85impl PageStatus {
86    /// Check if status is Success
87    pub fn is_success(&self) -> bool {
88        matches!(self, PageStatus::Success { .. })
89    }
90
91    /// Check if status is Failed
92    pub fn is_failed(&self) -> bool {
93        matches!(self, PageStatus::Failed { .. })
94    }
95
96    /// Check if status is Pending
97    pub fn is_pending(&self) -> bool {
98        matches!(self, PageStatus::Pending)
99    }
100
101    /// Get retry count (0 for non-failed statuses)
102    pub fn retry_count(&self) -> u32 {
103        match self {
104            PageStatus::Failed { retry_count, .. } => *retry_count,
105            _ => 0,
106        }
107    }
108
109    /// Create a new Success status
110    pub fn success(cached_path: PathBuf, processing_time: f64) -> Self {
111        PageStatus::Success {
112            cached_path,
113            processing_time,
114        }
115    }
116
117    /// Create a new Failed status
118    pub fn failed(error: impl Into<String>) -> Self {
119        PageStatus::Failed {
120            error: error.into(),
121            retry_count: 0,
122        }
123    }
124
125    /// Increment retry count for Failed status
126    pub fn increment_retry(&mut self) {
127        if let PageStatus::Failed { retry_count, .. } = self {
128            *retry_count += 1;
129        }
130    }
131}
132
133/// Partial reprocessing state
134#[derive(Debug, Clone, Serialize, Deserialize)]
135pub struct ReprocessState {
136    /// Source PDF path
137    pub source_pdf: PathBuf,
138    /// Output directory
139    pub output_dir: PathBuf,
140    /// Page statuses (0-indexed)
141    pub pages: Vec<PageStatus>,
142    /// Processing configuration hash (for cache invalidation)
143    pub config_hash: String,
144    /// Creation timestamp (ISO 8601)
145    pub created_at: String,
146    /// Last updated timestamp (ISO 8601)
147    pub updated_at: String,
148}
149
150impl ReprocessState {
151    /// Create a new ReprocessState for a PDF
152    pub fn new(source_pdf: PathBuf, output_dir: PathBuf, page_count: usize, config_hash: String) -> Self {
153        let now = chrono_now();
154        Self {
155            source_pdf,
156            output_dir,
157            pages: vec![PageStatus::Pending; page_count],
158            config_hash,
159            created_at: now.clone(),
160            updated_at: now,
161        }
162    }
163
164    /// Load state from JSON file
165    pub fn load(path: &Path) -> Result<Self> {
166        if !path.exists() {
167            return Err(ReprocessError::StateNotFound(path.to_path_buf()));
168        }
169        let content = std::fs::read_to_string(path)?;
170        let state: Self = serde_json::from_str(&content)?;
171        Ok(state)
172    }
173
174    /// Save state to JSON file
175    pub fn save(&self, path: &Path) -> Result<()> {
176        let mut state = self.clone();
177        state.updated_at = chrono_now();
178        let content = serde_json::to_string_pretty(&state)?;
179        std::fs::write(path, content)?;
180        Ok(())
181    }
182
183    /// Get indices of failed pages
184    pub fn failed_pages(&self) -> Vec<usize> {
185        self.pages
186            .iter()
187            .enumerate()
188            .filter_map(|(i, s)| if s.is_failed() { Some(i) } else { None })
189            .collect()
190    }
191
192    /// Get indices of successful pages
193    pub fn success_pages(&self) -> Vec<usize> {
194        self.pages
195            .iter()
196            .enumerate()
197            .filter_map(|(i, s)| if s.is_success() { Some(i) } else { None })
198            .collect()
199    }
200
201    /// Get indices of pending pages
202    pub fn pending_pages(&self) -> Vec<usize> {
203        self.pages
204            .iter()
205            .enumerate()
206            .filter_map(|(i, s)| if s.is_pending() { Some(i) } else { None })
207            .collect()
208    }
209
210    /// Check if all pages are successfully processed
211    pub fn is_complete(&self) -> bool {
212        self.pages.iter().all(|s| s.is_success())
213    }
214
215    /// Get completion percentage (0.0 - 100.0)
216    pub fn completion_percent(&self) -> f64 {
217        if self.pages.is_empty() {
218            return 100.0;
219        }
220        let success_count = self.pages.iter().filter(|s| s.is_success()).count();
221        (success_count as f64 / self.pages.len() as f64) * 100.0
222    }
223
224    /// Get total page count
225    pub fn page_count(&self) -> usize {
226        self.pages.len()
227    }
228
229    /// Mark a page as successful
230    pub fn mark_success(&mut self, page_idx: usize, cached_path: PathBuf, processing_time: f64) -> Result<()> {
231        if page_idx >= self.pages.len() {
232            return Err(ReprocessError::PageIndexOutOfBounds(page_idx));
233        }
234        self.pages[page_idx] = PageStatus::success(cached_path, processing_time);
235        self.updated_at = chrono_now();
236        Ok(())
237    }
238
239    /// Mark a page as failed
240    pub fn mark_failed(&mut self, page_idx: usize, error: impl Into<String>) -> Result<()> {
241        if page_idx >= self.pages.len() {
242            return Err(ReprocessError::PageIndexOutOfBounds(page_idx));
243        }
244        let retry_count = self.pages[page_idx].retry_count();
245        self.pages[page_idx] = PageStatus::Failed {
246            error: error.into(),
247            retry_count,
248        };
249        self.updated_at = chrono_now();
250        Ok(())
251    }
252
253    /// Increment retry count for a page
254    pub fn increment_retry(&mut self, page_idx: usize) -> Result<()> {
255        if page_idx >= self.pages.len() {
256            return Err(ReprocessError::PageIndexOutOfBounds(page_idx));
257        }
258        self.pages[page_idx].increment_retry();
259        self.updated_at = chrono_now();
260        Ok(())
261    }
262
263    /// Get cached paths for all successful pages
264    pub fn cached_paths(&self) -> Vec<Option<PathBuf>> {
265        self.pages
266            .iter()
267            .map(|s| match s {
268                PageStatus::Success { cached_path, .. } => Some(cached_path.clone()),
269                _ => None,
270            })
271            .collect()
272    }
273
274    /// Check if config has changed (requires reprocessing)
275    pub fn config_changed(&self, new_hash: &str) -> bool {
276        self.config_hash != new_hash
277    }
278
279    /// Invalidate all pages (mark as Pending)
280    pub fn invalidate_all(&mut self) {
281        for status in &mut self.pages {
282            *status = PageStatus::Pending;
283        }
284        self.updated_at = chrono_now();
285    }
286}
287
288impl Default for ReprocessState {
289    fn default() -> Self {
290        Self {
291            source_pdf: PathBuf::new(),
292            output_dir: PathBuf::new(),
293            pages: vec![],
294            config_hash: String::new(),
295            created_at: chrono_now(),
296            updated_at: chrono_now(),
297        }
298    }
299}
300
/// Knobs controlling a reprocess run.
#[derive(Debug, Clone)]
pub struct ReprocessOptions {
    /// Upper bound on retry attempts for each page.
    pub max_retries: u32,
    /// Pages to retry; an empty list means every failed page.
    pub page_indices: Vec<usize>,
    /// Reprocess even when a cached result exists.
    pub force: bool,
    /// Keep intermediate files instead of discarding them.
    pub keep_intermediates: bool,
}

impl Default for ReprocessOptions {
    /// Three retries, no page filter, no forcing, intermediates discarded.
    fn default() -> Self {
        ReprocessOptions {
            max_retries: 3,
            page_indices: Vec::new(),
            force: false,
            keep_intermediates: false,
        }
    }
}

impl ReprocessOptions {
    /// Options targeting every failed page (identical to the defaults).
    pub fn all_failed() -> Self {
        Default::default()
    }

    /// Options targeting only the listed page indices; other fields are defaults.
    pub fn specific_pages(pages: Vec<usize>) -> Self {
        let mut options = Self::default();
        options.page_indices = pages;
        options
    }

    /// Builder: override the maximum number of retries.
    pub fn with_max_retries(self, max_retries: u32) -> Self {
        Self { max_retries, ..self }
    }

    /// Builder: override the force flag.
    pub fn with_force(self, force: bool) -> Self {
        Self { force, ..self }
    }

    /// Builder: override whether intermediate files are kept.
    pub fn with_keep_intermediates(self, keep: bool) -> Self {
        Self {
            keep_intermediates: keep,
            ..self
        }
    }
}
357
/// Summary of a reprocess run.
#[derive(Debug, Clone)]
pub struct ReprocessResult {
    /// Total pages in document
    pub total_pages: usize,
    /// Pages successfully processed
    pub success_count: usize,
    /// Pages still failing
    pub failed_count: usize,
    /// Pages reprocessed this run
    pub reprocessed_count: usize,
    /// Final output path (if complete)
    pub output_path: Option<PathBuf>,
    /// Remaining failed page indices
    pub failed_pages: Vec<usize>,
}

impl ReprocessResult {
    /// Returns `true` when no page is failing and every page is accounted
    /// for as a success.
    ///
    /// Checking `failed_count` alone is not enough: pages that were never
    /// processed are neither failed nor successful, so a run with pending
    /// pages would otherwise be reported complete while
    /// [`completion_percent`](Self::completion_percent) is below 100.
    pub fn is_complete(&self) -> bool {
        self.failed_count == 0 && self.success_count >= self.total_pages
    }

    /// Returns the share of successful pages as a percentage.
    ///
    /// A result with zero total pages reports 100.0.
    pub fn completion_percent(&self) -> f64 {
        if self.total_pages == 0 {
            return 100.0;
        }
        (self.success_count as f64 / self.total_pages as f64) * 100.0
    }
}
389
// ============================================================
// Helper Functions
// ============================================================
393
/// Get the current UTC time as an ISO 8601 timestamp (`YYYY-MM-DDTHH:MM:SSZ`).
///
/// If the system clock reads earlier than the Unix epoch, the epoch itself
/// (`1970-01-01T00:00:00Z`) is returned.
fn chrono_now() -> String {
    use std::time::{SystemTime, UNIX_EPOCH};

    let since_epoch = SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .unwrap_or_default();
    format_iso8601_utc(since_epoch.as_secs())
}

/// Format whole seconds since the Unix epoch as `YYYY-MM-DDTHH:MM:SSZ` (UTC).
fn format_iso8601_utc(epoch_secs: u64) -> String {
    const SECS_PER_DAY: u64 = 86_400;

    let days = epoch_secs / SECS_PER_DAY;
    let secs_of_day = epoch_secs % SECS_PER_DAY;
    let hour = secs_of_day / 3_600;
    let minute = (secs_of_day % 3_600) / 60;
    let second = secs_of_day % 60;

    // Days-to-civil-date conversion (Howard Hinnant's `civil_from_days`).
    // The day count is shifted so that each 400-year era starts on March 1st,
    // which places the leap day at the very end of the shifted year.
    let shifted = days + 719_468;
    let era = shifted / 146_097;
    let day_of_era = shifted % 146_097;
    let year_of_era =
        (day_of_era - day_of_era / 1_460 + day_of_era / 36_524 - day_of_era / 146_096) / 365;
    let day_of_year = day_of_era - (365 * year_of_era + year_of_era / 4 - year_of_era / 100);
    let shifted_month = (5 * day_of_year + 2) / 153; // 0 = March .. 11 = February
    let day = day_of_year - (153 * shifted_month + 2) / 5 + 1;
    let month = if shifted_month < 10 {
        shifted_month + 3
    } else {
        shifted_month - 9
    };
    // January and February belong to the next civil year in the shifted calendar.
    let year = year_of_era + era * 400 + u64::from(month <= 2);

    format!(
        "{:04}-{:02}-{:02}T{:02}:{:02}:{:02}Z",
        year, month, day, hour, minute, second
    )
}
402
403/// Calculate config hash from pipeline config
404pub fn calculate_config_hash(config: &crate::PipelineConfig) -> String {
405    use std::collections::hash_map::DefaultHasher;
406    use std::hash::{Hash, Hasher};
407
408    let mut hasher = DefaultHasher::new();
409    format!("{:?}", config).hash(&mut hasher);
410    format!("{:016x}", hasher.finish())
411}
412
// ============================================================
// Tests
// ============================================================
416
#[cfg(test)]
mod tests {
    use super::*;

    /// Fresh state for `test.pdf` -> `output` with `page_count` pending pages.
    fn fresh_state(page_count: usize, config_hash: &str) -> ReprocessState {
        ReprocessState::new(
            PathBuf::from("test.pdf"),
            PathBuf::from("output"),
            page_count,
            config_hash.into(),
        )
    }

    /// Success status with an empty cached path and zero processing time.
    fn blank_success() -> PageStatus {
        PageStatus::success(PathBuf::new(), 0.0)
    }

    // ---- PageStatus ----

    #[test]
    fn test_page_status_default() {
        assert!(PageStatus::default().is_pending());
    }

    #[test]
    fn test_page_status_success() {
        let ok = PageStatus::success(PathBuf::from("test.png"), 1.5);
        assert!(ok.is_success());
        assert!(!ok.is_failed());
        assert!(!ok.is_pending());
    }

    #[test]
    fn test_page_status_failed() {
        let failure = PageStatus::failed("test error");
        assert!(failure.is_failed());
        assert_eq!(failure.retry_count(), 0);
    }

    #[test]
    fn test_page_status_increment_retry() {
        let mut failure = PageStatus::failed("error");
        for expected in 1..=2 {
            failure.increment_retry();
            assert_eq!(failure.retry_count(), expected);
        }
    }

    // ---- ReprocessState ----

    #[test]
    fn test_reprocess_state_new() {
        let state = fresh_state(5, "hash123");
        assert_eq!(state.page_count(), 5);
        assert_eq!(state.completion_percent(), 0.0);
        assert_eq!(state.pending_pages().len(), 5);
    }

    #[test]
    fn test_reprocess_state_failed_pages() {
        let mut state = fresh_state(5, "hash");
        state.pages = vec![
            blank_success(),
            PageStatus::failed("error1"),
            blank_success(),
            PageStatus::failed("error2"),
            PageStatus::Pending,
        ];

        assert_eq!(state.failed_pages(), vec![1, 3]);
    }

    #[test]
    fn test_reprocess_state_completion_percent() {
        let mut state = fresh_state(4, "hash");
        state.pages = vec![
            blank_success(),
            blank_success(),
            PageStatus::failed("error"),
            PageStatus::Pending,
        ];

        let percent = state.completion_percent();
        assert!((percent - 50.0).abs() < 0.01);
    }

    #[test]
    fn test_reprocess_state_is_complete() {
        let mut state = fresh_state(2, "hash");
        assert!(!state.is_complete());

        state.pages[0] = blank_success();
        assert!(!state.is_complete());

        state.pages[1] = blank_success();
        assert!(state.is_complete());
    }

    #[test]
    fn test_reprocess_state_mark_success() {
        let mut state = fresh_state(3, "hash");

        state
            .mark_success(1, PathBuf::from("cached.png"), 2.5)
            .unwrap();

        let marked = &state.pages[1];
        assert!(marked.is_success());
        if let PageStatus::Success {
            cached_path,
            processing_time,
        } = marked
        {
            assert_eq!(cached_path.to_str().unwrap(), "cached.png");
            assert!((processing_time - 2.5).abs() < 0.01);
        }
    }

    #[test]
    fn test_reprocess_state_mark_failed() {
        let mut state = fresh_state(3, "hash");

        state.mark_failed(0, "test error").unwrap();

        let marked = &state.pages[0];
        assert!(marked.is_failed());
        if let PageStatus::Failed { error, retry_count } = marked {
            assert_eq!(error, "test error");
            assert_eq!(*retry_count, 0);
        }
    }

    #[test]
    fn test_reprocess_state_page_index_out_of_bounds() {
        let mut state = fresh_state(3, "hash");

        let outcome = state.mark_success(10, PathBuf::new(), 0.0);
        assert!(matches!(
            outcome,
            Err(ReprocessError::PageIndexOutOfBounds(10))
        ));
    }

    #[test]
    fn test_reprocess_state_save_load() {
        let temp_dir = tempfile::tempdir().unwrap();
        let state_path = temp_dir.path().join("state.json");

        let mut state = fresh_state(3, "testhash");
        state.pages = vec![
            PageStatus::success(PathBuf::from("p0.png"), 1.0),
            PageStatus::failed("error"),
            PageStatus::Pending,
        ];

        state.save(&state_path).unwrap();
        let loaded = ReprocessState::load(&state_path).unwrap();

        assert_eq!(loaded.source_pdf, state.source_pdf);
        assert_eq!(loaded.config_hash, state.config_hash);
        assert_eq!(loaded.pages.len(), 3);
        assert!(loaded.pages[0].is_success());
        assert!(loaded.pages[1].is_failed());
        assert!(loaded.pages[2].is_pending());
    }

    #[test]
    fn test_reprocess_state_config_changed() {
        let state = fresh_state(1, "hash_v1");

        assert!(!state.config_changed("hash_v1"));
        assert!(state.config_changed("hash_v2"));
    }

    #[test]
    fn test_reprocess_state_invalidate_all() {
        let mut state = fresh_state(3, "hash");
        state.pages[0] = blank_success();
        state.pages[1] = PageStatus::failed("error");

        state.invalidate_all();

        assert!(state.pages.iter().all(PageStatus::is_pending));
    }

    // ---- ReprocessOptions ----

    #[test]
    fn test_reprocess_options_default() {
        let opts = ReprocessOptions::default();
        assert_eq!(opts.max_retries, 3);
        assert!(opts.page_indices.is_empty());
        assert!(!opts.force);
    }

    #[test]
    fn test_reprocess_options_specific_pages() {
        let requested = vec![1, 3, 5];
        let opts = ReprocessOptions::specific_pages(requested.clone());
        assert_eq!(opts.page_indices, requested);
    }

    #[test]
    fn test_reprocess_options_builder() {
        let opts = ReprocessOptions::all_failed()
            .with_max_retries(5)
            .with_force(true)
            .with_keep_intermediates(true);

        assert_eq!(opts.max_retries, 5);
        assert!(opts.force);
        assert!(opts.keep_intermediates);
    }

    // ---- ReprocessResult ----

    #[test]
    fn test_reprocess_result_is_complete() {
        let finished = ReprocessResult {
            total_pages: 10,
            success_count: 10,
            failed_count: 0,
            reprocessed_count: 2,
            output_path: Some(PathBuf::from("output.pdf")),
            failed_pages: vec![],
        };
        assert!(finished.is_complete());

        let unfinished = ReprocessResult {
            total_pages: 10,
            success_count: 8,
            failed_count: 2,
            reprocessed_count: 2,
            output_path: None,
            failed_pages: vec![3, 7],
        };
        assert!(!unfinished.is_complete());
    }

    #[test]
    fn test_reprocess_result_completion_percent() {
        let result = ReprocessResult {
            total_pages: 10,
            success_count: 7,
            failed_count: 3,
            reprocessed_count: 0,
            output_path: None,
            failed_pages: vec![],
        };
        let percent = result.completion_percent();
        assert!((percent - 70.0).abs() < 0.01);
    }

    // ---- Remaining state behaviour ----

    #[test]
    fn test_reprocess_state_cached_paths() {
        let mut state = fresh_state(3, "hash");
        state.pages = vec![
            PageStatus::success(PathBuf::from("p0.png"), 0.0),
            PageStatus::failed("error"),
            PageStatus::success(PathBuf::from("p2.png"), 0.0),
        ];

        let cached = state.cached_paths();
        assert_eq!(cached.len(), 3);
        assert_eq!(cached[0], Some(PathBuf::from("p0.png")));
        assert_eq!(cached[1], None);
        assert_eq!(cached[2], Some(PathBuf::from("p2.png")));
    }

    #[test]
    fn test_page_status_serialization() {
        let original = PageStatus::success(PathBuf::from("test.png"), 1.5);
        let json = serde_json::to_string(&original).unwrap();
        let round_tripped: PageStatus = serde_json::from_str(&json).unwrap();
        assert!(round_tripped.is_success());
    }

    #[test]
    fn test_reprocess_state_empty() {
        let state = fresh_state(0, "hash");
        assert!(state.is_complete());
        assert_eq!(state.completion_percent(), 100.0);
    }

    #[test]
    fn test_increment_retry_preserves_error() {
        let mut state = fresh_state(1, "hash");
        state.pages[0] = PageStatus::Failed {
            error: "original error".into(),
            retry_count: 0,
        };

        state.increment_retry(0).unwrap();

        if let PageStatus::Failed { error, retry_count } = &state.pages[0] {
            assert_eq!(error, "original error");
            assert_eq!(*retry_count, 1);
        }
    }
}