//! `bookforge_store/db.rs`: SQLite-backed persistence for BookForge jobs,
//! segments, translations, and segment flags.

1use std::{
2    cell::RefCell,
3    collections::HashMap,
4    fs,
5    path::{Path, PathBuf},
6    time::{Duration, SystemTime, UNIX_EPOCH},
7};
8
9use bookforge_core::{
10    Result as CoreResult,
11    ir::BlockId,
12    run_snapshot::RunConfigSnapshot,
13    segment::{BlockTranslation, Segment},
14};
15use rusqlite::{Connection, OptionalExtension, params};
16use sha2::{Digest, Sha256};
17
18pub type Result<T> = std::result::Result<T, StoreError>;
19
20#[derive(Debug, thiserror::Error)]
21pub enum StoreError {
22    #[error("I/O error: {0}")]
23    Io(#[from] std::io::Error),
24
25    #[error("SQLite error: {0}")]
26    Sqlite(#[from] rusqlite::Error),
27
28    #[error("core error: {0}")]
29    Core(#[from] bookforge_core::BookforgeError),
30
31    #[error("serialization error: {0}")]
32    Serialization(String),
33}
34
35pub struct JobStore {
36    conn: RefCell<Connection>,
37    path: PathBuf,
38}
39
/// One row of the `jobs` table, as returned by `JobStore::get_job` and
/// `JobStore::create_job`.
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Job identifier (`job_<unix-nanos>_<hash prefix>` for jobs made by `create_job`).
    pub id: String,
    /// Input document the job was created from.
    pub input_path: PathBuf,
    /// Path of the recorded input snapshot, once one has been stored.
    pub input_snapshot_path: Option<PathBuf>,
    /// SHA-256 recorded together with the input snapshot, if any.
    pub input_sha256: Option<String>,
    /// Where the translated output is written.
    pub output_path: PathBuf,
    /// Hash of the input file computed when the job was created.
    pub input_hash: String,
    /// Source language, when one was specified.
    pub source_lang: Option<String>,
    /// Target language.
    pub target_lang: String,
    /// Translation provider name.
    pub provider: String,
    /// Model name used with the provider.
    pub model: String,
    /// Custom provider base URL, if configured.
    pub base_url: Option<String>,
    /// Name of the API-key environment variable, if configured.
    pub api_key_env: Option<String>,
    /// Job status string (e.g. `running`, `succeeded`, `needs_review`,
    /// `interrupted`, `failed`, `retry_pending`).
    pub status: String,
    /// Path of the job's events file, once recorded.
    pub events_path: Option<PathBuf>,
    /// Path of the JSON report, once recorded.
    pub report_json_path: Option<PathBuf>,
    /// Path of the Markdown report, once recorded.
    pub report_markdown_path: Option<PathBuf>,
}
59
/// Per-job aggregate built by `JobStore::summary`: segment counts by status
/// and summed token usage.
#[derive(Debug, Clone, Default)]
pub struct JobSummary {
    /// Job identifier.
    pub id: String,
    /// Job status at the time of the summary.
    pub status: String,
    /// Total number of segments across all statuses.
    pub total_segments: usize,
    /// Segments with status `succeeded`.
    pub succeeded: usize,
    /// Segments with status `failed`.
    pub failed: usize,
    /// Segments with status `needs_review`.
    pub needs_review: usize,
    /// Segments with status `retry_pending`.
    pub retry_pending: usize,
    /// Segments with status `skipped_cached`.
    pub cached: usize,
    /// Segments whose attempt counter is greater than one.
    pub retried: usize,
    /// Summed input tokens.
    pub input_tokens: u64,
    /// Summed cached input tokens.
    pub input_cached_tokens: u64,
    /// Summed output tokens.
    pub output_tokens: u64,
}
75
/// Borrowed arguments for `JobStore::create_job`.
#[derive(Debug, Clone, Copy)]
pub struct CreateJob<'a> {
    /// Input document; its contents are hashed to build the job id.
    pub input: &'a Path,
    /// Destination for the translated output.
    pub output: &'a Path,
    /// Optional source language.
    pub source_lang: Option<&'a str>,
    /// Target language.
    pub target_lang: &'a str,
    /// Provider name.
    pub provider: &'a str,
    /// Model name.
    pub model: &'a str,
    /// Optional provider base URL.
    pub base_url: Option<&'a str>,
    /// Optional API-key environment variable name.
    pub api_key_env: Option<&'a str>,
}
87
/// Per-segment status and usage row, as listed by `JobStore::segment_records`.
#[derive(Debug, Clone)]
pub struct SegmentRecord {
    /// Segment identifier.
    pub id: String,
    /// Segment status string (e.g. `queued`, `succeeded`, `failed`).
    pub status: String,
    /// Number of recorded attempts.
    pub attempts: usize,
    /// Last recorded error message, if any.
    pub error: Option<String>,
    /// Input tokens, when recorded.
    pub input_tokens: Option<u64>,
    /// Cached input tokens, when recorded.
    pub input_cached_tokens: Option<u64>,
    /// Output tokens, when recorded.
    pub output_tokens: Option<u64>,
    /// Whether the stored token counts were flagged as estimated.
    pub tokens_estimated: bool,
}
99
/// Translated text of a single block, keyed by its segment and block ids.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoredBlockTranslation {
    /// Owning segment id.
    pub segment_id: String,
    /// Block id within the segment.
    pub block_id: String,
    /// Translated text for the block.
    pub text: String,
}
106
107#[derive(Debug, Clone, PartialEq, Eq)]
108pub struct StoredSegmentTranslation {
109    pub segment_id: String,
110    pub ordinal: usize,
111    pub status: String,
112    pub error: Option<String>,
113    pub translated_text: String,
114    pub blocks: Vec<BlockTranslation>,
115}
116
117#[derive(Debug, Clone, PartialEq, Eq)]
118pub struct CachedTranslation {
119    pub translated_text: String,
120    pub blocks: Vec<BlockTranslation>,
121}
122
123#[derive(Debug, Clone, Copy)]
124pub struct SaveTranslation<'a> {
125    pub job_id: &'a str,
126    pub segment_id: &'a str,
127    pub translated_text: &'a str,
128    pub blocks: &'a [BlockTranslation],
129    pub provider: &'a str,
130    pub model: &'a str,
131    pub prompt_version: &'a str,
132    pub input_tokens: Option<u64>,
133    pub input_cached_tokens: Option<u64>,
134    pub output_tokens: Option<u64>,
135    pub tokens_estimated: bool,
136}
137
138#[derive(Debug, Clone, Copy)]
139pub struct SaveNeedsReview<'a> {
140    pub job_id: &'a str,
141    pub segment_id: &'a str,
142    pub preserved_text: &'a str,
143    pub blocks: &'a [BlockTranslation],
144    pub provider: &'a str,
145    pub model: &'a str,
146    pub prompt_version: &'a str,
147    pub error: &'a str,
148    pub input_tokens: Option<u64>,
149    pub input_cached_tokens: Option<u64>,
150    pub output_tokens: Option<u64>,
151    pub tokens_estimated: bool,
152}
153
154#[derive(Debug, Clone, Copy)]
155pub struct SaveCachedTranslation<'a> {
156    pub job_id: &'a str,
157    pub segment_id: &'a str,
158    pub translated_text: &'a str,
159    pub blocks: &'a [BlockTranslation],
160    pub provider: &'a str,
161    pub model: &'a str,
162    pub prompt_version: &'a str,
163}
164
/// Criteria describing which previously stored translation may be reused.
#[derive(Debug, Clone, Copy)]
pub struct CacheLookupRequest<'a> {
    /// Prompt version to match.
    pub prompt_version: &'a str,
    /// Provider to match.
    pub provider: &'a str,
    /// Model to match.
    pub model: &'a str,
    /// Source language to match, if any.
    pub source_lang: Option<&'a str>,
    /// Target language to match.
    pub target_lang: &'a str,
    /// Cache namespace to match.
    pub cache_namespace: &'a str,
}
174
/// A flag row to insert through `JobStore::insert_segment_flags`.
#[derive(Debug, Clone, Copy)]
pub struct NewSegmentFlag<'a> {
    /// Owning job id.
    pub job_id: &'a str,
    /// Flagged segment id.
    pub segment_id: &'a str,
    /// Flag kind string.
    pub kind: &'a str,
    /// Optional free-form note.
    pub note: Option<&'a str>,
    /// Optional suggested source text.
    pub suggested_source: Option<&'a str>,
    /// Optional suggested target text.
    pub suggested_target: Option<&'a str>,
    /// Stored as 1/0 in `segment_flags.consumed`.
    pub consumed: bool,
}
185
/// Health report produced by `run_doctor`.
#[derive(Debug, Clone)]
pub struct StorageDoctor {
    /// Database file that was inspected.
    pub database_path: PathBuf,
    /// Whether the database file exists.
    pub database_exists: bool,
    /// Whether the WAL sidecar file exists.
    pub wal_present: bool,
    /// Whether the shared-memory sidecar file exists.
    pub shm_present: bool,
    /// `PRAGMA journal_mode` result, or `unknown`.
    pub journal_mode: String,
    /// First row of `PRAGMA integrity_check`; `error` if the pragma failed,
    /// empty if there is no database.
    pub integrity_check: String,
    /// True when no sidecars exist or the integrity check returned `ok`.
    pub wal_sidecars_normal: bool,
    /// Human-readable advice; empty when there is nothing to say.
    pub note: String,
}
197
198pub fn run_doctor(db_path: Option<PathBuf>) -> Result<StorageDoctor> {
199    let path = db_path.unwrap_or_else(|| PathBuf::from(".bookforge/jobs.sqlite"));
200    let database_exists = path.exists();
201    let wal_path = path.with_extension("sqlite-wal");
202    let shm_path = path.with_extension("sqlite-shm");
203    let wal_present = wal_path.exists();
204    let shm_present = shm_path.exists();
205
206    let (journal_mode, integrity_check, wal_sidecars_normal, note) = if database_exists {
207        let conn = Connection::open(&path)?;
208        let journal_mode: String = conn
209            .pragma_query_value(None, "journal_mode", |row| row.get(0))
210            .unwrap_or_else(|_| "unknown".to_string());
211        let integrity_check: String = conn
212            .pragma_query_value(None, "integrity_check", |row| row.get(0))
213            .unwrap_or_else(|_| "error".to_string());
214        let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);");
215
216        let wal_sidecars_normal = if wal_present || shm_present {
217            integrity_check == "ok"
218        } else {
219            true
220        };
221
222        let note = if wal_present || shm_present {
223            "WAL sidecar files are normal. SQLite will recover them automatically. \
224             Do not delete them manually while BookForge is running."
225                .to_string()
226        } else {
227            String::new()
228        };
229
230        (journal_mode, integrity_check, wal_sidecars_normal, note)
231    } else {
232        ("unknown".to_string(), String::new(), true, String::new())
233    };
234
235    Ok(StorageDoctor {
236        database_path: path,
237        database_exists,
238        wal_present,
239        shm_present,
240        journal_mode,
241        integrity_check,
242        wal_sidecars_normal,
243        note,
244    })
245}
246
247impl JobStore {
248    pub fn open_default() -> Result<Self> {
249        Self::open(".bookforge/jobs.sqlite")
250    }
251
252    pub fn open(path: impl Into<PathBuf>) -> Result<Self> {
253        let path = path.into();
254        if let Some(parent) = path.parent() {
255            fs::create_dir_all(parent)?;
256        }
257        let conn = Connection::open(&path)?;
258
259        conn.busy_timeout(Duration::from_secs(5))?;
260        conn.pragma_update(None, "journal_mode", "WAL")?;
261        conn.pragma_update(None, "synchronous", "NORMAL")?;
262        conn.pragma_update(None, "foreign_keys", "ON")?;
263
264        let store = Self {
265            conn: RefCell::new(conn),
266            path,
267        };
268        store.migrate()?;
269        Ok(store)
270    }
271
272    pub fn path(&self) -> &Path {
273        &self.path
274    }
275
276    pub fn create_job(&self, request: CreateJob<'_>) -> Result<JobRecord> {
277        let input_hash = file_hash(request.input)?;
278        let id = format!("job_{}_{}", unix_timestamp_nanos(), &input_hash[..12]);
279        let now = timestamp_string();
280        let input_path = request.input.to_path_buf();
281        let output_path = request.output.to_path_buf();
282        let conn = self.conn.borrow();
283        conn.execute(
284            "INSERT INTO jobs
285             (id, input_path, output_path, input_hash, source_lang, target_lang, provider, model, base_url, api_key_env, status, created_at, updated_at)
286             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, 'running', ?11, ?11)",
287            params![
288                id,
289                input_path.to_string_lossy(),
290                output_path.to_string_lossy(),
291                input_hash,
292                request.source_lang,
293                request.target_lang,
294                request.provider,
295                request.model,
296                request.base_url,
297                request.api_key_env,
298                now,
299            ],
300        )?;
301
302        Ok(JobRecord {
303            id,
304            input_path,
305            input_snapshot_path: None,
306            input_sha256: None,
307            output_path,
308            input_hash,
309            source_lang: request.source_lang.map(ToOwned::to_owned),
310            target_lang: request.target_lang.to_string(),
311            provider: request.provider.to_string(),
312            model: request.model.to_string(),
313            base_url: request.base_url.map(ToOwned::to_owned),
314            api_key_env: request.api_key_env.map(ToOwned::to_owned),
315            status: "running".to_string(),
316            events_path: None,
317            report_json_path: None,
318            report_markdown_path: None,
319        })
320    }
321
322    pub fn update_job_config_snapshot(
323        &self,
324        job_id: &str,
325        snapshot: &RunConfigSnapshot,
326    ) -> Result<()> {
327        let json = serde_json::to_string(snapshot)
328            .map_err(|e| StoreError::Serialization(e.to_string()))?;
329        let conn = self.conn.borrow();
330        conn.execute(
331            "UPDATE jobs
332             SET config_json = ?1,
333                 events_path = ?2,
334                 report_json_path = ?3,
335                 report_markdown_path = ?4,
336                 input_snapshot_path = ?5,
337                 input_sha256 = ?6,
338                 updated_at = ?7
339             WHERE id = ?8",
340            params![
341                json,
342                snapshot
343                    .events_path
344                    .as_ref()
345                    .map(|path| path.to_string_lossy().to_string()),
346                snapshot
347                    .report_json_path
348                    .as_ref()
349                    .map(|path| path.to_string_lossy().to_string()),
350                snapshot
351                    .report_markdown_path
352                    .as_ref()
353                    .map(|path| path.to_string_lossy().to_string()),
354                snapshot
355                    .input_snapshot_path
356                    .as_ref()
357                    .map(|path| path.to_string_lossy().to_string()),
358                snapshot.input_sha256.as_deref(),
359                timestamp_string(),
360                job_id,
361            ],
362        )?;
363        Ok(())
364    }
365
366    pub fn update_job_input_snapshot(
367        &self,
368        job_id: &str,
369        snapshot_path: &Path,
370        input_sha256: &str,
371    ) -> Result<()> {
372        let conn = self.conn.borrow();
373        conn.execute(
374            "UPDATE jobs
375             SET input_snapshot_path = ?1,
376                 input_sha256 = ?2,
377                 updated_at = ?3
378             WHERE id = ?4",
379            params![
380                snapshot_path.to_string_lossy(),
381                input_sha256,
382                timestamp_string(),
383                job_id
384            ],
385        )?;
386        Ok(())
387    }
388
389    pub fn load_job_config_snapshot(&self, job_id: &str) -> Result<Option<RunConfigSnapshot>> {
390        let conn = self.conn.borrow();
391        let Some(json) = conn
392            .query_row(
393                "SELECT config_json FROM jobs WHERE id = ?1",
394                params![job_id],
395                |row| row.get::<_, Option<String>>(0),
396            )
397            .optional()?
398            .flatten()
399        else {
400            return Ok(None);
401        };
402
403        serde_json::from_str(&json)
404            .map(Some)
405            .map_err(|e| StoreError::Serialization(e.to_string()))
406    }
407
408    pub fn update_job_event_path(&self, job_id: &str, path: &Path) -> Result<()> {
409        let conn = self.conn.borrow();
410        conn.execute(
411            "UPDATE jobs SET events_path = ?1, updated_at = ?2 WHERE id = ?3",
412            params![path.to_string_lossy(), timestamp_string(), job_id],
413        )?;
414        Ok(())
415    }
416
417    pub fn update_job_report_paths(
418        &self,
419        job_id: &str,
420        json_path: &Path,
421        markdown_path: &Path,
422    ) -> Result<()> {
423        let conn = self.conn.borrow();
424        conn.execute(
425            "UPDATE jobs
426             SET report_json_path = ?1, report_markdown_path = ?2, updated_at = ?3
427             WHERE id = ?4",
428            params![
429                json_path.to_string_lossy(),
430                markdown_path.to_string_lossy(),
431                timestamp_string(),
432                job_id
433            ],
434        )?;
435        Ok(())
436    }
437
438    pub fn update_job_output_path(&self, job_id: &str, path: &Path) -> Result<()> {
439        let conn = self.conn.borrow();
440        conn.execute(
441            "UPDATE jobs SET output_path = ?1, updated_at = ?2 WHERE id = ?3",
442            params![path.to_string_lossy(), timestamp_string(), job_id],
443        )?;
444        Ok(())
445    }
446
447    pub fn insert_segments(
448        &self,
449        job_id: &str,
450        segments: &[Segment],
451        prompt_version: &str,
452        provider: &str,
453        model: &str,
454        cache_namespace: &str,
455    ) -> Result<()> {
456        let mut conn = self.conn.borrow_mut();
457        let tx = conn.transaction()?;
458        for segment in segments {
459            tx.execute(
460                "INSERT OR IGNORE INTO segments
461                 (id, job_id, section_id, ordinal, source_hash, prompt_version, provider, model, status, attempts, cache_namespace)
462                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 'queued', 0, ?9)",
463                params![
464                    segment.id.0,
465                    job_id,
466                    segment.section_id.0,
467                    segment.ordinal as i64,
468                    segment.checksum,
469                    prompt_version,
470                    provider,
471                    model,
472                    cache_namespace,
473                ],
474            )?;
475        }
476        tx.commit()?;
477        Ok(())
478    }
479
480    pub fn save_translation(&self, request: SaveTranslation<'_>) -> Result<()> {
481        let now = timestamp_string();
482        let translated_hash = stable_hash(request.translated_text);
483        {
484            let conn = self.conn.borrow();
485            conn.execute(
486                "INSERT OR REPLACE INTO translations
487                 (segment_id, job_id, translated_text, provider, model, prompt_version, created_at)
488                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
489                params![
490                    request.segment_id,
491                    request.job_id,
492                    request.translated_text,
493                    request.provider,
494                    request.model,
495                    request.prompt_version,
496                    now
497                ],
498            )?;
499            replace_block_translations(&conn, request.job_id, request.segment_id, request.blocks)?;
500            conn.execute(
501                "UPDATE segments
502                 SET status = 'succeeded',
503                     attempts = attempts + 1,
504                     tokens_input = ?1,
505                     tokens_input_cached = ?2,
506                     tokens_output = ?3,
507                     tokens_estimated = ?4,
508                     translated_hash = ?5,
509                     error = NULL
510                 WHERE job_id = ?6 AND id = ?7",
511                params![
512                    request.input_tokens.map(|value| value as i64),
513                    request.input_cached_tokens.map(|value| value as i64),
514                    request.output_tokens.map(|value| value as i64),
515                    if request.tokens_estimated {
516                        1_i64
517                    } else {
518                        0_i64
519                    },
520                    translated_hash,
521                    request.job_id,
522                    request.segment_id,
523                ],
524            )?;
525        }
526        self.touch_job(request.job_id, "running")?;
527        Ok(())
528    }
529
530    pub fn save_needs_review(&self, request: SaveNeedsReview<'_>) -> Result<()> {
531        let now = timestamp_string();
532        let translated_hash = stable_hash(request.preserved_text);
533        {
534            let conn = self.conn.borrow();
535            conn.execute(
536                "INSERT OR REPLACE INTO translations
537                 (segment_id, job_id, translated_text, provider, model, prompt_version, created_at)
538                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
539                params![
540                    request.segment_id,
541                    request.job_id,
542                    request.preserved_text,
543                    request.provider,
544                    request.model,
545                    request.prompt_version,
546                    now
547                ],
548            )?;
549            replace_block_translations(&conn, request.job_id, request.segment_id, request.blocks)?;
550            conn.execute(
551                "UPDATE segments
552                 SET status = 'needs_review',
553                     attempts = attempts + 1,
554                     tokens_input = ?1,
555                     tokens_input_cached = ?2,
556                     tokens_output = ?3,
557                     tokens_estimated = ?4,
558                     translated_hash = ?5,
559                     error = ?6
560                 WHERE job_id = ?7 AND id = ?8",
561                params![
562                    request.input_tokens.map(|value| value as i64),
563                    request.input_cached_tokens.map(|value| value as i64),
564                    request.output_tokens.map(|value| value as i64),
565                    if request.tokens_estimated {
566                        1_i64
567                    } else {
568                        0_i64
569                    },
570                    translated_hash,
571                    request.error,
572                    request.job_id,
573                    request.segment_id
574                ],
575            )?;
576        }
577        self.touch_job(request.job_id, "needs_review")?;
578        Ok(())
579    }
580
581    pub fn save_cached_translation(&self, request: SaveCachedTranslation<'_>) -> Result<()> {
582        let now = timestamp_string();
583        let translated_hash = stable_hash(request.translated_text);
584        {
585            let conn = self.conn.borrow();
586            conn.execute(
587                "INSERT OR REPLACE INTO translations
588                 (segment_id, job_id, translated_text, provider, model, prompt_version, created_at)
589                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
590                params![
591                    request.segment_id,
592                    request.job_id,
593                    request.translated_text,
594                    request.provider,
595                    request.model,
596                    request.prompt_version,
597                    now
598                ],
599            )?;
600            replace_block_translations(&conn, request.job_id, request.segment_id, request.blocks)?;
601            conn.execute(
602                "UPDATE segments
603                 SET status = 'skipped_cached',
604                     tokens_input = NULL,
605                     tokens_input_cached = NULL,
606                     tokens_output = NULL,
607                     tokens_estimated = 0,
608                     translated_hash = ?1,
609                     error = NULL
610                 WHERE job_id = ?2 AND id = ?3",
611                params![translated_hash, request.job_id, request.segment_id],
612            )?;
613        }
614        self.touch_job(request.job_id, "running")?;
615        Ok(())
616    }
617
618    pub fn mark_job_complete(&self, job_id: &str) -> Result<()> {
619        self.touch_job(job_id, "succeeded")
620    }
621
622    pub fn mark_job_running(&self, job_id: &str) -> Result<()> {
623        self.touch_job(job_id, "running")
624    }
625
626    pub fn mark_job_succeeded(&self, job_id: &str) -> Result<()> {
627        self.mark_job_complete(job_id)
628    }
629
630    pub fn mark_job_needs_review(&self, job_id: &str) -> Result<()> {
631        self.touch_job(job_id, "needs_review")
632    }
633
634    pub fn mark_job_interrupted(&self, job_id: &str) -> Result<()> {
635        self.touch_job(job_id, "interrupted")
636    }
637
638    pub fn mark_job_failed(&self, job_id: &str) -> Result<()> {
639        self.touch_job(job_id, "failed")
640    }
641
642    pub fn mark_segment_failed(&self, job_id: &str, segment_id: &str, error: &str) -> Result<()> {
643        {
644            let conn = self.conn.borrow();
645            conn.execute(
646                "UPDATE segments SET status = 'failed', attempts = attempts + 1, error = ?1 WHERE job_id = ?2 AND id = ?3",
647                params![error, job_id, segment_id],
648            )?;
649        }
650        self.touch_job(job_id, "failed")?;
651        Ok(())
652    }
653
654    pub fn mark_segment_failed_if_unfinished(
655        &self,
656        job_id: &str,
657        segment_id: &str,
658        error: &str,
659    ) -> Result<()> {
660        {
661            let conn = self.conn.borrow();
662            conn.execute(
663                "UPDATE segments
664                 SET status = 'failed', attempts = attempts + 1, error = ?1
665                 WHERE job_id = ?2
666                   AND id = ?3
667                   AND status NOT IN ('succeeded', 'skipped_cached', 'needs_review')",
668                params![error, job_id, segment_id],
669            )?;
670        }
671        self.touch_job(job_id, "failed")?;
672        Ok(())
673    }
674
675    pub fn mark_unfinished_segments_failed(
676        &self,
677        job_id: &str,
678        candidate_segment_ids: &[String],
679        error: &str,
680    ) -> Result<usize> {
681        const SQLITE_IN_CHUNK_SIZE: usize = 900;
682        let mut updated = 0;
683
684        for chunk in candidate_segment_ids.chunks(SQLITE_IN_CHUNK_SIZE) {
685            if chunk.is_empty() {
686                continue;
687            }
688
689            let placeholders = std::iter::repeat_n("?", chunk.len())
690                .collect::<Vec<_>>()
691                .join(", ");
692            let sql = format!(
693                "UPDATE segments
694                 SET status = 'failed', attempts = attempts + 1, error = ?
695                 WHERE job_id = ?
696                   AND id IN ({placeholders})
697                   AND status NOT IN ('succeeded', 'skipped_cached', 'needs_review')"
698            );
699
700            let conn = self.conn.borrow();
701            let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 2);
702            params.push(&error);
703            params.push(&job_id);
704            for id in chunk {
705                params.push(id);
706            }
707            updated += conn.execute(&sql, params.as_slice())?;
708        }
709
710        if updated > 0 {
711            self.touch_job(job_id, "failed")?;
712        }
713        Ok(updated)
714    }
715
716    pub fn get_job(&self, job_id: &str) -> Result<Option<JobRecord>> {
717        let conn = self.conn.borrow();
718        conn.query_row(
719            "SELECT id, input_path, input_snapshot_path, input_sha256, output_path, input_hash, source_lang, target_lang, provider, model, base_url, api_key_env, status,
720                    events_path, report_json_path, report_markdown_path
721             FROM jobs WHERE id = ?1",
722            params![job_id],
723            |row| {
724                Ok(JobRecord {
725                    id: row.get(0)?,
726                    input_path: PathBuf::from(row.get::<_, String>(1)?),
727                    input_snapshot_path: row.get::<_, Option<String>>(2)?.map(PathBuf::from),
728                    input_sha256: row.get(3)?,
729                    output_path: PathBuf::from(row.get::<_, String>(4)?),
730                    input_hash: row.get(5)?,
731                    source_lang: row.get(6)?,
732                    target_lang: row.get(7)?,
733                    provider: row.get(8)?,
734                    model: row.get(9)?,
735                    base_url: row.get(10)?,
736                    api_key_env: row.get(11)?,
737                    status: row.get(12)?,
738                    events_path: row.get::<_, Option<String>>(13)?.map(PathBuf::from),
739                    report_json_path: row.get::<_, Option<String>>(14)?.map(PathBuf::from),
740                    report_markdown_path: row.get::<_, Option<String>>(15)?.map(PathBuf::from),
741                })
742            },
743        )
744        .optional()
745        .map_err(StoreError::from)
746    }
747
748    pub fn summary(&self, job_id: &str) -> Result<Option<JobSummary>> {
749        let Some(job) = self.get_job(job_id)? else {
750            return Ok(None);
751        };
752        let conn = self.conn.borrow();
753        let mut summary = JobSummary {
754            id: job.id,
755            status: job.status,
756            ..JobSummary::default()
757        };
758
759        let mut stmt = conn.prepare(
760            "SELECT status,
761                    COUNT(*),
762                    COALESCE(SUM(COALESCE(tokens_input, input_tokens)), 0),
763                    COALESCE(SUM(tokens_input_cached), 0),
764                    COALESCE(SUM(COALESCE(tokens_output, output_tokens)), 0)
765             FROM segments WHERE job_id = ?1 GROUP BY status",
766        )?;
767        let rows = stmt.query_map(params![job_id], |row| {
768            Ok((
769                row.get::<_, String>(0)?,
770                row.get::<_, i64>(1)?,
771                row.get::<_, i64>(2)?,
772                row.get::<_, i64>(3)?,
773                row.get::<_, i64>(4)?,
774            ))
775        })?;
776
777        for row in rows {
778            let (status, count, input_tokens, input_cached_tokens, output_tokens) = row?;
779            let count = count as usize;
780            summary.total_segments += count;
781            summary.input_tokens += input_tokens as u64;
782            summary.input_cached_tokens += input_cached_tokens as u64;
783            summary.output_tokens += output_tokens as u64;
784            match status.as_str() {
785                "succeeded" => summary.succeeded += count,
786                "failed" => summary.failed += count,
787                "needs_review" => summary.needs_review += count,
788                "retry_pending" => summary.retry_pending += count,
789                "skipped_cached" => summary.cached += count,
790                _ => {}
791            }
792        }
793
794        summary.retried = conn.query_row(
795            "SELECT COUNT(*) FROM segments WHERE job_id = ?1 AND attempts > 1",
796            params![job_id],
797            |row| row.get::<_, i64>(0),
798        )? as usize;
799
800        Ok(Some(summary))
801    }
802
803    pub fn retry_segments(&self, job_id: &str, scope: RetryScope) -> Result<usize> {
804        let where_status = match scope {
805            RetryScope::Failed => "status = 'failed'",
806            RetryScope::NeedsReview => "status = 'needs_review'",
807            RetryScope::All => "status IN ('failed', 'needs_review')",
808        };
809        let sql = format!(
810            "UPDATE segments SET status = 'retry_pending', error = NULL WHERE job_id = ?1 AND {where_status}"
811        );
812        let count = {
813            let conn = self.conn.borrow();
814            conn.execute(&sql, params![job_id])?
815        };
816        self.touch_job(job_id, "retry_pending")?;
817        Ok(count)
818    }
819
820    pub fn insert_segment_flags(&self, flags: &[NewSegmentFlag<'_>]) -> Result<usize> {
821        let mut conn = self.conn.borrow_mut();
822        let tx = conn.transaction()?;
823        let ingested_at = timestamp_string();
824        let mut inserted = 0usize;
825        for flag in flags {
826            inserted += tx.execute(
827                "INSERT INTO segment_flags
828                 (job_id, segment_id, kind, note, suggested_source, suggested_target, ingested_at, consumed)
829                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
830                params![
831                    flag.job_id,
832                    flag.segment_id,
833                    flag.kind,
834                    flag.note,
835                    flag.suggested_source,
836                    flag.suggested_target,
837                    ingested_at,
838                    if flag.consumed { 1_i64 } else { 0_i64 },
839                ],
840            )?;
841        }
842        tx.commit()?;
843        Ok(inserted)
844    }
845
846    pub fn mark_segments_needs_review(
847        &self,
848        job_id: &str,
849        segment_ids: &[String],
850        reason: &str,
851    ) -> Result<usize> {
852        const SQLITE_IN_CHUNK_SIZE: usize = 900;
853        let mut updated = 0usize;
854        for chunk in segment_ids.chunks(SQLITE_IN_CHUNK_SIZE) {
855            if chunk.is_empty() {
856                continue;
857            }
858            let placeholders = std::iter::repeat_n("?", chunk.len())
859                .collect::<Vec<_>>()
860                .join(", ");
861            let sql = format!(
862                "UPDATE segments
863                 SET status = 'needs_review',
864                     error = ?
865                 WHERE job_id = ?
866                   AND id IN ({placeholders})"
867            );
868            let conn = self.conn.borrow();
869            let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 2);
870            params.push(&reason);
871            params.push(&job_id);
872            for id in chunk {
873                params.push(id);
874            }
875            updated += conn.execute(&sql, params.as_slice())?;
876        }
877        if updated > 0 {
878            self.touch_job(job_id, "needs_review")?;
879        }
880        Ok(updated)
881    }
882
883    pub fn segment_flag_count(&self, job_id: &str) -> Result<usize> {
884        let conn = self.conn.borrow();
885        let count = conn.query_row(
886            "SELECT COUNT(*) FROM segment_flags WHERE job_id = ?1",
887            params![job_id],
888            |row| row.get::<_, i64>(0),
889        )?;
890        Ok(count as usize)
891    }
892
893    pub fn pending_segment_ids(&self, job_id: &str) -> Result<Vec<String>> {
894        let conn = self.conn.borrow();
895        let mut stmt = conn.prepare(
896            "SELECT id FROM segments
897             WHERE job_id = ?1 AND status IN ('queued', 'retry_pending')
898             ORDER BY ordinal",
899        )?;
900        let rows = stmt.query_map(params![job_id], |row| row.get::<_, String>(0))?;
901        rows.collect::<std::result::Result<Vec<_>, _>>()
902            .map_err(StoreError::from)
903    }
904
905    pub fn segment_records(&self, job_id: &str) -> Result<Vec<SegmentRecord>> {
906        let conn = self.conn.borrow();
907        let mut stmt = conn.prepare(
908            "SELECT id,
909                    status,
910                    attempts,
911                    error,
912                    COALESCE(tokens_input, input_tokens),
913                    tokens_input_cached,
914                    COALESCE(tokens_output, output_tokens),
915                    tokens_estimated
916             FROM segments WHERE job_id = ?1 ORDER BY ordinal",
917        )?;
918        let rows = stmt.query_map(params![job_id], |row| {
919            Ok(SegmentRecord {
920                id: row.get(0)?,
921                status: row.get(1)?,
922                attempts: row.get::<_, i64>(2)? as usize,
923                error: row.get(3)?,
924                input_tokens: row.get::<_, Option<i64>>(4)?.map(|value| value as u64),
925                input_cached_tokens: row.get::<_, Option<i64>>(5)?.map(|value| value as u64),
926                output_tokens: row.get::<_, Option<i64>>(6)?.map(|value| value as u64),
927                tokens_estimated: row.get::<_, i64>(7)? != 0,
928            })
929        })?;
930        rows.collect::<std::result::Result<Vec<_>, _>>()
931            .map_err(StoreError::from)
932    }
933
934    pub fn load_block_translations(&self, job_id: &str) -> Result<Vec<StoredBlockTranslation>> {
935        let conn = self.conn.borrow();
936        let mut stmt = conn.prepare(
937            "SELECT segment_id, block_id, translated_text
938             FROM translation_blocks WHERE job_id = ?1 ORDER BY segment_id, block_id",
939        )?;
940        let rows = stmt.query_map(params![job_id], |row| {
941            Ok(StoredBlockTranslation {
942                segment_id: row.get(0)?,
943                block_id: row.get(1)?,
944                text: row.get(2)?,
945            })
946        })?;
947        rows.collect::<std::result::Result<Vec<_>, _>>()
948            .map_err(StoreError::from)
949    }
950
951    pub fn load_terminal_segment_translations(
952        &self,
953        job_id: &str,
954    ) -> Result<Vec<StoredSegmentTranslation>> {
955        let conn = self.conn.borrow();
956        let mut stmt = conn.prepare(
957            "SELECT s.id, s.ordinal, s.status, s.error, t.translated_text
958             FROM segments s
959             JOIN translations t ON t.job_id = s.job_id AND t.segment_id = s.id
960             WHERE s.job_id = ?1 AND s.status IN ('succeeded', 'skipped_cached', 'needs_review')
961             ORDER BY s.ordinal",
962        )?;
963        let rows = stmt.query_map(params![job_id], |row| {
964            Ok((
965                row.get::<_, String>(0)?,
966                row.get::<_, i64>(1)?,
967                row.get::<_, String>(2)?,
968                row.get::<_, Option<String>>(3)?,
969                row.get::<_, String>(4)?,
970            ))
971        })?;
972
973        let mut records = Vec::new();
974        for row in rows {
975            let (segment_id, ordinal, status, error, translated_text) = row?;
976            let mut block_stmt = conn.prepare(
977                "SELECT block_id, translated_text
978                 FROM translation_blocks
979                 WHERE job_id = ?1 AND segment_id = ?2
980                 ORDER BY block_id",
981            )?;
982            let blocks = block_stmt
983                .query_map(params![job_id, segment_id.as_str()], |row| {
984                    Ok(BlockTranslation {
985                        block_id: BlockId(row.get::<_, String>(0)?),
986                        text: row.get(1)?,
987                    })
988                })?
989                .collect::<std::result::Result<Vec<_>, _>>()?;
990            records.push(StoredSegmentTranslation {
991                segment_id,
992                ordinal: ordinal as usize,
993                status,
994                error,
995                translated_text,
996                blocks,
997            });
998        }
999
1000        Ok(records)
1001    }
1002
1003    pub fn resumable_segment_ids(&self, job_id: &str) -> Result<Vec<String>> {
1004        let conn = self.conn.borrow();
1005        let mut stmt = conn.prepare(
1006            "SELECT id FROM segments
1007             WHERE job_id = ?1 AND status IN ('queued', 'retry_pending', 'failed')
1008             ORDER BY ordinal",
1009        )?;
1010        let rows = stmt.query_map(params![job_id], |row| row.get::<_, String>(0))?;
1011        rows.collect::<std::result::Result<Vec<_>, _>>()
1012            .map_err(StoreError::from)
1013    }
1014
1015    #[allow(clippy::too_many_arguments)]
1016    pub fn find_cached_translation(
1017        &self,
1018        segment: &Segment,
1019        prompt_version: &str,
1020        provider: &str,
1021        model: &str,
1022        source_lang: Option<&str>,
1023        target_lang: &str,
1024        cache_namespace: &str,
1025    ) -> Result<Option<CachedTranslation>> {
1026        let conn = self.conn.borrow();
1027        let cached = conn
1028            .query_row(
1029                "SELECT t.job_id, t.segment_id, t.translated_text
1030                 FROM translations t
1031                 JOIN segments s ON s.job_id = t.job_id AND s.id = t.segment_id
1032                 JOIN jobs j ON j.id = t.job_id
1033                 WHERE s.source_hash = ?1
1034                   AND s.prompt_version = ?2
1035                   AND s.provider = ?3
1036                   AND s.model = ?4
1037                   AND ((?5 IS NULL AND j.source_lang IS NULL) OR j.source_lang = ?5)
1038                   AND j.target_lang = ?6
1039                   AND s.cache_namespace = ?7
1040                   AND s.status IN ('succeeded', 'skipped_cached')
1041                 ORDER BY t.created_at DESC
1042                 LIMIT 1",
1043                params![
1044                    segment.checksum,
1045                    prompt_version,
1046                    provider,
1047                    model,
1048                    source_lang,
1049                    target_lang,
1050                    cache_namespace,
1051                ],
1052                |row| {
1053                    Ok((
1054                        row.get::<_, String>(0)?,
1055                        row.get::<_, String>(1)?,
1056                        row.get::<_, String>(2)?,
1057                    ))
1058                },
1059            )
1060            .optional()?;
1061
1062        let Some((job_id, segment_id, translated_text)) = cached else {
1063            return Ok(None);
1064        };
1065
1066        let mut stmt = conn.prepare(
1067            "SELECT block_id, translated_text
1068             FROM translation_blocks
1069             WHERE job_id = ?1 AND segment_id = ?2
1070             ORDER BY block_id",
1071        )?;
1072        let rows = stmt.query_map(params![job_id, segment_id], |row| {
1073            Ok(BlockTranslation {
1074                block_id: BlockId(row.get::<_, String>(0)?),
1075                text: row.get(1)?,
1076            })
1077        })?;
1078        let blocks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1079
1080        let mut by_id = blocks
1081            .into_iter()
1082            .map(|block| (block.block_id.0.clone(), block))
1083            .collect::<HashMap<_, _>>();
1084
1085        let mut ordered = Vec::with_capacity(segment.block_ids.len());
1086        for id in &segment.block_ids {
1087            let Some(block) = by_id.remove(&id.0) else {
1088                return Ok(None);
1089            };
1090            ordered.push(block);
1091        }
1092        if !by_id.is_empty() {
1093            return Ok(None);
1094        }
1095
1096        Ok(Some(CachedTranslation {
1097            translated_text,
1098            blocks: ordered,
1099        }))
1100    }
1101
1102    pub fn find_cached_translations_batch(
1103        &self,
1104        segments: &[Segment],
1105        request: CacheLookupRequest<'_>,
1106    ) -> Result<HashMap<String, CachedTranslation>> {
1107        let mut results = HashMap::new();
1108        if segments.is_empty() {
1109            return Ok(results);
1110        }
1111
1112        const SQLITE_IN_CHUNK_SIZE: usize = 900;
1113
1114        for chunk in segments.chunks(SQLITE_IN_CHUNK_SIZE) {
1115            let hashes: Vec<&str> = chunk.iter().map(|s| s.checksum.as_str()).collect();
1116            let placeholders: Vec<String> =
1117                (0..hashes.len()).map(|i| format!("?{}", i + 1)).collect();
1118            let placeholders_sql = placeholders.join(", ");
1119
1120            let sql = format!(
1121                "SELECT t.job_id, t.segment_id, t.translated_text, s.source_hash
1122                 FROM translations t
1123                 JOIN segments s ON s.job_id = t.job_id AND s.id = t.segment_id
1124                 JOIN jobs j ON j.id = t.job_id
1125                 WHERE s.source_hash IN ({placeholders_sql})
1126                   AND s.prompt_version = ?{}
1127                   AND s.provider = ?{}
1128                   AND s.model = ?{}
1129                   AND ((?{} IS NULL AND j.source_lang IS NULL) OR j.source_lang = ?{})
1130                   AND j.target_lang = ?{}
1131                   AND s.cache_namespace = ?{}
1132                   AND s.status IN ('succeeded', 'skipped_cached')
1133                 ORDER BY t.created_at DESC",
1134                hashes.len() + 1,
1135                hashes.len() + 2,
1136                hashes.len() + 3,
1137                hashes.len() + 4,
1138                hashes.len() + 5,
1139                hashes.len() + 6,
1140                hashes.len() + 7,
1141            );
1142
1143            let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1144            for hash in &hashes {
1145                params.push(Box::new(hash.to_string()));
1146            }
1147            params.push(Box::new(request.prompt_version.to_string()));
1148            params.push(Box::new(request.provider.to_string()));
1149            params.push(Box::new(request.model.to_string()));
1150            params.push(Box::new(request.source_lang.map(|s| s.to_string())));
1151            params.push(Box::new(request.source_lang.map(|s| s.to_string())));
1152            params.push(Box::new(request.target_lang.to_string()));
1153            params.push(Box::new(request.cache_namespace.to_string()));
1154
1155            let conn = self.conn.borrow();
1156            let mut stmt = conn.prepare(&sql)?;
1157            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1158                params.iter().map(|p| p.as_ref()).collect();
1159
1160            let rows = stmt.query_map(param_refs.as_slice(), |row| {
1161                Ok((
1162                    row.get::<_, String>(0)?,
1163                    row.get::<_, String>(1)?,
1164                    row.get::<_, String>(2)?,
1165                    row.get::<_, String>(3)?,
1166                ))
1167            })?;
1168
1169            let mut hash_to_hit: HashMap<String, (String, String, String)> = HashMap::new();
1170            for row in rows {
1171                let (job_id, segment_id, translated_text, source_hash) = row?;
1172                hash_to_hit
1173                    .entry(source_hash)
1174                    .or_insert((job_id, segment_id, translated_text));
1175            }
1176
1177            for segment in chunk {
1178                if let Some((job_id, segment_id, translated_text)) =
1179                    hash_to_hit.get(&segment.checksum)
1180                {
1181                    let mut block_stmt = conn.prepare(
1182                        "SELECT block_id, translated_text
1183                         FROM translation_blocks
1184                         WHERE job_id = ?1 AND segment_id = ?2
1185                         ORDER BY block_id",
1186                    )?;
1187                    let block_rows = block_stmt.query_map(params![job_id, segment_id], |row| {
1188                        Ok(BlockTranslation {
1189                            block_id: BlockId(row.get::<_, String>(0)?),
1190                            text: row.get(1)?,
1191                        })
1192                    })?;
1193                    let blocks = block_rows.collect::<std::result::Result<Vec<_>, _>>()?;
1194
1195                    let mut by_id = blocks
1196                        .into_iter()
1197                        .map(|block| (block.block_id.0.clone(), block))
1198                        .collect::<HashMap<_, _>>();
1199
1200                    let mut ordered = Vec::with_capacity(segment.block_ids.len());
1201                    let mut valid = true;
1202                    for id in &segment.block_ids {
1203                        let Some(block) = by_id.remove(&id.0) else {
1204                            valid = false;
1205                            break;
1206                        };
1207                        ordered.push(block);
1208                    }
1209                    if !valid || !by_id.is_empty() {
1210                        continue;
1211                    }
1212
1213                    results.insert(
1214                        segment.id.0.clone(),
1215                        CachedTranslation {
1216                            translated_text: translated_text.clone(),
1217                            blocks: ordered,
1218                        },
1219                    );
1220                }
1221            }
1222        }
1223
1224        Ok(results)
1225    }
1226
1227    fn migrate(&self) -> Result<()> {
1228        let conn = self.conn.borrow();
1229        if table_exists(&conn, "translations")?
1230            && !table_has_column(&conn, "translations", "job_id")?
1231        {
1232            let suffix = unix_timestamp_nanos();
1233            rename_table_if_exists(&conn, "qa_findings", suffix)?;
1234            rename_table_if_exists(&conn, "translation_blocks", suffix)?;
1235            rename_table_if_exists(&conn, "translations", suffix)?;
1236            rename_table_if_exists(&conn, "segments", suffix)?;
1237            rename_table_if_exists(&conn, "jobs", suffix)?;
1238        }
1239        conn.execute_batch(
1240            "
1241            PRAGMA foreign_keys = ON;
1242            CREATE TABLE IF NOT EXISTS _migrations (
1243              version INTEGER PRIMARY KEY,
1244              name TEXT NOT NULL,
1245              applied_at TEXT NOT NULL
1246            );
1247
1248            CREATE TABLE IF NOT EXISTS jobs (
1249              id TEXT PRIMARY KEY,
1250              input_path TEXT NOT NULL DEFAULT '',
1251              input_snapshot_path TEXT,
1252              input_sha256 TEXT,
1253              output_path TEXT NOT NULL DEFAULT '',
1254              input_hash TEXT NOT NULL,
1255              source_lang TEXT,
1256              target_lang TEXT NOT NULL,
1257              provider TEXT NOT NULL,
1258              model TEXT NOT NULL,
1259              base_url TEXT,
1260              api_key_env TEXT,
1261              status TEXT NOT NULL,
1262              config_json TEXT,
1263              events_path TEXT,
1264              report_json_path TEXT,
1265              report_markdown_path TEXT,
1266              created_at TEXT NOT NULL,
1267              updated_at TEXT NOT NULL
1268            );
1269
1270            CREATE TABLE IF NOT EXISTS segments (
1271              id TEXT NOT NULL,
1272              job_id TEXT NOT NULL,
1273              section_id TEXT NOT NULL,
1274              ordinal INTEGER NOT NULL,
1275              source_hash TEXT NOT NULL,
1276              prompt_version TEXT NOT NULL,
1277              provider TEXT NOT NULL,
1278              model TEXT NOT NULL,
1279              status TEXT NOT NULL,
1280              attempts INTEGER NOT NULL DEFAULT 0,
1281              input_tokens INTEGER,
1282              output_tokens INTEGER,
1283              tokens_input INTEGER,
1284              tokens_input_cached INTEGER,
1285              tokens_output INTEGER,
1286              tokens_estimated INTEGER NOT NULL DEFAULT 0,
1287              cost_estimate REAL,
1288              error TEXT,
1289              translated_hash TEXT,
1290              PRIMARY KEY (job_id, id),
1291              FOREIGN KEY(job_id) REFERENCES jobs(id)
1292            );
1293
1294            CREATE TABLE IF NOT EXISTS translations (
1295              segment_id TEXT NOT NULL,
1296              job_id TEXT NOT NULL,
1297              translated_text TEXT NOT NULL,
1298              provider TEXT NOT NULL,
1299              model TEXT NOT NULL,
1300              prompt_version TEXT NOT NULL,
1301              created_at TEXT NOT NULL,
1302              PRIMARY KEY (job_id, segment_id),
1303              FOREIGN KEY(job_id, segment_id) REFERENCES segments(job_id, id)
1304            );
1305
1306            CREATE TABLE IF NOT EXISTS translation_blocks (
1307              segment_id TEXT NOT NULL,
1308              job_id TEXT NOT NULL,
1309              block_id TEXT NOT NULL,
1310              translated_text TEXT NOT NULL,
1311              PRIMARY KEY (job_id, segment_id, block_id),
1312              FOREIGN KEY(job_id, segment_id) REFERENCES segments(job_id, id)
1313            );
1314
1315            CREATE TABLE IF NOT EXISTS qa_findings (
1316              id TEXT PRIMARY KEY,
1317              segment_id TEXT NOT NULL,
1318              job_id TEXT NOT NULL,
1319              severity TEXT NOT NULL,
1320              kind TEXT NOT NULL,
1321              message TEXT NOT NULL,
1322              FOREIGN KEY(job_id, segment_id) REFERENCES segments(job_id, id)
1323            );
1324
1325            CREATE TABLE IF NOT EXISTS segment_flags (
1326              id INTEGER PRIMARY KEY,
1327              job_id TEXT NOT NULL,
1328              segment_id TEXT NOT NULL,
1329              kind TEXT NOT NULL,
1330              note TEXT,
1331              suggested_source TEXT,
1332              suggested_target TEXT,
1333              ingested_at TEXT NOT NULL,
1334              consumed INTEGER NOT NULL DEFAULT 0,
1335              FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
1336            );
1337            ",
1338        )?;
1339        ensure_column(&conn, "jobs", "input_path", "TEXT NOT NULL DEFAULT ''")?;
1340        ensure_column(&conn, "jobs", "input_snapshot_path", "TEXT")?;
1341        ensure_column(&conn, "jobs", "input_sha256", "TEXT")?;
1342        ensure_column(&conn, "jobs", "output_path", "TEXT NOT NULL DEFAULT ''")?;
1343        ensure_column(&conn, "jobs", "base_url", "TEXT")?;
1344        ensure_column(&conn, "jobs", "api_key_env", "TEXT")?;
1345        ensure_column(&conn, "jobs", "config_json", "TEXT")?;
1346        ensure_column(&conn, "jobs", "events_path", "TEXT")?;
1347        ensure_column(&conn, "jobs", "report_json_path", "TEXT")?;
1348        ensure_column(&conn, "jobs", "report_markdown_path", "TEXT")?;
1349        ensure_column(
1350            &conn,
1351            "segments",
1352            "cache_namespace",
1353            "TEXT NOT NULL DEFAULT ''",
1354        )?;
1355        ensure_column(&conn, "segments", "tokens_input", "INTEGER")?;
1356        ensure_column(&conn, "segments", "tokens_input_cached", "INTEGER")?;
1357        ensure_column(&conn, "segments", "tokens_output", "INTEGER")?;
1358        ensure_column(
1359            &conn,
1360            "segments",
1361            "tokens_estimated",
1362            "INTEGER NOT NULL DEFAULT 0",
1363        )?;
1364        conn.execute_batch(
1365            "CREATE INDEX IF NOT EXISTS idx_segments_cache_lookup
1366             ON segments(source_hash, cache_namespace, prompt_version, provider, model, status);
1367             CREATE INDEX IF NOT EXISTS idx_segment_flags_job
1368             ON segment_flags(job_id, consumed);",
1369        )?;
1370        record_migration(&conn, 1, "initial")?;
1371        record_migration(&conn, 2, "v1_0_1_input_snapshot")?;
1372        record_migration(&conn, 3, "v1_1_segment_flags")?;
1373        Ok(())
1374    }
1375
1376    fn touch_job(&self, job_id: &str, status: &str) -> Result<()> {
1377        let conn = self.conn.borrow();
1378        conn.execute(
1379            "UPDATE jobs SET status = ?1, updated_at = ?2 WHERE id = ?3",
1380            params![status, timestamp_string(), job_id],
1381        )?;
1382        Ok(())
1383    }
1384}
1385
1386fn table_exists(conn: &Connection, table: &str) -> rusqlite::Result<bool> {
1387    conn.query_row(
1388        "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1)",
1389        params![table],
1390        |row| row.get::<_, bool>(0),
1391    )
1392}
1393
1394fn table_has_column(conn: &Connection, table: &str, column: &str) -> rusqlite::Result<bool> {
1395    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1396    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1397    for row in rows {
1398        if row? == column {
1399            return Ok(true);
1400        }
1401    }
1402    Ok(false)
1403}
1404
1405fn rename_table_if_exists(conn: &Connection, table: &str, suffix: u128) -> rusqlite::Result<()> {
1406    if table_exists(conn, table)? {
1407        conn.execute(
1408            &format!("ALTER TABLE {table} RENAME TO {table}_legacy_{suffix}"),
1409            [],
1410        )?;
1411    }
1412    Ok(())
1413}
1414
1415fn ensure_column(
1416    conn: &Connection,
1417    table: &str,
1418    column: &str,
1419    definition: &str,
1420) -> rusqlite::Result<()> {
1421    if !table_has_column(conn, table, column)? {
1422        conn.execute(
1423            &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
1424            [],
1425        )?;
1426    }
1427    Ok(())
1428}
1429
1430fn record_migration(conn: &Connection, version: i64, name: &str) -> rusqlite::Result<()> {
1431    conn.execute(
1432        "INSERT OR IGNORE INTO _migrations (version, name, applied_at)
1433         VALUES (?1, ?2, ?3)",
1434        params![version, name, timestamp_string()],
1435    )?;
1436    Ok(())
1437}
1438
1439fn replace_block_translations(
1440    conn: &Connection,
1441    job_id: &str,
1442    segment_id: &str,
1443    blocks: &[BlockTranslation],
1444) -> rusqlite::Result<()> {
1445    conn.execute(
1446        "DELETE FROM translation_blocks WHERE job_id = ?1 AND segment_id = ?2",
1447        params![job_id, segment_id],
1448    )?;
1449    for block in blocks {
1450        conn.execute(
1451            "INSERT INTO translation_blocks (segment_id, job_id, block_id, translated_text)
1452             VALUES (?1, ?2, ?3, ?4)",
1453            params![
1454                segment_id,
1455                job_id,
1456                block.block_id.0.as_str(),
1457                block.text.as_str()
1458            ],
1459        )?;
1460    }
1461    Ok(())
1462}
1463
/// Which group of segments a retry should target.
///
/// NOTE(review): the mapping from variant to segment statuses lives outside
/// this excerpt. The per-variant notes below are inferred from the names and
/// the `failed` / `needs_review` statuses used in this file — confirm.
///
/// `PartialEq`/`Eq`/`Hash` are derived so callers can compare scopes and use
/// them as map or set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RetryScope {
    /// Presumably segments whose status is `failed`.
    Failed,
    /// Presumably segments whose status is `needs_review`.
    NeedsReview,
    /// Presumably both of the groups above.
    All,
}
1470
1471fn file_hash(path: &Path) -> CoreResult<String> {
1472    let bytes = fs::read(path)?;
1473    Ok(stable_hash_bytes(&bytes))
1474}
1475
1476fn stable_hash(text: &str) -> String {
1477    stable_hash_bytes(text.as_bytes())
1478}
1479
1480fn stable_hash_bytes(bytes: &[u8]) -> String {
1481    let digest = Sha256::digest(bytes);
1482    let mut output = String::with_capacity(digest.len() * 2);
1483    for byte in digest {
1484        use std::fmt::Write as _;
1485        write!(&mut output, "{byte:02x}").expect("writing to string should not fail");
1486    }
1487    output
1488}
1489
/// Whole seconds since the Unix epoch according to the system clock, or `0`
/// if the clock reports a time before the epoch.
fn unix_timestamp() -> u64 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs(),
        Err(_) => 0,
    }
}
1496
/// Nanoseconds since the Unix epoch according to the system clock, or `0` if
/// the clock reports a time before the epoch.
fn unix_timestamp_nanos() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos())
}
1503
1504fn timestamp_string() -> String {
1505    unix_timestamp().to_string()
1506}
1507
1508#[cfg(test)]
1509mod tests {
1510    use super::*;
1511    use bookforge_core::{
1512        ir::{BlockId, SectionId},
1513        segment::{
1514            Segment, SegmentBlock, SegmentConstraints, SegmentContext, SegmentId, SegmentMetadata,
1515            SegmentSource, SegmentTextRun,
1516        },
1517    };
1518
1519    #[test]
1520    fn store_reuses_connection_across_job_operations() {
1521        let db_path = temp_path("jobs.sqlite");
1522        let input_path = temp_path("input.epub");
1523        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1524
1525        let store = JobStore::open(&db_path).expect("store should open");
1526        let job = store
1527            .create_job(CreateJob {
1528                input: &input_path,
1529                output: &temp_path("output.epub"),
1530                source_lang: Some("English"),
1531                target_lang: "Italian",
1532                provider: "mock",
1533                model: "mock-prefix",
1534                base_url: None,
1535                api_key_env: None,
1536            })
1537            .expect("job should be created");
1538        let segments = vec![segment("seg_a", 0), segment("seg_b", 1)];
1539        store
1540            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
1541            .expect("segments should insert");
1542
1543        store
1544            .save_translation(SaveTranslation {
1545                job_id: &job.id,
1546                segment_id: "seg_a",
1547                translated_text: "Tradotto",
1548                blocks: &[BlockTranslation {
1549                    block_id: BlockId("b_000000".to_string()),
1550                    text: "Tradotto".to_string(),
1551                }],
1552                provider: "mock",
1553                model: "mock-prefix",
1554                prompt_version: "v1",
1555                input_tokens: Some(11),
1556                input_cached_tokens: Some(0),
1557                output_tokens: Some(7),
1558                tokens_estimated: false,
1559            })
1560            .expect("translation should save");
1561        store
1562            .mark_segment_failed(&job.id, "seg_b", "provider unavailable")
1563            .expect("segment should be marked failed");
1564
1565        let summary = store
1566            .summary(&job.id)
1567            .expect("summary should load")
1568            .expect("job should exist");
1569        assert_eq!(summary.total_segments, 2);
1570        assert_eq!(summary.succeeded, 1);
1571        assert_eq!(summary.failed, 1);
1572        assert_eq!(summary.input_tokens, 11);
1573        assert_eq!(summary.output_tokens, 7);
1574        let blocks = store
1575            .load_block_translations(&job.id)
1576            .expect("block translations should load");
1577        assert_eq!(blocks.len(), 1);
1578        assert_eq!(blocks[0].text, "Tradotto");
1579
1580        let _ = fs::remove_file(db_path);
1581        let _ = fs::remove_file(input_path);
1582    }
1583
1584    fn segment(id: &str, ordinal: usize) -> Segment {
1585        let block_id = BlockId(format!("b_{ordinal:06}"));
1586        Segment {
1587            id: SegmentId(id.to_string()),
1588            section_id: SectionId("sec_000000".to_string()),
1589            ordinal,
1590            block_ids: vec![block_id.clone()],
1591            source: SegmentSource {
1592                text: format!("Source {ordinal}"),
1593                blocks: vec![SegmentBlock {
1594                    block_id,
1595                    kind: "paragraph".to_string(),
1596                    text: format!("Source {ordinal}"),
1597                    text_runs: vec![SegmentTextRun {
1598                        id: format!("r{ordinal}"),
1599                        text: format!("Source {ordinal}"),
1600                    }],
1601                    protected_spans: Vec::new(),
1602                }],
1603                token_estimate: 2,
1604            },
1605            context: SegmentContext::default(),
1606            metadata: SegmentMetadata::default(),
1607            constraints: SegmentConstraints::default(),
1608            checksum: format!("checksum_{ordinal}"),
1609        }
1610    }
1611
1612    fn temp_path(name: &str) -> PathBuf {
1613        std::env::temp_dir().join(format!(
1614            "bookforge-store-test-{}-{}-{name}",
1615            std::process::id(),
1616            unix_timestamp_nanos()
1617        ))
1618    }
1619
1620    fn build_seeded_store_with_translation(
1621        db_path: &PathBuf,
1622        cache_namespace: &str,
1623        block_ids: &[&str],
1624    ) -> (JobStore, JobRecord, Segment) {
1625        let input_path = temp_path("input.epub");
1626        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1627
1628        let store = JobStore::open(db_path).expect("store should open");
1629        let job = store
1630            .create_job(CreateJob {
1631                input: &input_path,
1632                output: &temp_path("output.epub"),
1633                source_lang: Some("English"),
1634                target_lang: "Italian",
1635                provider: "mock",
1636                model: "mock-prefix",
1637                base_url: None,
1638                api_key_env: None,
1639            })
1640            .expect("job should be created");
1641
1642        let mut seg = segment("seg_a", 0);
1643        let blocks: Vec<BlockTranslation> = block_ids
1644            .iter()
1645            .map(|id| BlockTranslation {
1646                block_id: BlockId(id.to_string()),
1647                text: format!("Tradotto {id}"),
1648            })
1649            .collect();
1650        seg.block_ids = block_ids.iter().map(|id| BlockId(id.to_string())).collect();
1651
1652        store
1653            .insert_segments(
1654                &job.id,
1655                &[seg.clone()],
1656                "v1",
1657                "mock",
1658                "mock-prefix",
1659                cache_namespace,
1660            )
1661            .expect("segments should insert");
1662        store
1663            .save_translation(SaveTranslation {
1664                job_id: &job.id,
1665                segment_id: "seg_a",
1666                translated_text: "Tradotto",
1667                blocks: &blocks,
1668                provider: "mock",
1669                model: "mock-prefix",
1670                prompt_version: "v1",
1671                input_tokens: Some(11),
1672                input_cached_tokens: Some(0),
1673                output_tokens: Some(7),
1674                tokens_estimated: false,
1675            })
1676            .expect("translation should save");
1677
1678        let _ = fs::remove_file(input_path);
1679        (store, job, seg)
1680    }
1681
1682    #[test]
1683    fn job_config_snapshot_round_trips_through_store() {
1684        let db_path = temp_path("snapshot.sqlite");
1685        let input_path = temp_path("input.epub");
1686        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1687        let store = JobStore::open(&db_path).expect("store should open");
1688        let job = store
1689            .create_job(CreateJob {
1690                input: &input_path,
1691                output: &temp_path("output.epub"),
1692                source_lang: Some("English"),
1693                target_lang: "Italian",
1694                provider: "openrouter",
1695                model: "model",
1696                base_url: Some("https://example.test/v1"),
1697                api_key_env: Some("OPENROUTER_API_KEY"),
1698            })
1699            .expect("job should be created");
1700        let settings = bookforge_core::TranslationProfile::Balanced.resolve();
1701        let snapshot = RunConfigSnapshot {
1702            input_path: input_path.clone(),
1703            input_snapshot_path: Some(temp_path("input-snapshot.epub")),
1704            input_sha256: Some("abc123".to_string()),
1705            output_path: temp_path("translated.epub"),
1706            events_path: Some(temp_path("events.jsonl")),
1707            report_json_path: Some(temp_path("report.json")),
1708            report_markdown_path: Some(temp_path("report.md")),
1709            source_language: Some("English".to_string()),
1710            target_language: "Italian".to_string(),
1711            provider: "openrouter".to_string(),
1712            model: "model".to_string(),
1713            base_url: Some("https://example.test/v1".to_string()),
1714            api_key_env: Some("OPENROUTER_API_KEY".to_string()),
1715            profile: settings.profile,
1716            provider_preset: None,
1717            prompt_version: "batch_v1".to_string(),
1718            cache_namespace: "cache_ns".to_string(),
1719            settings: bookforge_core::ResolvedRunSettingsSnapshot::from_settings(&settings),
1720        };
1721
1722        store
1723            .update_job_config_snapshot(&job.id, &snapshot)
1724            .expect("snapshot should persist");
1725        let loaded = store
1726            .load_job_config_snapshot(&job.id)
1727            .expect("snapshot should load")
1728            .expect("snapshot should exist");
1729        assert_eq!(loaded, snapshot);
1730
1731        let reloaded_job = store
1732            .get_job(&job.id)
1733            .expect("job should load")
1734            .expect("job should exist");
1735        assert_eq!(reloaded_job.events_path, snapshot.events_path);
1736        assert_eq!(reloaded_job.report_json_path, snapshot.report_json_path);
1737        assert_eq!(
1738            reloaded_job.report_markdown_path,
1739            snapshot.report_markdown_path
1740        );
1741
1742        let _ = fs::remove_file(db_path);
1743        let _ = fs::remove_file(input_path);
1744    }
1745
1746    #[test]
1747    fn job_config_snapshot_does_not_store_api_key_value() {
1748        let db_path = temp_path("snapshot_secret.sqlite");
1749        let input_path = temp_path("input.epub");
1750        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1751        let api_key_env = "BOOKFORGE_TEST_API_KEY_VALUE_NOT_STORED";
1752        let api_key_value = "sk-live-secret-that-must-not-be-persisted";
1753        // This test uses a unique process-local env var and verifies snapshot
1754        // serialization never reads the value.
1755        unsafe {
1756            std::env::set_var(api_key_env, api_key_value);
1757        }
1758
1759        let store = JobStore::open(&db_path).expect("store should open");
1760        let job = store
1761            .create_job(CreateJob {
1762                input: &input_path,
1763                output: &temp_path("output.epub"),
1764                source_lang: Some("English"),
1765                target_lang: "Italian",
1766                provider: "openrouter",
1767                model: "model",
1768                base_url: Some("https://example.test/v1"),
1769                api_key_env: Some(api_key_env),
1770            })
1771            .expect("job should be created");
1772        let settings = bookforge_core::TranslationProfile::Balanced.resolve();
1773        let snapshot = RunConfigSnapshot {
1774            input_path: input_path.clone(),
1775            input_snapshot_path: None,
1776            input_sha256: None,
1777            output_path: temp_path("translated.epub"),
1778            events_path: Some(temp_path("events.jsonl")),
1779            report_json_path: Some(temp_path("report.json")),
1780            report_markdown_path: Some(temp_path("report.md")),
1781            source_language: Some("English".to_string()),
1782            target_language: "Italian".to_string(),
1783            provider: "openrouter".to_string(),
1784            model: "model".to_string(),
1785            base_url: Some("https://example.test/v1".to_string()),
1786            api_key_env: Some(api_key_env.to_string()),
1787            profile: settings.profile,
1788            provider_preset: None,
1789            prompt_version: "batch_v1".to_string(),
1790            cache_namespace: "cache_ns".to_string(),
1791            settings: bookforge_core::ResolvedRunSettingsSnapshot::from_settings(&settings),
1792        };
1793
1794        store
1795            .update_job_config_snapshot(&job.id, &snapshot)
1796            .expect("snapshot should persist");
1797        let raw_json = {
1798            let conn = store.conn.borrow();
1799            conn.query_row(
1800                "SELECT config_json FROM jobs WHERE id = ?1",
1801                params![job.id],
1802                |row| row.get::<_, String>(0),
1803            )
1804            .expect("raw snapshot JSON should load")
1805        };
1806
1807        assert!(raw_json.contains(api_key_env));
1808        assert!(!raw_json.contains(api_key_value));
1809
1810        unsafe {
1811            std::env::remove_var(api_key_env);
1812        }
1813        let _ = fs::remove_file(db_path);
1814        let _ = fs::remove_file(input_path);
1815    }
1816
1817    #[test]
1818    fn terminal_loading_and_resumable_ids_preserve_lifecycle_boundaries() {
1819        let db_path = temp_path("terminal_resume.sqlite");
1820        let input_path = temp_path("input.epub");
1821        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1822        let store = JobStore::open(&db_path).expect("store should open");
1823        let job = store
1824            .create_job(CreateJob {
1825                input: &input_path,
1826                output: &temp_path("output.epub"),
1827                source_lang: Some("English"),
1828                target_lang: "Italian",
1829                provider: "mock",
1830                model: "mock-prefix",
1831                base_url: None,
1832                api_key_env: None,
1833            })
1834            .expect("job should be created");
1835        let segments = vec![
1836            segment("seg_done", 0),
1837            segment("seg_cached", 1),
1838            segment("seg_review", 2),
1839            segment("seg_failed", 3),
1840            segment("seg_queued", 4),
1841        ];
1842        store
1843            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "ns")
1844            .expect("segments should insert");
1845        store
1846            .save_translation(SaveTranslation {
1847                job_id: &job.id,
1848                segment_id: "seg_done",
1849                translated_text: "Done",
1850                blocks: &[BlockTranslation {
1851                    block_id: BlockId("b_000000".to_string()),
1852                    text: "Done".to_string(),
1853                }],
1854                provider: "mock",
1855                model: "mock-prefix",
1856                prompt_version: "v1",
1857                input_tokens: None,
1858                input_cached_tokens: None,
1859                output_tokens: None,
1860                tokens_estimated: false,
1861            })
1862            .expect("done should save");
1863        store
1864            .save_cached_translation(SaveCachedTranslation {
1865                job_id: &job.id,
1866                segment_id: "seg_cached",
1867                translated_text: "Cached",
1868                blocks: &[BlockTranslation {
1869                    block_id: BlockId("b_000001".to_string()),
1870                    text: "Cached".to_string(),
1871                }],
1872                provider: "mock",
1873                model: "mock-prefix",
1874                prompt_version: "v1",
1875            })
1876            .expect("cached should save");
1877        store
1878            .save_needs_review(SaveNeedsReview {
1879                job_id: &job.id,
1880                segment_id: "seg_review",
1881                preserved_text: "Review",
1882                blocks: &[BlockTranslation {
1883                    block_id: BlockId("b_000002".to_string()),
1884                    text: "Review".to_string(),
1885                }],
1886                provider: "mock",
1887                model: "mock-prefix",
1888                prompt_version: "v1",
1889                error: "needs eyes",
1890                input_tokens: None,
1891                input_cached_tokens: None,
1892                output_tokens: None,
1893                tokens_estimated: false,
1894            })
1895            .expect("review should save");
1896        store
1897            .mark_segment_failed(&job.id, "seg_failed", "failed")
1898            .expect("failed should mark");
1899
1900        let terminal = store
1901            .load_terminal_segment_translations(&job.id)
1902            .expect("terminal records should load");
1903        let ids = terminal
1904            .iter()
1905            .map(|record| record.segment_id.as_str())
1906            .collect::<Vec<_>>();
1907        assert_eq!(ids, vec!["seg_done", "seg_cached", "seg_review"]);
1908        assert_eq!(terminal[0].blocks[0].block_id.0, "b_000000");
1909        assert_eq!(terminal[2].status, "needs_review");
1910
1911        let resumable = store
1912            .resumable_segment_ids(&job.id)
1913            .expect("resumable ids should load");
1914        assert_eq!(resumable, vec!["seg_failed", "seg_queued"]);
1915
1916        let _ = fs::remove_file(db_path);
1917        let _ = fs::remove_file(input_path);
1918    }
1919
1920    #[test]
1921    fn mark_unfinished_segments_failed_preserves_terminal_segments() {
1922        let db_path = temp_path("unfinished_preserve.sqlite");
1923        let input_path = temp_path("input.epub");
1924        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1925        let store = JobStore::open(&db_path).expect("store should open");
1926        let job = store
1927            .create_job(CreateJob {
1928                input: &input_path,
1929                output: &temp_path("output.epub"),
1930                source_lang: Some("English"),
1931                target_lang: "Italian",
1932                provider: "mock",
1933                model: "mock-prefix",
1934                base_url: None,
1935                api_key_env: None,
1936            })
1937            .expect("job should be created");
1938        let segments = vec![
1939            segment("seg_succeeded", 0),
1940            segment("seg_cached", 1),
1941            segment("seg_review", 2),
1942            segment("seg_queued", 3),
1943            segment("seg_retry", 4),
1944        ];
1945        store
1946            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
1947            .expect("segments should insert");
1948        store
1949            .save_translation(SaveTranslation {
1950                job_id: &job.id,
1951                segment_id: "seg_succeeded",
1952                translated_text: "Done",
1953                blocks: &[BlockTranslation {
1954                    block_id: BlockId("b_000000".to_string()),
1955                    text: "Done".to_string(),
1956                }],
1957                provider: "mock",
1958                model: "mock-prefix",
1959                prompt_version: "v1",
1960                input_tokens: None,
1961                input_cached_tokens: None,
1962                output_tokens: None,
1963                tokens_estimated: false,
1964            })
1965            .expect("succeeded segment should save");
1966        store
1967            .save_cached_translation(SaveCachedTranslation {
1968                job_id: &job.id,
1969                segment_id: "seg_cached",
1970                translated_text: "Cached",
1971                blocks: &[BlockTranslation {
1972                    block_id: BlockId("b_000001".to_string()),
1973                    text: "Cached".to_string(),
1974                }],
1975                provider: "mock",
1976                model: "mock-prefix",
1977                prompt_version: "v1",
1978            })
1979            .expect("cached segment should save");
1980        store
1981            .save_needs_review(SaveNeedsReview {
1982                job_id: &job.id,
1983                segment_id: "seg_review",
1984                preserved_text: "Needs review",
1985                blocks: &[BlockTranslation {
1986                    block_id: BlockId("b_000002".to_string()),
1987                    text: "Needs review".to_string(),
1988                }],
1989                provider: "mock",
1990                model: "mock-prefix",
1991                prompt_version: "v1",
1992                error: "qa issue",
1993                input_tokens: None,
1994                input_cached_tokens: None,
1995                output_tokens: None,
1996                tokens_estimated: false,
1997            })
1998            .expect("needs-review segment should save");
1999        store
2000            .retry_segments(&job.id, RetryScope::Failed)
2001            .expect("retry with no failed segments should be harmless");
2002        {
2003            let conn = store.conn.borrow();
2004            conn.execute(
2005                "UPDATE segments SET status = 'retry_pending' WHERE job_id = ?1 AND id = 'seg_retry'",
2006                params![job.id],
2007            )
2008            .expect("test status update should work");
2009        }
2010
2011        let candidate_ids = segments
2012            .iter()
2013            .map(|segment| segment.id.0.clone())
2014            .collect::<Vec<_>>();
2015        let changed = store
2016            .mark_unfinished_segments_failed(&job.id, &candidate_ids, "run failed")
2017            .expect("unfinished segments should be marked failed");
2018        assert_eq!(changed, 2);
2019
2020        let records = store.segment_records(&job.id).expect("records should load");
2021        let statuses = records
2022            .into_iter()
2023            .map(|record| (record.id, record.status))
2024            .collect::<HashMap<_, _>>();
2025        assert_eq!(statuses["seg_succeeded"], "succeeded");
2026        assert_eq!(statuses["seg_cached"], "skipped_cached");
2027        assert_eq!(statuses["seg_review"], "needs_review");
2028        assert_eq!(statuses["seg_queued"], "failed");
2029        assert_eq!(statuses["seg_retry"], "failed");
2030
2031        let _ = fs::remove_file(db_path);
2032        let _ = fs::remove_file(input_path);
2033    }
2034
2035    #[test]
2036    fn mark_unfinished_segments_failed_marks_only_resumable_segments() {
2037        let db_path = temp_path("unfinished_resumable_only.sqlite");
2038        let input_path = temp_path("input.epub");
2039        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2040        let store = JobStore::open(&db_path).expect("store should open");
2041        let job = store
2042            .create_job(CreateJob {
2043                input: &input_path,
2044                output: &temp_path("output.epub"),
2045                source_lang: Some("English"),
2046                target_lang: "Italian",
2047                provider: "mock",
2048                model: "mock-prefix",
2049                base_url: None,
2050                api_key_env: None,
2051            })
2052            .expect("job should be created");
2053        let segments = vec![
2054            segment("seg_succeeded", 0),
2055            segment("seg_cached", 1),
2056            segment("seg_review", 2),
2057            segment("seg_failed", 3),
2058            segment("seg_retry", 4),
2059            segment("seg_queued", 5),
2060        ];
2061        store
2062            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
2063            .expect("segments should insert");
2064        {
2065            let conn = store.conn.borrow();
2066            for (id, status) in [
2067                ("seg_succeeded", "succeeded"),
2068                ("seg_cached", "skipped_cached"),
2069                ("seg_review", "needs_review"),
2070                ("seg_failed", "failed"),
2071                ("seg_retry", "retry_pending"),
2072                ("seg_queued", "queued"),
2073            ] {
2074                conn.execute(
2075                    "UPDATE segments SET status = ?1 WHERE job_id = ?2 AND id = ?3",
2076                    params![status, job.id, id],
2077                )
2078                .expect("status should update");
2079            }
2080        }
2081
2082        let candidate_ids = segments
2083            .iter()
2084            .map(|segment| segment.id.0.clone())
2085            .collect::<Vec<_>>();
2086        let changed = store
2087            .mark_unfinished_segments_failed(&job.id, &candidate_ids, "run failed")
2088            .expect("unfinished segments should be marked failed");
2089
2090        assert_eq!(changed, 3);
2091        let records = store.segment_records(&job.id).expect("records should load");
2092        let statuses = records
2093            .into_iter()
2094            .map(|record| (record.id, record.status))
2095            .collect::<HashMap<_, _>>();
2096        assert_eq!(statuses["seg_succeeded"], "succeeded");
2097        assert_eq!(statuses["seg_cached"], "skipped_cached");
2098        assert_eq!(statuses["seg_review"], "needs_review");
2099        assert_eq!(statuses["seg_failed"], "failed");
2100        assert_eq!(statuses["seg_retry"], "failed");
2101        assert_eq!(statuses["seg_queued"], "failed");
2102
2103        let _ = fs::remove_file(db_path);
2104        let _ = fs::remove_file(input_path);
2105    }
2106
2107    #[test]
2108    fn resumable_segment_ids_excludes_succeeded_cached_and_needs_review() {
2109        let db_path = temp_path("resumable_excludes.sqlite");
2110        let input_path = temp_path("input.epub");
2111        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2112        let store = JobStore::open(&db_path).expect("store should open");
2113        let job = store
2114            .create_job(CreateJob {
2115                input: &input_path,
2116                output: &temp_path("output.epub"),
2117                source_lang: Some("English"),
2118                target_lang: "Italian",
2119                provider: "mock",
2120                model: "mock-prefix",
2121                base_url: None,
2122                api_key_env: None,
2123            })
2124            .expect("job should be created");
2125        let segments = vec![
2126            segment("seg_succeeded", 0),
2127            segment("seg_cached", 1),
2128            segment("seg_review", 2),
2129        ];
2130        store
2131            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
2132            .expect("segments should insert");
2133        {
2134            let conn = store.conn.borrow();
2135            for (id, status) in [
2136                ("seg_succeeded", "succeeded"),
2137                ("seg_cached", "skipped_cached"),
2138                ("seg_review", "needs_review"),
2139            ] {
2140                conn.execute(
2141                    "UPDATE segments SET status = ?1 WHERE job_id = ?2 AND id = ?3",
2142                    params![status, job.id, id],
2143                )
2144                .expect("status should update");
2145            }
2146        }
2147
2148        let ids = store
2149            .resumable_segment_ids(&job.id)
2150            .expect("resumable ids should load");
2151
2152        assert!(ids.is_empty());
2153        let _ = fs::remove_file(db_path);
2154        let _ = fs::remove_file(input_path);
2155    }
2156
2157    #[test]
2158    fn resumable_segment_ids_includes_failed_retry_pending_and_pending() {
2159        let db_path = temp_path("resumable_includes.sqlite");
2160        let input_path = temp_path("input.epub");
2161        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2162        let store = JobStore::open(&db_path).expect("store should open");
2163        let job = store
2164            .create_job(CreateJob {
2165                input: &input_path,
2166                output: &temp_path("output.epub"),
2167                source_lang: Some("English"),
2168                target_lang: "Italian",
2169                provider: "mock",
2170                model: "mock-prefix",
2171                base_url: None,
2172                api_key_env: None,
2173            })
2174            .expect("job should be created");
2175        let segments = vec![
2176            segment("seg_failed", 0),
2177            segment("seg_retry", 1),
2178            segment("seg_queued", 2),
2179        ];
2180        store
2181            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
2182            .expect("segments should insert");
2183        {
2184            let conn = store.conn.borrow();
2185            for (id, status) in [
2186                ("seg_failed", "failed"),
2187                ("seg_retry", "retry_pending"),
2188                ("seg_queued", "queued"),
2189            ] {
2190                conn.execute(
2191                    "UPDATE segments SET status = ?1 WHERE job_id = ?2 AND id = ?3",
2192                    params![status, job.id, id],
2193                )
2194                .expect("status should update");
2195            }
2196        }
2197
2198        let ids = store
2199            .resumable_segment_ids(&job.id)
2200            .expect("resumable ids should load");
2201
2202        assert_eq!(ids, vec!["seg_failed", "seg_retry", "seg_queued"]);
2203        let _ = fs::remove_file(db_path);
2204        let _ = fs::remove_file(input_path);
2205    }
2206
2207    #[test]
2208    fn cached_translation_requires_matching_cache_namespace() {
2209        let db_path = temp_path("ns_match.sqlite");
2210        let (store, _job, seg) =
2211            build_seeded_store_with_translation(&db_path, "ns_one", &["b_000000"]);
2212
2213        let hit = store
2214            .find_cached_translation(
2215                &seg,
2216                "v1",
2217                "mock",
2218                "mock-prefix",
2219                Some("English"),
2220                "Italian",
2221                "ns_one",
2222            )
2223            .expect("query ok");
2224        assert!(hit.is_some(), "matching namespace should hit");
2225
2226        let miss = store
2227            .find_cached_translation(
2228                &seg,
2229                "v1",
2230                "mock",
2231                "mock-prefix",
2232                Some("English"),
2233                "Italian",
2234                "ns_two",
2235            )
2236            .expect("query ok");
2237        assert!(miss.is_none(), "different namespace must not hit");
2238
2239        let _ = fs::remove_file(db_path);
2240    }
2241
2242    #[test]
2243    fn cached_translation_rejects_mismatched_block_ids() {
2244        let db_path = temp_path("blockid_match.sqlite");
2245        let (store, _job, mut seg) =
2246            build_seeded_store_with_translation(&db_path, "ns_x", &["b_000000"]);
2247
2248        // Caller's segment now expects different block IDs than what was stored.
2249        seg.block_ids = vec![BlockId("b_999999".to_string())];
2250
2251        let miss = store
2252            .find_cached_translation(
2253                &seg,
2254                "v1",
2255                "mock",
2256                "mock-prefix",
2257                Some("English"),
2258                "Italian",
2259                "ns_x",
2260            )
2261            .expect("query ok");
2262        assert!(
2263            miss.is_none(),
2264            "mismatched block_ids must reject the cached row"
2265        );
2266
2267        let _ = fs::remove_file(db_path);
2268    }
2269
2270    #[test]
2271    fn old_empty_cache_namespace_rows_do_not_match_new_runs() {
2272        let db_path = temp_path("legacy_ns.sqlite");
2273        // Simulate a row migrated from an older schema with the default empty namespace.
2274        let (store, _job, seg) = build_seeded_store_with_translation(&db_path, "", &["b_000000"]);
2275
2276        let miss = store
2277            .find_cached_translation(
2278                &seg,
2279                "v1",
2280                "mock",
2281                "mock-prefix",
2282                Some("English"),
2283                "Italian",
2284                "real_ns",
2285            )
2286            .expect("query ok");
2287        assert!(
2288            miss.is_none(),
2289            "legacy empty-namespace row must not satisfy a real namespace lookup"
2290        );
2291
2292        let _ = fs::remove_file(db_path);
2293    }
2294
2295    #[test]
2296    fn job_store_enables_wal_and_busy_timeout() {
2297        let db_path = temp_path("wal_busy.sqlite");
2298        let store = JobStore::open(&db_path).expect("store should open");
2299
2300        let conn = store.conn.borrow();
2301        let journal_mode: String = conn
2302            .pragma_query_value(None, "journal_mode", |row| row.get(0))
2303            .expect("pragma journal_mode should succeed");
2304        assert_eq!(
2305            journal_mode.to_lowercase(),
2306            "wal",
2307            "WAL journal mode must be enabled"
2308        );
2309
2310        let busy_timeout: i64 = conn
2311            .pragma_query_value(None, "busy_timeout", |row| row.get(0))
2312            .expect("pragma busy_timeout should succeed");
2313        assert!(
2314            busy_timeout >= 5000,
2315            "busy_timeout should be at least 5000ms, got {busy_timeout}"
2316        );
2317
2318        let wal_path = db_path.with_extension("sqlite-wal");
2319        let shm_path = db_path.with_extension("sqlite-shm");
2320        // WAL/shm may or may not exist depending on transactions, but the
2321        // journal_mode query confirms WAL is active.
2322
2323        let _ = fs::remove_file(db_path);
2324        let _ = fs::remove_file(wal_path);
2325        let _ = fs::remove_file(shm_path);
2326    }
2327
2328    #[test]
2329    fn job_store_enables_foreign_keys_on_every_connection() {
2330        let db_path = temp_path("fk.sqlite");
2331        let store = JobStore::open(&db_path).expect("store should open");
2332
2333        let conn = store.conn.borrow();
2334        let fk_enabled: i64 = conn
2335            .pragma_query_value(None, "foreign_keys", |row| row.get(0))
2336            .expect("pragma foreign_keys should succeed");
2337        assert_eq!(
2338            fk_enabled, 1,
2339            "foreign_keys pragma must be ON on every connection"
2340        );
2341
2342        let _ = fs::remove_file(db_path);
2343    }
2344
2345    #[test]
2346    fn doctor_reports_wal_sidecars_as_normal_when_integrity_check_passes() {
2347        let db_path = temp_path("doctor.sqlite");
2348        let store = JobStore::open(&db_path).expect("store should open");
2349
2350        // Perform a write to trigger WAL sidecar creation.
2351        let input_path = temp_path("input_doctor.epub");
2352        fs::write(&input_path, b"epub bytes").expect("test epub");
2353        let _job = store
2354            .create_job(CreateJob {
2355                input: &input_path,
2356                output: &temp_path("out_doctor.epub"),
2357                source_lang: Some("English"),
2358                target_lang: "Italian",
2359                provider: "mock",
2360                model: "mock-prefix",
2361                base_url: None,
2362                api_key_env: None,
2363            })
2364            .expect("job created");
2365        drop(store);
2366
2367        let doctor = run_doctor(Some(db_path.clone())).expect("doctor should run");
2368        assert!(doctor.database_exists, "database should exist");
2369        assert_eq!(
2370            doctor.journal_mode.to_lowercase(),
2371            "wal",
2372            "journal mode should be wal"
2373        );
2374        assert_eq!(doctor.integrity_check, "ok", "integrity check should pass");
2375        assert!(
2376            doctor.wal_sidecars_normal,
2377            "wal sidecars should be reported as normal"
2378        );
2379
2380        if doctor.wal_present || doctor.shm_present {
2381            assert!(
2382                !doctor.note.is_empty(),
2383                "doctor must explain WAL sidecars when they are present"
2384            );
2385        }
2386
2387        let wal_path = db_path.with_extension("sqlite-wal");
2388        let shm_path = db_path.with_extension("sqlite-shm");
2389        let _ = fs::remove_file(&db_path);
2390        let _ = fs::remove_file(input_path);
2391        let _ = fs::remove_file(wal_path);
2392        let _ = fs::remove_file(shm_path);
2393    }
2394
2395    #[test]
2396    fn checkpoint_writer_and_reader_do_not_immediately_busy_fail() {
2397        let db_path = temp_path("concurrent.sqlite");
2398        let input_path = temp_path("input_conc.epub");
2399        fs::write(&input_path, b"epub bytes").expect("test epub");
2400
2401        // Open writer store first and create a job.
2402        let store_w = JobStore::open(&db_path).expect("store_w open");
2403        let job = store_w
2404            .create_job(CreateJob {
2405                input: &input_path,
2406                output: &temp_path("out_conc.epub"),
2407                source_lang: Some("English"),
2408                target_lang: "Italian",
2409                provider: "mock",
2410                model: "mock-prefix",
2411                base_url: None,
2412                api_key_env: None,
2413            })
2414            .expect("job created");
2415        store_w
2416            .insert_segments(
2417                &job.id,
2418                &[segment("seg_conc", 0)],
2419                "v1",
2420                "mock",
2421                "mock-prefix",
2422                "ns",
2423            )
2424            .expect("segments inserted");
2425
2426        // Open a second reader store while the first is still active.
2427        let store_r = JobStore::open(&db_path).expect("store_r open");
2428        let summary = store_r
2429            .summary(&job.id)
2430            .expect("summary should load")
2431            .expect("job should exist");
2432        assert_eq!(summary.total_segments, 1);
2433
2434        let wal_path = db_path.with_extension("sqlite-wal");
2435        let shm_path = db_path.with_extension("sqlite-shm");
2436        let _ = fs::remove_file(&db_path);
2437        let _ = fs::remove_file(input_path);
2438        let _ = fs::remove_file(wal_path);
2439        let _ = fs::remove_file(shm_path);
2440    }
2441}