Skip to main content

bookforge_store/
db.rs

1use std::{
2    cell::RefCell,
3    collections::HashMap,
4    fs,
5    path::{Path, PathBuf},
6    time::{Duration, SystemTime, UNIX_EPOCH},
7};
8
9use bookforge_core::{
10    Result as CoreResult,
11    entity::EntityGender,
12    glossary::{GlossaryCategory, GlossaryScopeKind, GlossaryStatus, GlossaryTerm},
13    ir::BlockId,
14    run_snapshot::RunConfigSnapshot,
15    segment::{BlockTranslation, Segment},
16};
17use rusqlite::{Connection, OptionalExtension, params, types::Type};
18use sha2::{Digest, Sha256};
19use std::str::FromStr;
20
/// Result alias used by this module; the error type is fixed to [`StoreError`].
pub type Result<T> = std::result::Result<T, StoreError>;
22
23#[derive(Debug, thiserror::Error)]
24pub enum StoreError {
25    #[error("I/O error: {0}")]
26    Io(#[from] std::io::Error),
27
28    #[error("SQLite error: {0}")]
29    Sqlite(#[from] rusqlite::Error),
30
31    #[error("core error: {0}")]
32    Core(#[from] bookforge_core::BookforgeError),
33
34    #[error("serialization error: {0}")]
35    Serialization(String),
36}
37
/// Handle to the SQLite-backed job database.
pub struct JobStore {
    /// The SQLite connection. It sits in a `RefCell` so that `&self` methods can take a
    /// mutable borrow when they need to open a transaction.
    conn: RefCell<Connection>,
    /// Path the store was opened with.
    path: PathBuf,
}
42
/// One row of the `jobs` table, as produced by `JobStore::create_job` and `JobStore::get_job`.
#[derive(Clone, Debug)]
pub struct JobRecord {
    /// Job identifier; `get_job` looks rows up by this value.
    pub id: String,
    /// Input file path (stored in the database in lossy UTF-8 form).
    pub input_path: PathBuf,
    /// Path of the input snapshot, if one has been recorded; `None` on a freshly created job.
    pub input_snapshot_path: Option<PathBuf>,
    /// SHA-256 recorded alongside the input snapshot, if any; `None` on a freshly created job.
    pub input_sha256: Option<String>,
    /// Output file path (stored in the database in lossy UTF-8 form).
    pub output_path: PathBuf,
    /// Hash of the input file computed when the job was created.
    pub input_hash: String,
    /// Source language, if one was given.
    pub source_lang: Option<String>,
    /// Target language.
    pub target_lang: String,
    /// Provider name recorded for the job.
    pub provider: String,
    /// Model name recorded for the job.
    pub model: String,
    /// Base URL recorded for the job, if any.
    pub base_url: Option<String>,
    /// Name of the environment variable holding the API key, if any.
    pub api_key_env: Option<String>,
    /// Job status string; a freshly created job is `"running"`.
    pub status: String,
    /// Path of the events file, if recorded; `None` on a freshly created job.
    pub events_path: Option<PathBuf>,
    /// Path of the JSON report, if recorded; `None` on a freshly created job.
    pub report_json_path: Option<PathBuf>,
    /// Path of the Markdown report, if recorded; `None` on a freshly created job.
    pub report_markdown_path: Option<PathBuf>,
    /// Book identifier, if any.
    pub book_id: Option<String>,
    /// Series identifier, if any.
    pub series_id: Option<String>,
}
64
/// Aggregated per-job counters, as assembled by `JobStore::summary`.
#[derive(Clone, Debug, Default)]
pub struct JobSummary {
    /// Job identifier.
    pub id: String,
    /// Job status string copied from the job row.
    pub status: String,
    /// Number of segments belonging to the job, across all statuses.
    pub total_segments: usize,
    /// Segments whose status is `succeeded`.
    pub succeeded: usize,
    /// Segments whose status is `failed`.
    pub failed: usize,
    /// Segments whose status is `needs_review`.
    pub needs_review: usize,
    /// Segments whose status is `retry_pending`.
    pub retry_pending: usize,
    /// Segments whose status is `skipped_cached`.
    pub cached: usize,
    /// Segments with more than one recorded attempt.
    pub retried: usize,
    /// Sum of input tokens over the job's segments.
    pub input_tokens: u64,
    /// Sum of cached input tokens over the job's segments.
    pub input_cached_tokens: u64,
    /// Sum of output tokens over the job's segments.
    pub output_tokens: u64,
}
80
/// Borrowed parameters for `JobStore::create_job`.
#[derive(Clone, Copy, Debug)]
pub struct CreateJob<'a> {
    /// Input file; it is hashed and its path is stored with the job.
    pub input: &'a Path,
    /// Output file path stored with the job.
    pub output: &'a Path,
    /// Source language, if known.
    pub source_lang: Option<&'a str>,
    /// Target language.
    pub target_lang: &'a str,
    /// Provider name.
    pub provider: &'a str,
    /// Model name.
    pub model: &'a str,
    /// Optional base URL.
    pub base_url: Option<&'a str>,
    /// Optional name of the environment variable holding the API key.
    pub api_key_env: Option<&'a str>,
    /// Optional book identifier.
    pub book_id: Option<&'a str>,
    /// Optional series identifier.
    pub series_id: Option<&'a str>,
}
94
/// Per-segment state.
///
/// NOTE(review): looks like a view of one `segments` row; the code that builds it is not in
/// this part of the file — confirm the column mapping there.
#[derive(Clone, Debug)]
pub struct SegmentRecord {
    /// Segment identifier.
    pub id: String,
    /// Segment status string.
    pub status: String,
    /// Number of attempts recorded for the segment.
    pub attempts: usize,
    /// Last recorded error message, if any.
    pub error: Option<String>,
    /// Input token count, if recorded.
    pub input_tokens: Option<u64>,
    /// Cached input token count, if recorded.
    pub input_cached_tokens: Option<u64>,
    /// Output token count, if recorded.
    pub output_tokens: Option<u64>,
    /// Whether the token counts are estimates.
    pub tokens_estimated: bool,
}
106
/// Translated text of a single block, keyed by the segment and block it belongs to.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct StoredBlockTranslation {
    /// Identifier of the owning segment.
    pub segment_id: String,
    /// Identifier of the block.
    pub block_id: String,
    /// Translated text of the block.
    pub text: String,
}
113
114#[derive(Debug, Clone, PartialEq, Eq)]
115pub struct StoredSegmentTranslation {
116    pub segment_id: String,
117    pub ordinal: usize,
118    pub status: String,
119    pub error: Option<String>,
120    pub translated_text: String,
121    pub blocks: Vec<BlockTranslation>,
122}
123
124#[derive(Debug, Clone, PartialEq, Eq)]
125pub struct CachedTranslation {
126    pub translated_text: String,
127    pub blocks: Vec<BlockTranslation>,
128}
129
130#[derive(Debug, Clone, Copy)]
131pub struct SaveTranslation<'a> {
132    pub job_id: &'a str,
133    pub segment_id: &'a str,
134    pub translated_text: &'a str,
135    pub blocks: &'a [BlockTranslation],
136    pub provider: &'a str,
137    pub model: &'a str,
138    pub prompt_version: &'a str,
139    pub input_tokens: Option<u64>,
140    pub input_cached_tokens: Option<u64>,
141    pub output_tokens: Option<u64>,
142    pub tokens_estimated: bool,
143}
144
145#[derive(Debug, Clone, Copy)]
146pub struct SaveNeedsReview<'a> {
147    pub job_id: &'a str,
148    pub segment_id: &'a str,
149    pub preserved_text: &'a str,
150    pub blocks: &'a [BlockTranslation],
151    pub provider: &'a str,
152    pub model: &'a str,
153    pub prompt_version: &'a str,
154    pub error: &'a str,
155    pub input_tokens: Option<u64>,
156    pub input_cached_tokens: Option<u64>,
157    pub output_tokens: Option<u64>,
158    pub tokens_estimated: bool,
159}
160
161#[derive(Debug, Clone, Copy)]
162pub struct SaveCachedTranslation<'a> {
163    pub job_id: &'a str,
164    pub segment_id: &'a str,
165    pub translated_text: &'a str,
166    pub blocks: &'a [BlockTranslation],
167    pub provider: &'a str,
168    pub model: &'a str,
169    pub prompt_version: &'a str,
170}
171
/// Borrowed parameters describing a translation-cache lookup.
///
/// NOTE(review): the lookup that consumes this type is outside the visible part of the
/// file; how the fields are matched should be confirmed there.
#[derive(Clone, Copy, Debug)]
pub struct CacheLookupRequest<'a> {
    /// Prompt version.
    pub prompt_version: &'a str,
    /// Provider name.
    pub provider: &'a str,
    /// Model name.
    pub model: &'a str,
    /// Source language, if known.
    pub source_lang: Option<&'a str>,
    /// Target language.
    pub target_lang: &'a str,
    /// Cache namespace.
    pub cache_namespace: &'a str,
}
181
/// Borrowed description of one row to insert into `segment_flags`
/// (see `JobStore::insert_segment_flags`).
#[derive(Clone, Copy, Debug)]
pub struct NewSegmentFlag<'a> {
    /// Job the flag belongs to.
    pub job_id: &'a str,
    /// Segment the flag refers to.
    pub segment_id: &'a str,
    /// Flag kind.
    pub kind: &'a str,
    /// Optional free-form note.
    pub note: Option<&'a str>,
    /// Optional suggested source text.
    pub suggested_source: Option<&'a str>,
    /// Optional suggested target text.
    pub suggested_target: Option<&'a str>,
    /// Whether the flag is already consumed; stored as `1`/`0`.
    pub consumed: bool,
}
192
193#[derive(Debug, Clone, Copy, Default)]
194pub struct GlossaryFilter<'a> {
195    pub scope_kind: Option<GlossaryScopeKind>,
196    pub scope_id: Option<&'a str>,
197    pub source_language: Option<&'a str>,
198    pub target_language: Option<&'a str>,
199    pub active_only: bool,
200}
201
202#[derive(Debug, Clone, Copy)]
203pub struct NewGlossaryCandidate<'a> {
204    pub source_text: &'a str,
205    pub category: GlossaryCategory,
206    pub source_count: usize,
207}
208
/// Counters describing the outcome of a glossary-candidate upsert; all zero by default.
#[derive(Clone, Copy, Debug, Default, Eq, PartialEq)]
pub struct GlossaryCandidateUpsertResult {
    /// Number of candidates inserted.
    pub inserted: usize,
    /// Number of candidates updated.
    pub updated: usize,
    /// Number of candidates skipped.
    pub skipped: usize,
}
215
216#[derive(Debug, Clone, PartialEq, Eq)]
217pub struct StoredGlossaryCandidate {
218    pub id: i64,
219    pub source_text: String,
220    pub target_text: Option<String>,
221    pub category: GlossaryCategory,
222    pub notes: Option<String>,
223    pub case_sensitive: bool,
224    pub always_active: bool,
225    pub status: GlossaryStatus,
226    pub source_language: String,
227    pub target_language: String,
228    pub source_count: usize,
229}
230
231#[derive(Debug, Clone, Copy)]
232pub struct NewStyleSheet<'a> {
233    pub scope_kind: GlossaryScopeKind,
234    pub scope_id: Option<&'a str>,
235    pub target_language: &'a str,
236    pub content_toml: &'a str,
237    pub fingerprint: &'a str,
238}
239
240#[derive(Debug, Clone, PartialEq, Eq)]
241pub struct StoredStyleSheet {
242    pub id: i64,
243    pub scope_kind: GlossaryScopeKind,
244    pub scope_id: Option<String>,
245    pub target_language: String,
246    pub content_toml: String,
247    pub fingerprint: String,
248}
249
250#[derive(Debug, Clone, Copy)]
251pub struct NewEntity<'a> {
252    pub scope_kind: GlossaryScopeKind,
253    pub scope_id: Option<&'a str>,
254    pub source_name: &'a str,
255    pub target_name: &'a str,
256    pub gender_target: Option<EntityGender>,
257    pub role: Option<&'a str>,
258    pub notes: Option<&'a str>,
259    pub source_language: &'a str,
260    pub target_language: &'a str,
261}
262
263#[derive(Debug, Clone, PartialEq, Eq)]
264pub struct StoredEntity {
265    pub id: i64,
266    pub scope_kind: GlossaryScopeKind,
267    pub scope_id: Option<String>,
268    pub source_name: String,
269    pub target_name: String,
270    pub gender_target: Option<EntityGender>,
271    pub role: Option<String>,
272    pub notes: Option<String>,
273    pub source_language: String,
274    pub target_language: String,
275}
276
/// Diagnostic report about the on-disk database, as returned by `run_doctor`.
#[derive(Clone, Debug)]
pub struct StorageDoctor {
    /// Path of the database that was inspected.
    pub database_path: PathBuf,
    /// Whether the database file exists.
    pub database_exists: bool,
    /// Whether a WAL sidecar file was found.
    pub wal_present: bool,
    /// Whether a shared-memory sidecar file was found.
    pub shm_present: bool,
    /// Reported `journal_mode`; `"unknown"` when the database is missing or the pragma fails.
    pub journal_mode: String,
    /// Result of `integrity_check`; `"error"` when the pragma fails, empty when the
    /// database is missing.
    pub integrity_check: String,
    /// `true` when no sidecar files are present, or when they are present and the
    /// integrity check returned `"ok"`.
    pub wal_sidecars_normal: bool,
    /// Advisory message shown when sidecar files are present; empty otherwise.
    pub note: String,
}
288
289pub fn run_doctor(db_path: Option<PathBuf>) -> Result<StorageDoctor> {
290    let path = db_path.unwrap_or_else(|| PathBuf::from(".bookforge/jobs.sqlite"));
291    let database_exists = path.exists();
292    let wal_path = path.with_extension("sqlite-wal");
293    let shm_path = path.with_extension("sqlite-shm");
294    let wal_present = wal_path.exists();
295    let shm_present = shm_path.exists();
296
297    let (journal_mode, integrity_check, wal_sidecars_normal, note) = if database_exists {
298        let conn = Connection::open(&path)?;
299        let journal_mode: String = conn
300            .pragma_query_value(None, "journal_mode", |row| row.get(0))
301            .unwrap_or_else(|_| "unknown".to_string());
302        let integrity_check: String = conn
303            .pragma_query_value(None, "integrity_check", |row| row.get(0))
304            .unwrap_or_else(|_| "error".to_string());
305        let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);");
306
307        let wal_sidecars_normal = if wal_present || shm_present {
308            integrity_check == "ok"
309        } else {
310            true
311        };
312
313        let note = if wal_present || shm_present {
314            "WAL sidecar files are normal. SQLite will recover them automatically. \
315             Do not delete them manually while BookForge is running."
316                .to_string()
317        } else {
318            String::new()
319        };
320
321        (journal_mode, integrity_check, wal_sidecars_normal, note)
322    } else {
323        ("unknown".to_string(), String::new(), true, String::new())
324    };
325
326    Ok(StorageDoctor {
327        database_path: path,
328        database_exists,
329        wal_present,
330        shm_present,
331        journal_mode,
332        integrity_check,
333        wal_sidecars_normal,
334        note,
335    })
336}
337
338impl JobStore {
339    pub fn open_default() -> Result<Self> {
340        Self::open(".bookforge/jobs.sqlite")
341    }
342
343    pub fn open(path: impl Into<PathBuf>) -> Result<Self> {
344        let path = path.into();
345        if let Some(parent) = path.parent() {
346            fs::create_dir_all(parent)?;
347        }
348        let conn = Connection::open(&path)?;
349
350        conn.busy_timeout(Duration::from_secs(5))?;
351        conn.pragma_update(None, "journal_mode", "WAL")?;
352        conn.pragma_update(None, "synchronous", "NORMAL")?;
353        conn.pragma_update(None, "foreign_keys", "ON")?;
354
355        let store = Self {
356            conn: RefCell::new(conn),
357            path,
358        };
359        store.migrate()?;
360        Ok(store)
361    }
362
363    pub fn path(&self) -> &Path {
364        &self.path
365    }
366
367    pub fn create_job(&self, request: CreateJob<'_>) -> Result<JobRecord> {
368        let input_hash = file_hash(request.input)?;
369        let id = format!("job_{}_{}", unix_timestamp_nanos(), &input_hash[..12]);
370        let now = timestamp_string();
371        let input_path = request.input.to_path_buf();
372        let output_path = request.output.to_path_buf();
373        let conn = self.conn.borrow();
374        conn.execute(
375            "INSERT INTO jobs
376             (id, input_path, output_path, input_hash, source_lang, target_lang, provider, model, base_url, api_key_env, book_id, series_id, status, created_at, updated_at)
377             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, 'running', ?13, ?13)",
378            params![
379                id,
380                input_path.to_string_lossy(),
381                output_path.to_string_lossy(),
382                input_hash,
383                request.source_lang,
384                request.target_lang,
385                request.provider,
386                request.model,
387                request.base_url,
388                request.api_key_env,
389                request.book_id,
390                request.series_id,
391                now,
392            ],
393        )?;
394
395        Ok(JobRecord {
396            id,
397            input_path,
398            input_snapshot_path: None,
399            input_sha256: None,
400            output_path,
401            input_hash,
402            source_lang: request.source_lang.map(ToOwned::to_owned),
403            target_lang: request.target_lang.to_string(),
404            provider: request.provider.to_string(),
405            model: request.model.to_string(),
406            base_url: request.base_url.map(ToOwned::to_owned),
407            api_key_env: request.api_key_env.map(ToOwned::to_owned),
408            status: "running".to_string(),
409            events_path: None,
410            report_json_path: None,
411            report_markdown_path: None,
412            book_id: request.book_id.map(ToOwned::to_owned),
413            series_id: request.series_id.map(ToOwned::to_owned),
414        })
415    }
416
417    pub fn update_job_config_snapshot(
418        &self,
419        job_id: &str,
420        snapshot: &RunConfigSnapshot,
421    ) -> Result<()> {
422        let json = serde_json::to_string(snapshot)
423            .map_err(|e| StoreError::Serialization(e.to_string()))?;
424        let conn = self.conn.borrow();
425        conn.execute(
426            "UPDATE jobs
427             SET config_json = ?1,
428                 events_path = ?2,
429                 report_json_path = ?3,
430                 report_markdown_path = ?4,
431                 input_snapshot_path = ?5,
432                 input_sha256 = ?6,
433                 updated_at = ?7
434             WHERE id = ?8",
435            params![
436                json,
437                snapshot
438                    .events_path
439                    .as_ref()
440                    .map(|path| path.to_string_lossy().to_string()),
441                snapshot
442                    .report_json_path
443                    .as_ref()
444                    .map(|path| path.to_string_lossy().to_string()),
445                snapshot
446                    .report_markdown_path
447                    .as_ref()
448                    .map(|path| path.to_string_lossy().to_string()),
449                snapshot
450                    .input_snapshot_path
451                    .as_ref()
452                    .map(|path| path.to_string_lossy().to_string()),
453                snapshot.input_sha256.as_deref(),
454                timestamp_string(),
455                job_id,
456            ],
457        )?;
458        Ok(())
459    }
460
461    pub fn update_job_input_snapshot(
462        &self,
463        job_id: &str,
464        snapshot_path: &Path,
465        input_sha256: &str,
466    ) -> Result<()> {
467        let conn = self.conn.borrow();
468        conn.execute(
469            "UPDATE jobs
470             SET input_snapshot_path = ?1,
471                 input_sha256 = ?2,
472                 updated_at = ?3
473             WHERE id = ?4",
474            params![
475                snapshot_path.to_string_lossy(),
476                input_sha256,
477                timestamp_string(),
478                job_id
479            ],
480        )?;
481        Ok(())
482    }
483
484    pub fn load_job_config_snapshot(&self, job_id: &str) -> Result<Option<RunConfigSnapshot>> {
485        let conn = self.conn.borrow();
486        let Some(json) = conn
487            .query_row(
488                "SELECT config_json FROM jobs WHERE id = ?1",
489                params![job_id],
490                |row| row.get::<_, Option<String>>(0),
491            )
492            .optional()?
493            .flatten()
494        else {
495            return Ok(None);
496        };
497
498        serde_json::from_str(&json)
499            .map(Some)
500            .map_err(|e| StoreError::Serialization(e.to_string()))
501    }
502
503    pub fn update_job_event_path(&self, job_id: &str, path: &Path) -> Result<()> {
504        let conn = self.conn.borrow();
505        conn.execute(
506            "UPDATE jobs SET events_path = ?1, updated_at = ?2 WHERE id = ?3",
507            params![path.to_string_lossy(), timestamp_string(), job_id],
508        )?;
509        Ok(())
510    }
511
512    pub fn update_job_report_paths(
513        &self,
514        job_id: &str,
515        json_path: &Path,
516        markdown_path: &Path,
517    ) -> Result<()> {
518        let conn = self.conn.borrow();
519        conn.execute(
520            "UPDATE jobs
521             SET report_json_path = ?1, report_markdown_path = ?2, updated_at = ?3
522             WHERE id = ?4",
523            params![
524                json_path.to_string_lossy(),
525                markdown_path.to_string_lossy(),
526                timestamp_string(),
527                job_id
528            ],
529        )?;
530        Ok(())
531    }
532
533    pub fn update_job_output_path(&self, job_id: &str, path: &Path) -> Result<()> {
534        let conn = self.conn.borrow();
535        conn.execute(
536            "UPDATE jobs SET output_path = ?1, updated_at = ?2 WHERE id = ?3",
537            params![path.to_string_lossy(), timestamp_string(), job_id],
538        )?;
539        Ok(())
540    }
541
542    pub fn insert_segments(
543        &self,
544        job_id: &str,
545        segments: &[Segment],
546        prompt_version: &str,
547        provider: &str,
548        model: &str,
549        cache_namespace: &str,
550    ) -> Result<()> {
551        let mut conn = self.conn.borrow_mut();
552        let tx = conn.transaction()?;
553        for segment in segments {
554            tx.execute(
555                "INSERT OR IGNORE INTO segments
556                 (id, job_id, section_id, ordinal, source_hash, prompt_version, provider, model, status, attempts, cache_namespace)
557                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 'queued', 0, ?9)",
558                params![
559                    segment.id.0,
560                    job_id,
561                    segment.section_id.0,
562                    segment.ordinal as i64,
563                    segment.checksum,
564                    prompt_version,
565                    provider,
566                    model,
567                    cache_namespace,
568                ],
569            )?;
570        }
571        tx.commit()?;
572        Ok(())
573    }
574
575    pub fn save_translation(&self, request: SaveTranslation<'_>) -> Result<()> {
576        let now = timestamp_string();
577        let translated_hash = stable_hash(request.translated_text);
578        {
579            let conn = self.conn.borrow();
580            conn.execute(
581                "INSERT OR REPLACE INTO translations
582                 (segment_id, job_id, translated_text, provider, model, prompt_version, created_at)
583                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
584                params![
585                    request.segment_id,
586                    request.job_id,
587                    request.translated_text,
588                    request.provider,
589                    request.model,
590                    request.prompt_version,
591                    now
592                ],
593            )?;
594            replace_block_translations(&conn, request.job_id, request.segment_id, request.blocks)?;
595            conn.execute(
596                "UPDATE segments
597                 SET status = 'succeeded',
598                     attempts = attempts + 1,
599                     tokens_input = ?1,
600                     tokens_input_cached = ?2,
601                     tokens_output = ?3,
602                     tokens_estimated = ?4,
603                     translated_hash = ?5,
604                     error = NULL
605                 WHERE job_id = ?6 AND id = ?7",
606                params![
607                    request.input_tokens.map(|value| value as i64),
608                    request.input_cached_tokens.map(|value| value as i64),
609                    request.output_tokens.map(|value| value as i64),
610                    if request.tokens_estimated {
611                        1_i64
612                    } else {
613                        0_i64
614                    },
615                    translated_hash,
616                    request.job_id,
617                    request.segment_id,
618                ],
619            )?;
620        }
621        self.touch_job(request.job_id, "running")?;
622        Ok(())
623    }
624
625    pub fn save_needs_review(&self, request: SaveNeedsReview<'_>) -> Result<()> {
626        let now = timestamp_string();
627        let translated_hash = stable_hash(request.preserved_text);
628        {
629            let conn = self.conn.borrow();
630            conn.execute(
631                "INSERT OR REPLACE INTO translations
632                 (segment_id, job_id, translated_text, provider, model, prompt_version, created_at)
633                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
634                params![
635                    request.segment_id,
636                    request.job_id,
637                    request.preserved_text,
638                    request.provider,
639                    request.model,
640                    request.prompt_version,
641                    now
642                ],
643            )?;
644            replace_block_translations(&conn, request.job_id, request.segment_id, request.blocks)?;
645            conn.execute(
646                "UPDATE segments
647                 SET status = 'needs_review',
648                     attempts = attempts + 1,
649                     tokens_input = ?1,
650                     tokens_input_cached = ?2,
651                     tokens_output = ?3,
652                     tokens_estimated = ?4,
653                     translated_hash = ?5,
654                     error = ?6
655                 WHERE job_id = ?7 AND id = ?8",
656                params![
657                    request.input_tokens.map(|value| value as i64),
658                    request.input_cached_tokens.map(|value| value as i64),
659                    request.output_tokens.map(|value| value as i64),
660                    if request.tokens_estimated {
661                        1_i64
662                    } else {
663                        0_i64
664                    },
665                    translated_hash,
666                    request.error,
667                    request.job_id,
668                    request.segment_id
669                ],
670            )?;
671        }
672        self.touch_job(request.job_id, "needs_review")?;
673        Ok(())
674    }
675
676    pub fn save_cached_translation(&self, request: SaveCachedTranslation<'_>) -> Result<()> {
677        let now = timestamp_string();
678        let translated_hash = stable_hash(request.translated_text);
679        {
680            let conn = self.conn.borrow();
681            conn.execute(
682                "INSERT OR REPLACE INTO translations
683                 (segment_id, job_id, translated_text, provider, model, prompt_version, created_at)
684                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
685                params![
686                    request.segment_id,
687                    request.job_id,
688                    request.translated_text,
689                    request.provider,
690                    request.model,
691                    request.prompt_version,
692                    now
693                ],
694            )?;
695            replace_block_translations(&conn, request.job_id, request.segment_id, request.blocks)?;
696            conn.execute(
697                "UPDATE segments
698                 SET status = 'skipped_cached',
699                     tokens_input = NULL,
700                     tokens_input_cached = NULL,
701                     tokens_output = NULL,
702                     tokens_estimated = 0,
703                     translated_hash = ?1,
704                     error = NULL
705                 WHERE job_id = ?2 AND id = ?3",
706                params![translated_hash, request.job_id, request.segment_id],
707            )?;
708        }
709        self.touch_job(request.job_id, "running")?;
710        Ok(())
711    }
712
713    pub fn mark_job_complete(&self, job_id: &str) -> Result<()> {
714        self.touch_job(job_id, "succeeded")
715    }
716
717    pub fn mark_job_running(&self, job_id: &str) -> Result<()> {
718        self.touch_job(job_id, "running")
719    }
720
721    pub fn mark_job_succeeded(&self, job_id: &str) -> Result<()> {
722        self.mark_job_complete(job_id)
723    }
724
725    pub fn mark_job_needs_review(&self, job_id: &str) -> Result<()> {
726        self.touch_job(job_id, "needs_review")
727    }
728
729    pub fn mark_job_interrupted(&self, job_id: &str) -> Result<()> {
730        self.touch_job(job_id, "interrupted")
731    }
732
733    pub fn mark_job_failed(&self, job_id: &str) -> Result<()> {
734        self.touch_job(job_id, "failed")
735    }
736
737    pub fn mark_segment_failed(&self, job_id: &str, segment_id: &str, error: &str) -> Result<()> {
738        {
739            let conn = self.conn.borrow();
740            conn.execute(
741                "UPDATE segments SET status = 'failed', attempts = attempts + 1, error = ?1 WHERE job_id = ?2 AND id = ?3",
742                params![error, job_id, segment_id],
743            )?;
744        }
745        self.touch_job(job_id, "failed")?;
746        Ok(())
747    }
748
749    pub fn mark_segment_failed_if_unfinished(
750        &self,
751        job_id: &str,
752        segment_id: &str,
753        error: &str,
754    ) -> Result<()> {
755        {
756            let conn = self.conn.borrow();
757            conn.execute(
758                "UPDATE segments
759                 SET status = 'failed', attempts = attempts + 1, error = ?1
760                 WHERE job_id = ?2
761                   AND id = ?3
762                   AND status NOT IN ('succeeded', 'skipped_cached', 'needs_review')",
763                params![error, job_id, segment_id],
764            )?;
765        }
766        self.touch_job(job_id, "failed")?;
767        Ok(())
768    }
769
770    pub fn mark_unfinished_segments_failed(
771        &self,
772        job_id: &str,
773        candidate_segment_ids: &[String],
774        error: &str,
775    ) -> Result<usize> {
776        const SQLITE_IN_CHUNK_SIZE: usize = 900;
777        let mut updated = 0;
778
779        for chunk in candidate_segment_ids.chunks(SQLITE_IN_CHUNK_SIZE) {
780            if chunk.is_empty() {
781                continue;
782            }
783
784            let placeholders = std::iter::repeat_n("?", chunk.len())
785                .collect::<Vec<_>>()
786                .join(", ");
787            let sql = format!(
788                "UPDATE segments
789                 SET status = 'failed', attempts = attempts + 1, error = ?
790                 WHERE job_id = ?
791                   AND id IN ({placeholders})
792                   AND status NOT IN ('succeeded', 'skipped_cached', 'needs_review')"
793            );
794
795            let conn = self.conn.borrow();
796            let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 2);
797            params.push(&error);
798            params.push(&job_id);
799            for id in chunk {
800                params.push(id);
801            }
802            updated += conn.execute(&sql, params.as_slice())?;
803        }
804
805        if updated > 0 {
806            self.touch_job(job_id, "failed")?;
807        }
808        Ok(updated)
809    }
810
811    pub fn get_job(&self, job_id: &str) -> Result<Option<JobRecord>> {
812        let conn = self.conn.borrow();
813        conn.query_row(
814            "SELECT id, input_path, input_snapshot_path, input_sha256, output_path, input_hash, source_lang, target_lang, provider, model, base_url, api_key_env, status,
815                    events_path, report_json_path, report_markdown_path, book_id, series_id
816             FROM jobs WHERE id = ?1",
817            params![job_id],
818            |row| {
819                Ok(JobRecord {
820                    id: row.get(0)?,
821                    input_path: PathBuf::from(row.get::<_, String>(1)?),
822                    input_snapshot_path: row.get::<_, Option<String>>(2)?.map(PathBuf::from),
823                    input_sha256: row.get(3)?,
824                    output_path: PathBuf::from(row.get::<_, String>(4)?),
825                    input_hash: row.get(5)?,
826                    source_lang: row.get(6)?,
827                    target_lang: row.get(7)?,
828                    provider: row.get(8)?,
829                    model: row.get(9)?,
830                    base_url: row.get(10)?,
831                    api_key_env: row.get(11)?,
832                    status: row.get(12)?,
833                    events_path: row.get::<_, Option<String>>(13)?.map(PathBuf::from),
834                    report_json_path: row.get::<_, Option<String>>(14)?.map(PathBuf::from),
835                    report_markdown_path: row.get::<_, Option<String>>(15)?.map(PathBuf::from),
836                    book_id: row.get(16)?,
837                    series_id: row.get(17)?,
838                })
839            },
840        )
841        .optional()
842        .map_err(StoreError::from)
843    }
844
845    pub fn summary(&self, job_id: &str) -> Result<Option<JobSummary>> {
846        let Some(job) = self.get_job(job_id)? else {
847            return Ok(None);
848        };
849        let conn = self.conn.borrow();
850        let mut summary = JobSummary {
851            id: job.id,
852            status: job.status,
853            ..JobSummary::default()
854        };
855
856        let mut stmt = conn.prepare(
857            "SELECT status,
858                    COUNT(*),
859                    COALESCE(SUM(COALESCE(tokens_input, input_tokens)), 0),
860                    COALESCE(SUM(tokens_input_cached), 0),
861                    COALESCE(SUM(COALESCE(tokens_output, output_tokens)), 0)
862             FROM segments WHERE job_id = ?1 GROUP BY status",
863        )?;
864        let rows = stmt.query_map(params![job_id], |row| {
865            Ok((
866                row.get::<_, String>(0)?,
867                row.get::<_, i64>(1)?,
868                row.get::<_, i64>(2)?,
869                row.get::<_, i64>(3)?,
870                row.get::<_, i64>(4)?,
871            ))
872        })?;
873
874        for row in rows {
875            let (status, count, input_tokens, input_cached_tokens, output_tokens) = row?;
876            let count = count as usize;
877            summary.total_segments += count;
878            summary.input_tokens += input_tokens as u64;
879            summary.input_cached_tokens += input_cached_tokens as u64;
880            summary.output_tokens += output_tokens as u64;
881            match status.as_str() {
882                "succeeded" => summary.succeeded += count,
883                "failed" => summary.failed += count,
884                "needs_review" => summary.needs_review += count,
885                "retry_pending" => summary.retry_pending += count,
886                "skipped_cached" => summary.cached += count,
887                _ => {}
888            }
889        }
890
891        summary.retried = conn.query_row(
892            "SELECT COUNT(*) FROM segments WHERE job_id = ?1 AND attempts > 1",
893            params![job_id],
894            |row| row.get::<_, i64>(0),
895        )? as usize;
896
897        Ok(Some(summary))
898    }
899
900    pub fn retry_segments(&self, job_id: &str, scope: RetryScope) -> Result<usize> {
901        let where_status = match scope {
902            RetryScope::Failed => "status = 'failed'",
903            RetryScope::NeedsReview => "status = 'needs_review'",
904            RetryScope::All => "status IN ('failed', 'needs_review')",
905        };
906        let sql = format!(
907            "UPDATE segments SET status = 'retry_pending', error = NULL WHERE job_id = ?1 AND {where_status}"
908        );
909        let count = {
910            let conn = self.conn.borrow();
911            conn.execute(&sql, params![job_id])?
912        };
913        self.touch_job(job_id, "retry_pending")?;
914        Ok(count)
915    }
916
917    pub fn insert_segment_flags(&self, flags: &[NewSegmentFlag<'_>]) -> Result<usize> {
918        let mut conn = self.conn.borrow_mut();
919        let tx = conn.transaction()?;
920        let ingested_at = timestamp_string();
921        let mut inserted = 0usize;
922        for flag in flags {
923            inserted += tx.execute(
924                "INSERT INTO segment_flags
925                 (job_id, segment_id, kind, note, suggested_source, suggested_target, ingested_at, consumed)
926                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
927                params![
928                    flag.job_id,
929                    flag.segment_id,
930                    flag.kind,
931                    flag.note,
932                    flag.suggested_source,
933                    flag.suggested_target,
934                    ingested_at,
935                    if flag.consumed { 1_i64 } else { 0_i64 },
936                ],
937            )?;
938        }
939        tx.commit()?;
940        Ok(inserted)
941    }
942
943    pub fn mark_segments_needs_review(
944        &self,
945        job_id: &str,
946        segment_ids: &[String],
947        reason: &str,
948    ) -> Result<usize> {
949        const SQLITE_IN_CHUNK_SIZE: usize = 900;
950        let mut updated = 0usize;
951        for chunk in segment_ids.chunks(SQLITE_IN_CHUNK_SIZE) {
952            if chunk.is_empty() {
953                continue;
954            }
955            let placeholders = std::iter::repeat_n("?", chunk.len())
956                .collect::<Vec<_>>()
957                .join(", ");
958            let sql = format!(
959                "UPDATE segments
960                 SET status = 'needs_review',
961                     error = ?
962                 WHERE job_id = ?
963                   AND id IN ({placeholders})"
964            );
965            let conn = self.conn.borrow();
966            let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 2);
967            params.push(&reason);
968            params.push(&job_id);
969            for id in chunk {
970                params.push(id);
971            }
972            updated += conn.execute(&sql, params.as_slice())?;
973        }
974        if updated > 0 {
975            self.touch_job(job_id, "needs_review")?;
976        }
977        Ok(updated)
978    }
979
980    pub fn segment_flag_count(&self, job_id: &str) -> Result<usize> {
981        let conn = self.conn.borrow();
982        let count = conn.query_row(
983            "SELECT COUNT(*) FROM segment_flags WHERE job_id = ?1",
984            params![job_id],
985            |row| row.get::<_, i64>(0),
986        )?;
987        Ok(count as usize)
988    }
989
990    pub fn upsert_glossary_terms(&self, terms: &[GlossaryTerm]) -> Result<usize> {
991        let mut conn = self.conn.borrow_mut();
992        let tx = conn.transaction()?;
993        let now = timestamp_string();
994        let mut changed = 0usize;
995        for term in terms {
996            let existing_id = tx
997                .query_row(
998                    "SELECT id FROM glossary_terms
999                     WHERE scope_kind = ?1
1000                       AND ((?2 IS NULL AND scope_id IS NULL) OR scope_id = ?2)
1001                       AND source_text = ?3
1002                       AND source_language = ?4
1003                       AND target_language = ?5",
1004                    params![
1005                        term.scope_kind.as_str(),
1006                        term.scope_id.as_deref(),
1007                        term.source_text,
1008                        term.source_language,
1009                        term.target_language,
1010                    ],
1011                    |row| row.get::<_, i64>(0),
1012                )
1013                .optional()?;
1014            if let Some(id) = existing_id {
1015                changed += tx.execute(
1016                    "UPDATE glossary_terms
1017                     SET target_text = ?1,
1018                         category = ?2,
1019                         notes = ?3,
1020                         case_sensitive = ?4,
1021                         always_active = ?5,
1022                         status = ?6,
1023                         source_count = ?7,
1024                         updated_at = ?8
1025                     WHERE id = ?9",
1026                    params![
1027                        term.target_text,
1028                        term.category.as_str(),
1029                        term.notes.as_deref(),
1030                        if term.case_sensitive { 1_i64 } else { 0_i64 },
1031                        if term.always_active { 1_i64 } else { 0_i64 },
1032                        term.status.as_str(),
1033                        term.source_count as i64,
1034                        now,
1035                        id,
1036                    ],
1037                )?;
1038            } else {
1039                changed += tx.execute(
1040                    "INSERT INTO glossary_terms
1041                     (scope_kind, scope_id, source_text, target_text, category, notes,
1042                      case_sensitive, always_active, status, source_language, target_language,
1043                      source_count, created_at, updated_at)
1044                     VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?11, ?12, ?13, ?13)",
1045                    params![
1046                        term.scope_kind.as_str(),
1047                        term.scope_id.as_deref(),
1048                        term.source_text,
1049                        term.target_text,
1050                        term.category.as_str(),
1051                        term.notes.as_deref(),
1052                        if term.case_sensitive { 1_i64 } else { 0_i64 },
1053                        if term.always_active { 1_i64 } else { 0_i64 },
1054                        term.status.as_str(),
1055                        term.source_language,
1056                        term.target_language,
1057                        term.source_count as i64,
1058                        now,
1059                    ],
1060                )?;
1061            }
1062        }
1063        tx.commit()?;
1064        Ok(changed)
1065    }
1066
1067    pub fn add_glossary_term(&self, term: &GlossaryTerm) -> Result<i64> {
1068        self.upsert_glossary_terms(std::slice::from_ref(term))?;
1069        let conn = self.conn.borrow();
1070        let id = conn.query_row(
1071            "SELECT id FROM glossary_terms
1072             WHERE scope_kind = ?1
1073               AND ((?2 IS NULL AND scope_id IS NULL) OR scope_id = ?2)
1074               AND source_text = ?3
1075               AND source_language = ?4
1076               AND target_language = ?5",
1077            params![
1078                term.scope_kind.as_str(),
1079                term.scope_id.as_deref(),
1080                term.source_text,
1081                term.source_language,
1082                term.target_language,
1083            ],
1084            |row| row.get::<_, i64>(0),
1085        )?;
1086        Ok(id)
1087    }
1088
1089    pub fn upsert_glossary_candidates(
1090        &self,
1091        book_id: &str,
1092        source_language: &str,
1093        target_language: &str,
1094        candidates: &[NewGlossaryCandidate<'_>],
1095    ) -> Result<GlossaryCandidateUpsertResult> {
1096        let mut conn = self.conn.borrow_mut();
1097        let tx = conn.transaction()?;
1098        let now = timestamp_string();
1099        let mut result = GlossaryCandidateUpsertResult::default();
1100
1101        for candidate in candidates {
1102            let existing = tx
1103                .query_row(
1104                    "SELECT id, status FROM glossary_terms
1105                     WHERE scope_kind = 'book'
1106                       AND scope_id = ?1
1107                       AND source_text = ?2
1108                       AND source_language = ?3
1109                       AND target_language = ?4",
1110                    params![
1111                        book_id,
1112                        candidate.source_text,
1113                        source_language,
1114                        target_language
1115                    ],
1116                    |row| Ok((row.get::<_, i64>(0)?, row.get::<_, String>(1)?)),
1117                )
1118                .optional()?;
1119
1120            match existing {
1121                Some((id, status)) if status == GlossaryStatus::AutoCandidate.as_str() => {
1122                    tx.execute(
1123                        "UPDATE glossary_terms
1124                         SET category = ?1,
1125                             source_count = ?2,
1126                             updated_at = ?3
1127                         WHERE id = ?4",
1128                        params![
1129                            candidate.category.as_str(),
1130                            candidate.source_count as i64,
1131                            now,
1132                            id,
1133                        ],
1134                    )?;
1135                    result.updated += 1;
1136                }
1137                Some(_) => {
1138                    result.skipped += 1;
1139                }
1140                None => {
1141                    tx.execute(
1142                        "INSERT INTO glossary_terms
1143                         (scope_kind, scope_id, source_text, target_text, category, notes,
1144                          case_sensitive, always_active, status, source_language, target_language,
1145                          source_count, created_at, updated_at)
1146                         VALUES ('book', ?1, ?2, NULL, ?3, NULL, 1, 0, 'auto_candidate',
1147                                 ?4, ?5, ?6, ?7, ?7)",
1148                        params![
1149                            book_id,
1150                            candidate.source_text,
1151                            candidate.category.as_str(),
1152                            source_language,
1153                            target_language,
1154                            candidate.source_count as i64,
1155                            now,
1156                        ],
1157                    )?;
1158                    result.inserted += 1;
1159                }
1160            }
1161        }
1162
1163        tx.commit()?;
1164        Ok(result)
1165    }
1166
1167    pub fn list_glossary_candidate_language_pairs(
1168        &self,
1169        book_id: &str,
1170    ) -> Result<Vec<(String, String)>> {
1171        let conn = self.conn.borrow();
1172        let mut stmt = conn.prepare(
1173            "SELECT DISTINCT source_language, target_language
1174             FROM glossary_terms
1175             WHERE scope_kind = 'book'
1176               AND scope_id = ?1
1177               AND status = 'auto_candidate'
1178             ORDER BY source_language, target_language",
1179        )?;
1180        let rows = stmt.query_map(params![book_id], |row| {
1181            Ok((row.get::<_, String>(0)?, row.get::<_, String>(1)?))
1182        })?;
1183        rows.collect::<std::result::Result<Vec<_>, _>>()
1184            .map_err(StoreError::from)
1185    }
1186
1187    pub fn list_glossary_candidates(
1188        &self,
1189        book_id: &str,
1190        source_language: &str,
1191        target_language: &str,
1192    ) -> Result<Vec<StoredGlossaryCandidate>> {
1193        let conn = self.conn.borrow();
1194        let mut stmt = conn.prepare(
1195            "SELECT id, source_text, target_text, category, notes, case_sensitive,
1196                    always_active, status, source_language, target_language, source_count
1197             FROM glossary_terms
1198             WHERE scope_kind = 'book'
1199               AND scope_id = ?1
1200               AND source_language = ?2
1201               AND target_language = ?3
1202               AND status = 'auto_candidate'
1203             ORDER BY source_count DESC, source_text",
1204        )?;
1205        let rows = stmt.query_map(
1206            params![book_id, source_language, target_language],
1207            glossary_candidate_from_row,
1208        )?;
1209        rows.collect::<std::result::Result<Vec<_>, _>>()
1210            .map_err(StoreError::from)
1211    }
1212
1213    pub fn accept_glossary_candidate(&self, id: i64, target_text: Option<&str>) -> Result<bool> {
1214        let conn = self.conn.borrow();
1215        let Some((source_text, existing_target)) = conn
1216            .query_row(
1217                "SELECT source_text, target_text
1218                 FROM glossary_terms
1219                 WHERE id = ?1 AND status = 'auto_candidate'",
1220                params![id],
1221                |row| Ok((row.get::<_, String>(0)?, row.get::<_, Option<String>>(1)?)),
1222            )
1223            .optional()?
1224        else {
1225            return Ok(false);
1226        };
1227        let target = target_text
1228            .filter(|value| !value.trim().is_empty())
1229            .map(ToOwned::to_owned)
1230            .or_else(|| existing_target.filter(|value| !value.trim().is_empty()))
1231            .unwrap_or(source_text);
1232        let updated = conn.execute(
1233            "UPDATE glossary_terms
1234             SET target_text = ?1,
1235                 status = 'accepted',
1236                 updated_at = ?2
1237             WHERE id = ?3 AND status = 'auto_candidate'",
1238            params![target, timestamp_string(), id],
1239        )?;
1240        Ok(updated > 0)
1241    }
1242
1243    pub fn reject_glossary_candidate(&self, id: i64) -> Result<bool> {
1244        let conn = self.conn.borrow();
1245        let updated = conn.execute(
1246            "UPDATE glossary_terms
1247             SET status = 'rejected',
1248                 updated_at = ?1
1249             WHERE id = ?2 AND status = 'auto_candidate'",
1250            params![timestamp_string(), id],
1251        )?;
1252        Ok(updated > 0)
1253    }
1254
1255    pub fn list_glossary_terms(&self, filter: GlossaryFilter<'_>) -> Result<Vec<GlossaryTerm>> {
1256        let conn = self.conn.borrow();
1257        let mut sql = String::from(
1258            "SELECT id, scope_kind, scope_id, source_text, COALESCE(target_text, ''), category, notes,
1259                    case_sensitive, always_active, status, source_language, target_language,
1260                    source_count
1261             FROM glossary_terms
1262             WHERE 1 = 1",
1263        );
1264        let mut values: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1265        if let Some(scope_kind) = filter.scope_kind {
1266            sql.push_str(" AND scope_kind = ?");
1267            values.push(Box::new(scope_kind.as_str().to_string()));
1268        }
1269        if let Some(scope_id) = filter.scope_id {
1270            sql.push_str(" AND scope_id = ?");
1271            values.push(Box::new(scope_id.to_string()));
1272        }
1273        if let Some(source_language) = filter.source_language {
1274            sql.push_str(" AND source_language = ?");
1275            values.push(Box::new(source_language.to_string()));
1276        }
1277        if let Some(target_language) = filter.target_language {
1278            sql.push_str(" AND target_language = ?");
1279            values.push(Box::new(target_language.to_string()));
1280        }
1281        if filter.active_only {
1282            sql.push_str(" AND status IN ('user_seeded', 'accepted')");
1283        }
1284        sql.push_str(
1285            " ORDER BY source_language, target_language, scope_kind, scope_id, source_text",
1286        );
1287        let param_refs = values
1288            .iter()
1289            .map(|value| value.as_ref())
1290            .collect::<Vec<&dyn rusqlite::types::ToSql>>();
1291        let mut stmt = conn.prepare(&sql)?;
1292        let rows = stmt.query_map(param_refs.as_slice(), glossary_term_from_row)?;
1293        rows.collect::<std::result::Result<Vec<_>, _>>()
1294            .map_err(StoreError::from)
1295    }
1296
1297    pub fn load_active_glossary_terms(
1298        &self,
1299        source_language: &str,
1300        target_language: &str,
1301        book_id: Option<&str>,
1302        series_id: Option<&str>,
1303    ) -> Result<Vec<GlossaryTerm>> {
1304        let conn = self.conn.borrow();
1305        let mut stmt = conn.prepare(
1306            "SELECT id, scope_kind, scope_id, source_text, COALESCE(target_text, ''), category, notes,
1307                    case_sensitive, always_active, status, source_language, target_language,
1308                    source_count
1309             FROM glossary_terms
1310             WHERE source_language = ?1
1311               AND target_language = ?2
1312               AND status IN ('user_seeded', 'accepted')
1313               AND (
1314                    scope_kind = 'global'
1315                    OR (scope_kind = 'series' AND scope_id = ?3)
1316                    OR (scope_kind = 'book' AND scope_id = ?4)
1317               )
1318             ORDER BY scope_kind, scope_id, source_text",
1319        )?;
1320        let rows = stmt.query_map(
1321            params![source_language, target_language, series_id, book_id],
1322            glossary_term_from_row,
1323        )?;
1324        rows.collect::<std::result::Result<Vec<_>, _>>()
1325            .map_err(StoreError::from)
1326    }
1327
1328    pub fn load_active_glossary_terms_for_target(
1329        &self,
1330        target_language: &str,
1331        book_id: Option<&str>,
1332        series_id: Option<&str>,
1333    ) -> Result<Vec<GlossaryTerm>> {
1334        let conn = self.conn.borrow();
1335        let mut stmt = conn.prepare(
1336            "SELECT id, scope_kind, scope_id, source_text, COALESCE(target_text, ''), category, notes,
1337                    case_sensitive, always_active, status, source_language, target_language,
1338                    source_count
1339             FROM glossary_terms
1340             WHERE target_language = ?1
1341               AND status IN ('user_seeded', 'accepted')
1342               AND (
1343                    scope_kind = 'global'
1344                    OR (scope_kind = 'series' AND scope_id = ?2)
1345                    OR (scope_kind = 'book' AND scope_id = ?3)
1346               )
1347             ORDER BY scope_kind, scope_id, source_language, source_text",
1348        )?;
1349        let rows = stmt.query_map(
1350            params![target_language, series_id, book_id],
1351            glossary_term_from_row,
1352        )?;
1353        rows.collect::<std::result::Result<Vec<_>, _>>()
1354            .map_err(StoreError::from)
1355    }
1356
1357    pub fn remove_glossary_term(&self, id: i64) -> Result<usize> {
1358        let conn = self.conn.borrow();
1359        conn.execute("DELETE FROM glossary_terms WHERE id = ?1", params![id])
1360            .map_err(StoreError::from)
1361    }
1362
1363    pub fn clear_glossary_scope(
1364        &self,
1365        scope_kind: GlossaryScopeKind,
1366        scope_id: Option<&str>,
1367    ) -> Result<usize> {
1368        let conn = self.conn.borrow();
1369        let count = if scope_kind == GlossaryScopeKind::Global {
1370            conn.execute("DELETE FROM glossary_terms WHERE scope_kind = 'global'", [])?
1371        } else {
1372            conn.execute(
1373                "DELETE FROM glossary_terms WHERE scope_kind = ?1 AND scope_id = ?2",
1374                params![scope_kind.as_str(), scope_id],
1375            )?
1376        };
1377        Ok(count)
1378    }
1379
1380    /// Upsert a style sheet for a (scope, target_language) tuple. Returns
1381    /// the row id of the inserted/updated row.
1382    pub fn upsert_style_sheet(&self, record: &NewStyleSheet<'_>) -> Result<i64> {
1383        let conn = self.conn.borrow();
1384        let now = timestamp_string();
1385        conn.execute(
1386            "INSERT INTO style_sheets
1387                (scope_kind, scope_id, target_language, content_toml, fingerprint, created_at, updated_at)
1388             VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?6)
1389             ON CONFLICT(scope_kind, scope_id, target_language) DO UPDATE SET
1390                content_toml = excluded.content_toml,
1391                fingerprint = excluded.fingerprint,
1392                updated_at = excluded.updated_at",
1393            params![
1394                record.scope_kind.as_str(),
1395                record.scope_id,
1396                record.target_language,
1397                record.content_toml,
1398                record.fingerprint,
1399                now,
1400            ],
1401        )?;
1402        let id: i64 = conn.query_row(
1403            "SELECT id FROM style_sheets
1404                WHERE scope_kind = ?1 AND IFNULL(scope_id, '') = IFNULL(?2, '')
1405                  AND target_language = ?3",
1406            params![
1407                record.scope_kind.as_str(),
1408                record.scope_id,
1409                record.target_language
1410            ],
1411            |row| row.get(0),
1412        )?;
1413        Ok(id)
1414    }
1415
1416    /// Load all style sheets that apply for a given language pair and
1417    /// optional book/series scopes. Caller is responsible for merging via
1418    /// [`bookforge_core::style::merge_style_sheets`].
1419    pub fn load_active_style_sheets(
1420        &self,
1421        target_language: &str,
1422        book_id: Option<&str>,
1423        series_id: Option<&str>,
1424    ) -> Result<Vec<StoredStyleSheet>> {
1425        let conn = self.conn.borrow();
1426        let mut stmt = conn.prepare(
1427            "SELECT id, scope_kind, scope_id, target_language, content_toml, fingerprint
1428             FROM style_sheets
1429             WHERE target_language = ?1
1430               AND ( (scope_kind = 'global')
1431                  OR (scope_kind = 'series' AND scope_id = ?2)
1432                  OR (scope_kind = 'book' AND scope_id = ?3) )",
1433        )?;
1434        let rows = stmt.query_map(params![target_language, series_id, book_id], |row| {
1435            Ok(StoredStyleSheet {
1436                id: row.get(0)?,
1437                scope_kind: parse_row_enum(&row.get::<_, String>(1)?, 1)?,
1438                scope_id: row.get(2)?,
1439                target_language: row.get(3)?,
1440                content_toml: row.get(4)?,
1441                fingerprint: row.get(5)?,
1442            })
1443        })?;
1444        rows.collect::<std::result::Result<Vec<_>, _>>()
1445            .map_err(StoreError::from)
1446    }
1447
1448    pub fn list_style_sheets(
1449        &self,
1450        target_language: Option<&str>,
1451        scope_kind: Option<GlossaryScopeKind>,
1452        scope_id: Option<&str>,
1453    ) -> Result<Vec<StoredStyleSheet>> {
1454        let conn = self.conn.borrow();
1455        let mut stmt = conn.prepare(
1456            "SELECT id, scope_kind, scope_id, target_language, content_toml, fingerprint
1457             FROM style_sheets
1458             WHERE (?1 IS NULL OR target_language = ?1)
1459               AND (?2 IS NULL OR scope_kind = ?2)
1460               AND (?3 IS NULL OR scope_id = ?3)
1461             ORDER BY scope_kind, scope_id",
1462        )?;
1463        let scope_text = scope_kind.map(|s| s.as_str().to_string());
1464        let rows = stmt.query_map(params![target_language, scope_text, scope_id], |row| {
1465            Ok(StoredStyleSheet {
1466                id: row.get(0)?,
1467                scope_kind: parse_row_enum(&row.get::<_, String>(1)?, 1)?,
1468                scope_id: row.get(2)?,
1469                target_language: row.get(3)?,
1470                content_toml: row.get(4)?,
1471                fingerprint: row.get(5)?,
1472            })
1473        })?;
1474        rows.collect::<std::result::Result<Vec<_>, _>>()
1475            .map_err(StoreError::from)
1476    }
1477
1478    pub fn clear_style_scope(
1479        &self,
1480        scope_kind: GlossaryScopeKind,
1481        scope_id: Option<&str>,
1482    ) -> Result<usize> {
1483        let conn = self.conn.borrow();
1484        let count = if scope_kind == GlossaryScopeKind::Global {
1485            conn.execute("DELETE FROM style_sheets WHERE scope_kind = 'global'", [])?
1486        } else {
1487            conn.execute(
1488                "DELETE FROM style_sheets WHERE scope_kind = ?1 AND scope_id = ?2",
1489                params![scope_kind.as_str(), scope_id],
1490            )?
1491        };
1492        Ok(count)
1493    }
1494
1495    pub fn upsert_entities(&self, entities: &[NewEntity<'_>]) -> Result<usize> {
1496        if entities.is_empty() {
1497            return Ok(0);
1498        }
1499        let conn = self.conn.borrow();
1500        let now = timestamp_string();
1501        let mut changed = 0usize;
1502        for entity in entities {
1503            conn.execute(
1504                "INSERT INTO entities
1505                    (scope_kind, scope_id, source_name, target_name, gender_target,
1506                     role, notes, source_language, target_language, created_at, updated_at)
1507                 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, ?10)
1508                 ON CONFLICT(scope_kind, scope_id, source_name, source_language, target_language)
1509                 DO UPDATE SET
1510                    target_name = excluded.target_name,
1511                    gender_target = excluded.gender_target,
1512                    role = excluded.role,
1513                    notes = excluded.notes,
1514                    updated_at = excluded.updated_at",
1515                params![
1516                    entity.scope_kind.as_str(),
1517                    entity.scope_id,
1518                    entity.source_name,
1519                    entity.target_name,
1520                    entity.gender_target.map(|g| g.as_short()),
1521                    entity.role,
1522                    entity.notes,
1523                    entity.source_language,
1524                    entity.target_language,
1525                    now,
1526                ],
1527            )?;
1528            changed += 1;
1529        }
1530        Ok(changed)
1531    }
1532
1533    /// Load all entities that apply for a language pair and optional
1534    /// book/series scopes. Caller is responsible for merging via
1535    /// [`bookforge_core::entity::merge_scope_entities`].
1536    pub fn load_active_entities(
1537        &self,
1538        source_language: &str,
1539        target_language: &str,
1540        book_id: Option<&str>,
1541        series_id: Option<&str>,
1542    ) -> Result<Vec<StoredEntity>> {
1543        let conn = self.conn.borrow();
1544        let mut stmt = conn.prepare(
1545            "SELECT id, scope_kind, scope_id, source_name, target_name, gender_target,
1546                    role, notes, source_language, target_language
1547             FROM entities
1548             WHERE source_language = ?1
1549               AND target_language = ?2
1550               AND ( (scope_kind = 'global')
1551                  OR (scope_kind = 'series' AND scope_id = ?3)
1552                  OR (scope_kind = 'book' AND scope_id = ?4) )",
1553        )?;
1554        let rows = stmt.query_map(
1555            params![source_language, target_language, series_id, book_id],
1556            |row| {
1557                let gender: Option<String> = row.get(5)?;
1558                Ok(StoredEntity {
1559                    id: row.get(0)?,
1560                    scope_kind: parse_row_enum(&row.get::<_, String>(1)?, 1)?,
1561                    scope_id: row.get(2)?,
1562                    source_name: row.get(3)?,
1563                    target_name: row.get(4)?,
1564                    gender_target: gender.and_then(|g| parse_gender_short(&g)),
1565                    role: row.get(6)?,
1566                    notes: row.get(7)?,
1567                    source_language: row.get(8)?,
1568                    target_language: row.get(9)?,
1569                })
1570            },
1571        )?;
1572        rows.collect::<std::result::Result<Vec<_>, _>>()
1573            .map_err(StoreError::from)
1574    }
1575
1576    pub fn list_entities(
1577        &self,
1578        source_language: Option<&str>,
1579        target_language: Option<&str>,
1580        scope_kind: Option<GlossaryScopeKind>,
1581        scope_id: Option<&str>,
1582    ) -> Result<Vec<StoredEntity>> {
1583        let conn = self.conn.borrow();
1584        let mut stmt = conn.prepare(
1585            "SELECT id, scope_kind, scope_id, source_name, target_name, gender_target,
1586                    role, notes, source_language, target_language
1587             FROM entities
1588             WHERE (?1 IS NULL OR source_language = ?1)
1589               AND (?2 IS NULL OR target_language = ?2)
1590               AND (?3 IS NULL OR scope_kind = ?3)
1591               AND (?4 IS NULL OR scope_id = ?4)
1592             ORDER BY scope_kind, scope_id, source_name",
1593        )?;
1594        let scope_text = scope_kind.map(|s| s.as_str().to_string());
1595        let rows = stmt.query_map(
1596            params![source_language, target_language, scope_text, scope_id],
1597            |row| {
1598                let gender: Option<String> = row.get(5)?;
1599                Ok(StoredEntity {
1600                    id: row.get(0)?,
1601                    scope_kind: parse_row_enum(&row.get::<_, String>(1)?, 1)?,
1602                    scope_id: row.get(2)?,
1603                    source_name: row.get(3)?,
1604                    target_name: row.get(4)?,
1605                    gender_target: gender.and_then(|g| parse_gender_short(&g)),
1606                    role: row.get(6)?,
1607                    notes: row.get(7)?,
1608                    source_language: row.get(8)?,
1609                    target_language: row.get(9)?,
1610                })
1611            },
1612        )?;
1613        rows.collect::<std::result::Result<Vec<_>, _>>()
1614            .map_err(StoreError::from)
1615    }
1616
1617    pub fn clear_entities_scope(
1618        &self,
1619        scope_kind: GlossaryScopeKind,
1620        scope_id: Option<&str>,
1621    ) -> Result<usize> {
1622        let conn = self.conn.borrow();
1623        let count = if scope_kind == GlossaryScopeKind::Global {
1624            conn.execute("DELETE FROM entities WHERE scope_kind = 'global'", [])?
1625        } else {
1626            conn.execute(
1627                "DELETE FROM entities WHERE scope_kind = ?1 AND scope_id = ?2",
1628                params![scope_kind.as_str(), scope_id],
1629            )?
1630        };
1631        Ok(count)
1632    }
1633
1634    pub fn pending_segment_ids(&self, job_id: &str) -> Result<Vec<String>> {
1635        let conn = self.conn.borrow();
1636        let mut stmt = conn.prepare(
1637            "SELECT id FROM segments
1638             WHERE job_id = ?1 AND status IN ('queued', 'retry_pending')
1639             ORDER BY ordinal",
1640        )?;
1641        let rows = stmt.query_map(params![job_id], |row| row.get::<_, String>(0))?;
1642        rows.collect::<std::result::Result<Vec<_>, _>>()
1643            .map_err(StoreError::from)
1644    }
1645
1646    pub fn segment_records(&self, job_id: &str) -> Result<Vec<SegmentRecord>> {
1647        let conn = self.conn.borrow();
1648        let mut stmt = conn.prepare(
1649            "SELECT id,
1650                    status,
1651                    attempts,
1652                    error,
1653                    COALESCE(tokens_input, input_tokens),
1654                    tokens_input_cached,
1655                    COALESCE(tokens_output, output_tokens),
1656                    tokens_estimated
1657             FROM segments WHERE job_id = ?1 ORDER BY ordinal",
1658        )?;
1659        let rows = stmt.query_map(params![job_id], |row| {
1660            Ok(SegmentRecord {
1661                id: row.get(0)?,
1662                status: row.get(1)?,
1663                attempts: row.get::<_, i64>(2)? as usize,
1664                error: row.get(3)?,
1665                input_tokens: row.get::<_, Option<i64>>(4)?.map(|value| value as u64),
1666                input_cached_tokens: row.get::<_, Option<i64>>(5)?.map(|value| value as u64),
1667                output_tokens: row.get::<_, Option<i64>>(6)?.map(|value| value as u64),
1668                tokens_estimated: row.get::<_, i64>(7)? != 0,
1669            })
1670        })?;
1671        rows.collect::<std::result::Result<Vec<_>, _>>()
1672            .map_err(StoreError::from)
1673    }
1674
1675    pub fn load_block_translations(&self, job_id: &str) -> Result<Vec<StoredBlockTranslation>> {
1676        let conn = self.conn.borrow();
1677        let mut stmt = conn.prepare(
1678            "SELECT segment_id, block_id, translated_text
1679             FROM translation_blocks WHERE job_id = ?1 ORDER BY segment_id, block_id",
1680        )?;
1681        let rows = stmt.query_map(params![job_id], |row| {
1682            Ok(StoredBlockTranslation {
1683                segment_id: row.get(0)?,
1684                block_id: row.get(1)?,
1685                text: row.get(2)?,
1686            })
1687        })?;
1688        rows.collect::<std::result::Result<Vec<_>, _>>()
1689            .map_err(StoreError::from)
1690    }
1691
1692    pub fn load_terminal_segment_translations(
1693        &self,
1694        job_id: &str,
1695    ) -> Result<Vec<StoredSegmentTranslation>> {
1696        let conn = self.conn.borrow();
1697        let mut stmt = conn.prepare(
1698            "SELECT s.id, s.ordinal, s.status, s.error, t.translated_text
1699             FROM segments s
1700             JOIN translations t ON t.job_id = s.job_id AND t.segment_id = s.id
1701             WHERE s.job_id = ?1 AND s.status IN ('succeeded', 'skipped_cached', 'needs_review')
1702             ORDER BY s.ordinal",
1703        )?;
1704        let rows = stmt.query_map(params![job_id], |row| {
1705            Ok((
1706                row.get::<_, String>(0)?,
1707                row.get::<_, i64>(1)?,
1708                row.get::<_, String>(2)?,
1709                row.get::<_, Option<String>>(3)?,
1710                row.get::<_, String>(4)?,
1711            ))
1712        })?;
1713
1714        let mut records = Vec::new();
1715        for row in rows {
1716            let (segment_id, ordinal, status, error, translated_text) = row?;
1717            let mut block_stmt = conn.prepare(
1718                "SELECT block_id, translated_text
1719                 FROM translation_blocks
1720                 WHERE job_id = ?1 AND segment_id = ?2
1721                 ORDER BY block_id",
1722            )?;
1723            let blocks = block_stmt
1724                .query_map(params![job_id, segment_id.as_str()], |row| {
1725                    Ok(BlockTranslation {
1726                        block_id: BlockId(row.get::<_, String>(0)?),
1727                        text: row.get(1)?,
1728                    })
1729                })?
1730                .collect::<std::result::Result<Vec<_>, _>>()?;
1731            records.push(StoredSegmentTranslation {
1732                segment_id,
1733                ordinal: ordinal as usize,
1734                status,
1735                error,
1736                translated_text,
1737                blocks,
1738            });
1739        }
1740
1741        Ok(records)
1742    }
1743
1744    pub fn resumable_segment_ids(&self, job_id: &str) -> Result<Vec<String>> {
1745        let conn = self.conn.borrow();
1746        let mut stmt = conn.prepare(
1747            "SELECT id FROM segments
1748             WHERE job_id = ?1 AND status IN ('queued', 'retry_pending', 'failed')
1749             ORDER BY ordinal",
1750        )?;
1751        let rows = stmt.query_map(params![job_id], |row| row.get::<_, String>(0))?;
1752        rows.collect::<std::result::Result<Vec<_>, _>>()
1753            .map_err(StoreError::from)
1754    }
1755
1756    #[allow(clippy::too_many_arguments)]
1757    pub fn find_cached_translation(
1758        &self,
1759        segment: &Segment,
1760        prompt_version: &str,
1761        provider: &str,
1762        model: &str,
1763        source_lang: Option<&str>,
1764        target_lang: &str,
1765        cache_namespace: &str,
1766    ) -> Result<Option<CachedTranslation>> {
1767        let conn = self.conn.borrow();
1768        let cached = conn
1769            .query_row(
1770                "SELECT t.job_id, t.segment_id, t.translated_text
1771                 FROM translations t
1772                 JOIN segments s ON s.job_id = t.job_id AND s.id = t.segment_id
1773                 JOIN jobs j ON j.id = t.job_id
1774                 WHERE s.source_hash = ?1
1775                   AND s.prompt_version = ?2
1776                   AND s.provider = ?3
1777                   AND s.model = ?4
1778                   AND ((?5 IS NULL AND j.source_lang IS NULL) OR j.source_lang = ?5)
1779                   AND j.target_lang = ?6
1780                   AND s.cache_namespace = ?7
1781                   AND s.status IN ('succeeded', 'skipped_cached')
1782                 ORDER BY CASE s.status WHEN 'succeeded' THEN 0 ELSE 1 END,
1783                          CAST(t.created_at AS INTEGER) DESC,
1784                          t.rowid DESC
1785                 LIMIT 1",
1786                params![
1787                    segment.checksum,
1788                    prompt_version,
1789                    provider,
1790                    model,
1791                    source_lang,
1792                    target_lang,
1793                    cache_namespace,
1794                ],
1795                |row| {
1796                    Ok((
1797                        row.get::<_, String>(0)?,
1798                        row.get::<_, String>(1)?,
1799                        row.get::<_, String>(2)?,
1800                    ))
1801                },
1802            )
1803            .optional()?;
1804
1805        let Some((job_id, segment_id, translated_text)) = cached else {
1806            return Ok(None);
1807        };
1808
1809        let mut stmt = conn.prepare(
1810            "SELECT block_id, translated_text
1811             FROM translation_blocks
1812             WHERE job_id = ?1 AND segment_id = ?2
1813             ORDER BY block_id",
1814        )?;
1815        let rows = stmt.query_map(params![job_id, segment_id], |row| {
1816            Ok(BlockTranslation {
1817                block_id: BlockId(row.get::<_, String>(0)?),
1818                text: row.get(1)?,
1819            })
1820        })?;
1821        let blocks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1822
1823        let mut by_id = blocks
1824            .into_iter()
1825            .map(|block| (block.block_id.0.clone(), block))
1826            .collect::<HashMap<_, _>>();
1827
1828        let mut ordered = Vec::with_capacity(segment.block_ids.len());
1829        for id in &segment.block_ids {
1830            let Some(block) = by_id.remove(&id.0) else {
1831                return Ok(None);
1832            };
1833            ordered.push(block);
1834        }
1835        if !by_id.is_empty() {
1836            return Ok(None);
1837        }
1838
1839        Ok(Some(CachedTranslation {
1840            translated_text,
1841            blocks: ordered,
1842        }))
1843    }
1844
1845    pub fn find_cached_translations_batch(
1846        &self,
1847        segments: &[Segment],
1848        request: CacheLookupRequest<'_>,
1849    ) -> Result<HashMap<String, CachedTranslation>> {
1850        let mut results = HashMap::new();
1851        if segments.is_empty() {
1852            return Ok(results);
1853        }
1854
1855        const SQLITE_IN_CHUNK_SIZE: usize = 900;
1856
1857        for chunk in segments.chunks(SQLITE_IN_CHUNK_SIZE) {
1858            let hashes: Vec<&str> = chunk.iter().map(|s| s.checksum.as_str()).collect();
1859            let placeholders: Vec<String> =
1860                (0..hashes.len()).map(|i| format!("?{}", i + 1)).collect();
1861            let placeholders_sql = placeholders.join(", ");
1862
1863            let sql = format!(
1864                "SELECT t.job_id, t.segment_id, t.translated_text, s.source_hash
1865                 FROM translations t
1866                 JOIN segments s ON s.job_id = t.job_id AND s.id = t.segment_id
1867                 JOIN jobs j ON j.id = t.job_id
1868                 WHERE s.source_hash IN ({placeholders_sql})
1869                   AND s.prompt_version = ?{}
1870                   AND s.provider = ?{}
1871                   AND s.model = ?{}
1872                   AND ((?{} IS NULL AND j.source_lang IS NULL) OR j.source_lang = ?{})
1873                   AND j.target_lang = ?{}
1874                   AND s.cache_namespace = ?{}
1875                   AND s.status IN ('succeeded', 'skipped_cached')
1876                 ORDER BY CASE s.status WHEN 'succeeded' THEN 0 ELSE 1 END,
1877                          CAST(t.created_at AS INTEGER) DESC,
1878                          t.rowid DESC",
1879                hashes.len() + 1,
1880                hashes.len() + 2,
1881                hashes.len() + 3,
1882                hashes.len() + 4,
1883                hashes.len() + 5,
1884                hashes.len() + 6,
1885                hashes.len() + 7,
1886            );
1887
1888            let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1889            for hash in &hashes {
1890                params.push(Box::new(hash.to_string()));
1891            }
1892            params.push(Box::new(request.prompt_version.to_string()));
1893            params.push(Box::new(request.provider.to_string()));
1894            params.push(Box::new(request.model.to_string()));
1895            params.push(Box::new(request.source_lang.map(|s| s.to_string())));
1896            params.push(Box::new(request.source_lang.map(|s| s.to_string())));
1897            params.push(Box::new(request.target_lang.to_string()));
1898            params.push(Box::new(request.cache_namespace.to_string()));
1899
1900            let conn = self.conn.borrow();
1901            let mut stmt = conn.prepare(&sql)?;
1902            let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1903                params.iter().map(|p| p.as_ref()).collect();
1904
1905            let rows = stmt.query_map(param_refs.as_slice(), |row| {
1906                Ok((
1907                    row.get::<_, String>(0)?,
1908                    row.get::<_, String>(1)?,
1909                    row.get::<_, String>(2)?,
1910                    row.get::<_, String>(3)?,
1911                ))
1912            })?;
1913
1914            let mut hash_to_hit: HashMap<String, (String, String, String)> = HashMap::new();
1915            for row in rows {
1916                let (job_id, segment_id, translated_text, source_hash) = row?;
1917                hash_to_hit
1918                    .entry(source_hash)
1919                    .or_insert((job_id, segment_id, translated_text));
1920            }
1921
1922            for segment in chunk {
1923                if let Some((job_id, segment_id, translated_text)) =
1924                    hash_to_hit.get(&segment.checksum)
1925                {
1926                    let mut block_stmt = conn.prepare(
1927                        "SELECT block_id, translated_text
1928                         FROM translation_blocks
1929                         WHERE job_id = ?1 AND segment_id = ?2
1930                         ORDER BY block_id",
1931                    )?;
1932                    let block_rows = block_stmt.query_map(params![job_id, segment_id], |row| {
1933                        Ok(BlockTranslation {
1934                            block_id: BlockId(row.get::<_, String>(0)?),
1935                            text: row.get(1)?,
1936                        })
1937                    })?;
1938                    let blocks = block_rows.collect::<std::result::Result<Vec<_>, _>>()?;
1939
1940                    let mut by_id = blocks
1941                        .into_iter()
1942                        .map(|block| (block.block_id.0.clone(), block))
1943                        .collect::<HashMap<_, _>>();
1944
1945                    let mut ordered = Vec::with_capacity(segment.block_ids.len());
1946                    let mut valid = true;
1947                    for id in &segment.block_ids {
1948                        let Some(block) = by_id.remove(&id.0) else {
1949                            valid = false;
1950                            break;
1951                        };
1952                        ordered.push(block);
1953                    }
1954                    if !valid || !by_id.is_empty() {
1955                        continue;
1956                    }
1957
1958                    results.insert(
1959                        segment.id.0.clone(),
1960                        CachedTranslation {
1961                            translated_text: translated_text.clone(),
1962                            blocks: ordered,
1963                        },
1964                    );
1965                }
1966            }
1967        }
1968
1969        Ok(results)
1970    }
1971
    /// Brings the database schema up to date. Steps, in order:
    ///
    /// 1. If a `translations` table exists without a `job_id` column, the job-related
    ///    tables are renamed to `<table>_legacy_<nanoseconds>` so fresh ones can be created.
    /// 2. All tables are created with `CREATE TABLE IF NOT EXISTS`.
    /// 3. `glossary_terms` is rebuilt if its `target_text` column is still `NOT NULL`.
    /// 4. Columns missing from pre-existing `jobs` / `segments` tables are added.
    /// 5. Lookup indexes are created and migration versions 1-6 are recorded.
    ///
    /// NOTE(review): no explicit transaction is opened in this function — confirm whether
    /// a failure part-way through is acceptable or handled by the caller.
    ///
    /// # Errors
    /// Returns [`StoreError::Sqlite`] if any statement fails.
    fn migrate(&self) -> Result<()> {
        let conn = self.conn.borrow();
        // Legacy layout detection: a `translations` table that has no `job_id` column.
        if table_exists(&conn, "translations")?
            && !table_has_column(&conn, "translations", "job_id")?
        {
            // One shared suffix is used for all tables renamed in this pass.
            let suffix = unix_timestamp_nanos();
            rename_table_if_exists(&conn, "qa_findings", suffix)?;
            rename_table_if_exists(&conn, "translation_blocks", suffix)?;
            rename_table_if_exists(&conn, "translations", suffix)?;
            rename_table_if_exists(&conn, "segments", suffix)?;
            rename_table_if_exists(&conn, "jobs", suffix)?;
        }
        // Turns on foreign-key enforcement and creates every table that does not exist yet.
        // Existing tables are left untouched here; their missing columns are added below.
        conn.execute_batch(
            "
            PRAGMA foreign_keys = ON;
            CREATE TABLE IF NOT EXISTS _migrations (
              version INTEGER PRIMARY KEY,
              name TEXT NOT NULL,
              applied_at TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS jobs (
              id TEXT PRIMARY KEY,
              input_path TEXT NOT NULL DEFAULT '',
              input_snapshot_path TEXT,
              input_sha256 TEXT,
              output_path TEXT NOT NULL DEFAULT '',
              input_hash TEXT NOT NULL,
              source_lang TEXT,
              target_lang TEXT NOT NULL,
              provider TEXT NOT NULL,
              model TEXT NOT NULL,
              base_url TEXT,
              api_key_env TEXT,
              status TEXT NOT NULL,
              config_json TEXT,
              events_path TEXT,
              report_json_path TEXT,
              report_markdown_path TEXT,
              book_id TEXT,
              series_id TEXT,
              created_at TEXT NOT NULL,
              updated_at TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS segments (
              id TEXT NOT NULL,
              job_id TEXT NOT NULL,
              section_id TEXT NOT NULL,
              ordinal INTEGER NOT NULL,
              source_hash TEXT NOT NULL,
              prompt_version TEXT NOT NULL,
              provider TEXT NOT NULL,
              model TEXT NOT NULL,
              status TEXT NOT NULL,
              attempts INTEGER NOT NULL DEFAULT 0,
              input_tokens INTEGER,
              output_tokens INTEGER,
              tokens_input INTEGER,
              tokens_input_cached INTEGER,
              tokens_output INTEGER,
              tokens_estimated INTEGER NOT NULL DEFAULT 0,
              cost_estimate REAL,
              error TEXT,
              translated_hash TEXT,
              PRIMARY KEY (job_id, id),
              FOREIGN KEY(job_id) REFERENCES jobs(id)
            );

            CREATE TABLE IF NOT EXISTS translations (
              segment_id TEXT NOT NULL,
              job_id TEXT NOT NULL,
              translated_text TEXT NOT NULL,
              provider TEXT NOT NULL,
              model TEXT NOT NULL,
              prompt_version TEXT NOT NULL,
              created_at TEXT NOT NULL,
              PRIMARY KEY (job_id, segment_id),
              FOREIGN KEY(job_id, segment_id) REFERENCES segments(job_id, id)
            );

            CREATE TABLE IF NOT EXISTS translation_blocks (
              segment_id TEXT NOT NULL,
              job_id TEXT NOT NULL,
              block_id TEXT NOT NULL,
              translated_text TEXT NOT NULL,
              PRIMARY KEY (job_id, segment_id, block_id),
              FOREIGN KEY(job_id, segment_id) REFERENCES segments(job_id, id)
            );

            CREATE TABLE IF NOT EXISTS qa_findings (
              id TEXT PRIMARY KEY,
              segment_id TEXT NOT NULL,
              job_id TEXT NOT NULL,
              severity TEXT NOT NULL,
              kind TEXT NOT NULL,
              message TEXT NOT NULL,
              FOREIGN KEY(job_id, segment_id) REFERENCES segments(job_id, id)
            );

            CREATE TABLE IF NOT EXISTS segment_flags (
              id INTEGER PRIMARY KEY,
              job_id TEXT NOT NULL,
              segment_id TEXT NOT NULL,
              kind TEXT NOT NULL,
              note TEXT,
              suggested_source TEXT,
              suggested_target TEXT,
              ingested_at TEXT NOT NULL,
              consumed INTEGER NOT NULL DEFAULT 0,
              FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
            );

            CREATE TABLE IF NOT EXISTS glossary_terms (
              id INTEGER PRIMARY KEY,
              scope_kind TEXT NOT NULL CHECK(scope_kind IN ('global', 'series', 'book')),
              scope_id TEXT,
              source_text TEXT NOT NULL,
              target_text TEXT,
              category TEXT NOT NULL CHECK(category IN
                ('person', 'place', 'object', 'invented', 'style', 'phrase', 'other')),
              notes TEXT,
              case_sensitive INTEGER NOT NULL DEFAULT 0,
              always_active INTEGER NOT NULL DEFAULT 0,
              status TEXT NOT NULL CHECK(status IN
                ('user_seeded', 'auto_candidate', 'accepted', 'rejected'))
                DEFAULT 'user_seeded',
              source_language TEXT NOT NULL,
              target_language TEXT NOT NULL,
              source_count INTEGER DEFAULT 0,
              created_at TEXT NOT NULL,
              updated_at TEXT NOT NULL,
              UNIQUE(scope_kind, scope_id, source_text, source_language, target_language)
            );

            CREATE TABLE IF NOT EXISTS style_sheets (
              id INTEGER PRIMARY KEY,
              scope_kind TEXT NOT NULL CHECK(scope_kind IN ('global', 'series', 'book')),
              scope_id TEXT,
              target_language TEXT NOT NULL,
              content_toml TEXT NOT NULL,
              fingerprint TEXT NOT NULL,
              created_at TEXT NOT NULL,
              updated_at TEXT NOT NULL,
              UNIQUE(scope_kind, scope_id, target_language)
            );

            CREATE TABLE IF NOT EXISTS entities (
              id INTEGER PRIMARY KEY,
              scope_kind TEXT NOT NULL CHECK(scope_kind IN ('global', 'series', 'book')),
              scope_id TEXT,
              source_name TEXT NOT NULL,
              target_name TEXT NOT NULL,
              gender_target TEXT
                CHECK(gender_target IS NULL OR gender_target IN ('m', 'f', 'n')),
              role TEXT,
              notes TEXT,
              source_language TEXT NOT NULL,
              target_language TEXT NOT NULL,
              created_at TEXT NOT NULL,
              updated_at TEXT NOT NULL,
              UNIQUE(scope_kind, scope_id, source_name, source_language, target_language)
            );
            ",
        )?;
        // Rebuilds `glossary_terms` when an older database still declares `target_text`
        // as NOT NULL.
        ensure_glossary_target_nullable(&conn)?;
        // Add columns that a pre-existing `jobs` table may be missing.
        ensure_column(&conn, "jobs", "input_path", "TEXT NOT NULL DEFAULT ''")?;
        ensure_column(&conn, "jobs", "input_snapshot_path", "TEXT")?;
        ensure_column(&conn, "jobs", "input_sha256", "TEXT")?;
        ensure_column(&conn, "jobs", "output_path", "TEXT NOT NULL DEFAULT ''")?;
        ensure_column(&conn, "jobs", "base_url", "TEXT")?;
        ensure_column(&conn, "jobs", "api_key_env", "TEXT")?;
        ensure_column(&conn, "jobs", "config_json", "TEXT")?;
        ensure_column(&conn, "jobs", "events_path", "TEXT")?;
        ensure_column(&conn, "jobs", "report_json_path", "TEXT")?;
        ensure_column(&conn, "jobs", "report_markdown_path", "TEXT")?;
        ensure_column(&conn, "jobs", "book_id", "TEXT")?;
        ensure_column(&conn, "jobs", "series_id", "TEXT")?;
        // `cache_namespace` is not part of the CREATE TABLE statement above, so this call
        // is what adds it, for freshly created databases as well.
        ensure_column(
            &conn,
            "segments",
            "cache_namespace",
            "TEXT NOT NULL DEFAULT ''",
        )?;
        ensure_column(&conn, "segments", "tokens_input", "INTEGER")?;
        ensure_column(&conn, "segments", "tokens_input_cached", "INTEGER")?;
        ensure_column(&conn, "segments", "tokens_output", "INTEGER")?;
        ensure_column(
            &conn,
            "segments",
            "tokens_estimated",
            "INTEGER NOT NULL DEFAULT 0",
        )?;
        // Index creation has to follow the ensure_column calls: idx_segments_cache_lookup
        // references `cache_namespace`.
        conn.execute_batch(
            "CREATE INDEX IF NOT EXISTS idx_segments_cache_lookup
             ON segments(source_hash, cache_namespace, prompt_version, provider, model, status);
             CREATE INDEX IF NOT EXISTS idx_segment_flags_job
             ON segment_flags(job_id, consumed);
             CREATE INDEX IF NOT EXISTS idx_glossary_lookup
             ON glossary_terms(source_language, target_language, scope_kind, scope_id, status);
             CREATE INDEX IF NOT EXISTS idx_style_lookup
             ON style_sheets(target_language, scope_kind, scope_id);
             CREATE INDEX IF NOT EXISTS idx_entity_lookup
             ON entities(source_language, target_language, scope_kind, scope_id);",
        )?;
        // record_migration inserts with INSERT OR IGNORE, so versions that are already
        // recorded keep their original row.
        record_migration(&conn, 1, "initial")?;
        record_migration(&conn, 2, "v1_0_1_input_snapshot")?;
        record_migration(&conn, 3, "v1_1_segment_flags")?;
        record_migration(&conn, 4, "v1_2_glossary_terms")?;
        record_migration(&conn, 5, "v1_2_1_nullable_glossary_candidate_targets")?;
        record_migration(&conn, 6, "v1_3_context_styles_entities")?;
        Ok(())
    }
2185
2186    fn touch_job(&self, job_id: &str, status: &str) -> Result<()> {
2187        let conn = self.conn.borrow();
2188        conn.execute(
2189            "UPDATE jobs SET status = ?1, updated_at = ?2 WHERE id = ?3",
2190            params![status, timestamp_string(), job_id],
2191        )?;
2192        Ok(())
2193    }
2194}
2195
2196fn table_exists(conn: &Connection, table: &str) -> rusqlite::Result<bool> {
2197    conn.query_row(
2198        "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1)",
2199        params![table],
2200        |row| row.get::<_, bool>(0),
2201    )
2202}
2203
2204fn table_has_column(conn: &Connection, table: &str, column: &str) -> rusqlite::Result<bool> {
2205    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2206    let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
2207    for row in rows {
2208        if row? == column {
2209            return Ok(true);
2210        }
2211    }
2212    Ok(false)
2213}
2214
2215fn rename_table_if_exists(conn: &Connection, table: &str, suffix: u128) -> rusqlite::Result<()> {
2216    if table_exists(conn, table)? {
2217        conn.execute(
2218            &format!("ALTER TABLE {table} RENAME TO {table}_legacy_{suffix}"),
2219            [],
2220        )?;
2221    }
2222    Ok(())
2223}
2224
2225fn ensure_column(
2226    conn: &Connection,
2227    table: &str,
2228    column: &str,
2229    definition: &str,
2230) -> rusqlite::Result<()> {
2231    if !table_has_column(conn, table, column)? {
2232        conn.execute(
2233            &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
2234            [],
2235        )?;
2236    }
2237    Ok(())
2238}
2239
2240fn ensure_glossary_target_nullable(conn: &Connection) -> rusqlite::Result<()> {
2241    if !table_exists(conn, "glossary_terms")?
2242        || !table_column_is_not_null(conn, "glossary_terms", "target_text")?
2243    {
2244        return Ok(());
2245    }
2246
2247    let legacy_table = format!("glossary_terms_v1_2_0_{}", unix_timestamp_nanos());
2248    conn.execute_batch(&format!(
2249        "
2250        DROP INDEX IF EXISTS idx_glossary_lookup;
2251        ALTER TABLE glossary_terms RENAME TO {legacy_table};
2252        CREATE TABLE glossary_terms (
2253          id INTEGER PRIMARY KEY,
2254          scope_kind TEXT NOT NULL CHECK(scope_kind IN ('global', 'series', 'book')),
2255          scope_id TEXT,
2256          source_text TEXT NOT NULL,
2257          target_text TEXT,
2258          category TEXT NOT NULL CHECK(category IN
2259            ('person', 'place', 'object', 'invented', 'style', 'phrase', 'other')),
2260          notes TEXT,
2261          case_sensitive INTEGER NOT NULL DEFAULT 0,
2262          always_active INTEGER NOT NULL DEFAULT 0,
2263          status TEXT NOT NULL CHECK(status IN
2264            ('user_seeded', 'auto_candidate', 'accepted', 'rejected'))
2265            DEFAULT 'user_seeded',
2266          source_language TEXT NOT NULL,
2267          target_language TEXT NOT NULL,
2268          source_count INTEGER DEFAULT 0,
2269          created_at TEXT NOT NULL,
2270          updated_at TEXT NOT NULL,
2271          UNIQUE(scope_kind, scope_id, source_text, source_language, target_language)
2272        );
2273        INSERT INTO glossary_terms
2274          (id, scope_kind, scope_id, source_text, target_text, category, notes,
2275           case_sensitive, always_active, status, source_language, target_language,
2276           source_count, created_at, updated_at)
2277        SELECT id, scope_kind, scope_id, source_text, target_text, category, notes,
2278               case_sensitive, always_active, status, source_language, target_language,
2279               source_count, created_at, updated_at
2280        FROM {legacy_table};
2281        DROP TABLE {legacy_table};
2282        ",
2283    ))?;
2284    Ok(())
2285}
2286
2287fn table_column_is_not_null(
2288    conn: &Connection,
2289    table: &str,
2290    column: &str,
2291) -> rusqlite::Result<bool> {
2292    let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
2293    let mut rows = stmt.query([])?;
2294    while let Some(row) = rows.next()? {
2295        let name: String = row.get(1)?;
2296        if name == column {
2297            let not_null: i64 = row.get(3)?;
2298            return Ok(not_null != 0);
2299        }
2300    }
2301    Ok(false)
2302}
2303
2304fn record_migration(conn: &Connection, version: i64, name: &str) -> rusqlite::Result<()> {
2305    conn.execute(
2306        "INSERT OR IGNORE INTO _migrations (version, name, applied_at)
2307         VALUES (?1, ?2, ?3)",
2308        params![version, name, timestamp_string()],
2309    )?;
2310    Ok(())
2311}
2312
2313fn glossary_term_from_row(row: &rusqlite::Row<'_>) -> rusqlite::Result<GlossaryTerm> {
2314    let scope_kind_text: String = row.get(1)?;
2315    let category_text: String = row.get(5)?;
2316    let status_text: String = row.get(9)?;
2317    Ok(GlossaryTerm {
2318        id: Some(row.get(0)?),
2319        scope_kind: parse_row_enum(&scope_kind_text, 1)?,
2320        scope_id: row.get(2)?,
2321        source_text: row.get(3)?,
2322        target_text: row.get(4)?,
2323        category: parse_row_enum(&category_text, 5)?,
2324        notes: row.get(6)?,
2325        case_sensitive: row.get::<_, i64>(7)? != 0,
2326        always_active: row.get::<_, i64>(8)? != 0,
2327        status: parse_row_enum(&status_text, 9)?,
2328        source_language: row.get(10)?,
2329        target_language: row.get(11)?,
2330        source_count: row.get::<_, Option<i64>>(12)?.unwrap_or(0).max(0) as usize,
2331    })
2332}
2333
2334fn glossary_candidate_from_row(
2335    row: &rusqlite::Row<'_>,
2336) -> rusqlite::Result<StoredGlossaryCandidate> {
2337    let category_text: String = row.get(3)?;
2338    let status_text: String = row.get(7)?;
2339    Ok(StoredGlossaryCandidate {
2340        id: row.get(0)?,
2341        source_text: row.get(1)?,
2342        target_text: row.get(2)?,
2343        category: parse_row_enum(&category_text, 3)?,
2344        notes: row.get(4)?,
2345        case_sensitive: row.get::<_, i64>(5)? != 0,
2346        always_active: row.get::<_, i64>(6)? != 0,
2347        status: parse_row_enum(&status_text, 7)?,
2348        source_language: row.get(8)?,
2349        target_language: row.get(9)?,
2350        source_count: row.get::<_, Option<i64>>(10)?.unwrap_or(0).max(0) as usize,
2351    })
2352}
2353
2354fn parse_gender_short(value: &str) -> Option<EntityGender> {
2355    match value {
2356        "m" => Some(EntityGender::Masculine),
2357        "f" => Some(EntityGender::Feminine),
2358        "n" => Some(EntityGender::Neuter),
2359        _ => None,
2360    }
2361}
2362
2363fn parse_row_enum<T>(value: &str, column: usize) -> rusqlite::Result<T>
2364where
2365    T: FromStr<Err = String>,
2366{
2367    value.parse::<T>().map_err(|err| {
2368        rusqlite::Error::FromSqlConversionFailure(column, Type::Text, Box::new(RowEnumError(err)))
2369    })
2370}
2371
/// Error wrapper around a parse-failure message; its `Display` output is the message
/// itself.
#[derive(Debug)]
struct RowEnumError(String);

impl std::error::Error for RowEnumError {}

impl std::fmt::Display for RowEnumError {
    fn fmt(&self, formatter: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self(message) = self;
        write!(formatter, "{message}")
    }
}
2382
2383fn replace_block_translations(
2384    conn: &Connection,
2385    job_id: &str,
2386    segment_id: &str,
2387    blocks: &[BlockTranslation],
2388) -> rusqlite::Result<()> {
2389    conn.execute(
2390        "DELETE FROM translation_blocks WHERE job_id = ?1 AND segment_id = ?2",
2391        params![job_id, segment_id],
2392    )?;
2393    for block in blocks {
2394        conn.execute(
2395            "INSERT INTO translation_blocks (segment_id, job_id, block_id, translated_text)
2396             VALUES (?1, ?2, ?3, ?4)",
2397            params![
2398                segment_id,
2399                job_id,
2400                block.block_id.0.as_str(),
2401                block.text.as_str()
2402            ],
2403        )?;
2404    }
2405    Ok(())
2406}
2407
/// Selector passed to `JobStore::retry_segments` to choose which segments it
/// acts on.
///
/// NOTE(review): the per-variant descriptions are inferred from the variant
/// names; confirm them against `retry_segments`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum RetryScope {
    /// Presumably segments whose status is `failed`.
    Failed,
    /// Presumably segments whose status is `needs_review`.
    NeedsReview,
    /// Presumably the union of the other scopes — TODO confirm.
    All,
}
2414
2415fn file_hash(path: &Path) -> CoreResult<String> {
2416    let bytes = fs::read(path)?;
2417    Ok(stable_hash_bytes(&bytes))
2418}
2419
2420fn stable_hash(text: &str) -> String {
2421    stable_hash_bytes(text.as_bytes())
2422}
2423
2424fn stable_hash_bytes(bytes: &[u8]) -> String {
2425    let digest = Sha256::digest(bytes);
2426    let mut output = String::with_capacity(digest.len() * 2);
2427    for byte in digest {
2428        use std::fmt::Write as _;
2429        write!(&mut output, "{byte:02x}").expect("writing to string should not fail");
2430    }
2431    output
2432}
2433
/// Whole seconds elapsed since the Unix epoch according to the system clock.
///
/// Yields `0` when the clock reports a time before the epoch.
fn unix_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs())
}
2440
/// Nanoseconds elapsed since the Unix epoch according to the system clock.
///
/// Yields `0` when the clock reports a time before the epoch.
fn unix_timestamp_nanos() -> u128 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_nanos())
}
2447
2448fn timestamp_string() -> String {
2449    unix_timestamp().to_string()
2450}
2451
2452#[cfg(test)]
2453mod tests {
2454    use super::*;
2455    use bookforge_core::{
2456        ir::{BlockId, SectionId},
2457        segment::{
2458            Segment, SegmentBlock, SegmentConstraints, SegmentContext, SegmentId, SegmentMetadata,
2459            SegmentSource, SegmentTextRun,
2460        },
2461    };
2462
2463    #[test]
2464    fn store_reuses_connection_across_job_operations() {
2465        let db_path = temp_path("jobs.sqlite");
2466        let input_path = temp_path("input.epub");
2467        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2468
2469        let store = JobStore::open(&db_path).expect("store should open");
2470        let job = store
2471            .create_job(CreateJob {
2472                input: &input_path,
2473                output: &temp_path("output.epub"),
2474                source_lang: Some("English"),
2475                target_lang: "Italian",
2476                provider: "mock",
2477                model: "mock-prefix",
2478                base_url: None,
2479                api_key_env: None,
2480                book_id: None,
2481                series_id: None,
2482            })
2483            .expect("job should be created");
2484        let segments = vec![segment("seg_a", 0), segment("seg_b", 1)];
2485        store
2486            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
2487            .expect("segments should insert");
2488
2489        store
2490            .save_translation(SaveTranslation {
2491                job_id: &job.id,
2492                segment_id: "seg_a",
2493                translated_text: "Tradotto",
2494                blocks: &[BlockTranslation {
2495                    block_id: BlockId("b_000000".to_string()),
2496                    text: "Tradotto".to_string(),
2497                }],
2498                provider: "mock",
2499                model: "mock-prefix",
2500                prompt_version: "v1",
2501                input_tokens: Some(11),
2502                input_cached_tokens: Some(0),
2503                output_tokens: Some(7),
2504                tokens_estimated: false,
2505            })
2506            .expect("translation should save");
2507        store
2508            .mark_segment_failed(&job.id, "seg_b", "provider unavailable")
2509            .expect("segment should be marked failed");
2510
2511        let summary = store
2512            .summary(&job.id)
2513            .expect("summary should load")
2514            .expect("job should exist");
2515        assert_eq!(summary.total_segments, 2);
2516        assert_eq!(summary.succeeded, 1);
2517        assert_eq!(summary.failed, 1);
2518        assert_eq!(summary.input_tokens, 11);
2519        assert_eq!(summary.output_tokens, 7);
2520        let blocks = store
2521            .load_block_translations(&job.id)
2522            .expect("block translations should load");
2523        assert_eq!(blocks.len(), 1);
2524        assert_eq!(blocks[0].text, "Tradotto");
2525
2526        let _ = fs::remove_file(db_path);
2527        let _ = fs::remove_file(input_path);
2528    }
2529
2530    fn segment(id: &str, ordinal: usize) -> Segment {
2531        let block_id = BlockId(format!("b_{ordinal:06}"));
2532        Segment {
2533            id: SegmentId(id.to_string()),
2534            section_id: SectionId("sec_000000".to_string()),
2535            ordinal,
2536            block_ids: vec![block_id.clone()],
2537            source: SegmentSource {
2538                text: format!("Source {ordinal}"),
2539                blocks: vec![SegmentBlock {
2540                    block_id,
2541                    kind: "paragraph".to_string(),
2542                    text: format!("Source {ordinal}"),
2543                    text_runs: vec![SegmentTextRun {
2544                        id: format!("r{ordinal}"),
2545                        text: format!("Source {ordinal}"),
2546                    }],
2547                    protected_spans: Vec::new(),
2548                }],
2549                token_estimate: 2,
2550            },
2551            context: SegmentContext::default(),
2552            metadata: SegmentMetadata::default(),
2553            constraints: SegmentConstraints::default(),
2554            checksum: format!("checksum_{ordinal}"),
2555        }
2556    }
2557
2558    fn temp_path(name: &str) -> PathBuf {
2559        std::env::temp_dir().join(format!(
2560            "bookforge-store-test-{}-{}-{name}",
2561            std::process::id(),
2562            unix_timestamp_nanos()
2563        ))
2564    }
2565
2566    fn build_seeded_store_with_translation(
2567        db_path: &PathBuf,
2568        cache_namespace: &str,
2569        block_ids: &[&str],
2570    ) -> (JobStore, JobRecord, Segment) {
2571        let input_path = temp_path("input.epub");
2572        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2573
2574        let store = JobStore::open(db_path).expect("store should open");
2575        let job = store
2576            .create_job(CreateJob {
2577                input: &input_path,
2578                output: &temp_path("output.epub"),
2579                source_lang: Some("English"),
2580                target_lang: "Italian",
2581                provider: "mock",
2582                model: "mock-prefix",
2583                base_url: None,
2584                api_key_env: None,
2585                book_id: None,
2586                series_id: None,
2587            })
2588            .expect("job should be created");
2589
2590        let mut seg = segment("seg_a", 0);
2591        let blocks: Vec<BlockTranslation> = block_ids
2592            .iter()
2593            .map(|id| BlockTranslation {
2594                block_id: BlockId(id.to_string()),
2595                text: format!("Tradotto {id}"),
2596            })
2597            .collect();
2598        seg.block_ids = block_ids.iter().map(|id| BlockId(id.to_string())).collect();
2599
2600        store
2601            .insert_segments(
2602                &job.id,
2603                std::slice::from_ref(&seg),
2604                "v1",
2605                "mock",
2606                "mock-prefix",
2607                cache_namespace,
2608            )
2609            .expect("segments should insert");
2610        store
2611            .save_translation(SaveTranslation {
2612                job_id: &job.id,
2613                segment_id: "seg_a",
2614                translated_text: "Tradotto",
2615                blocks: &blocks,
2616                provider: "mock",
2617                model: "mock-prefix",
2618                prompt_version: "v1",
2619                input_tokens: Some(11),
2620                input_cached_tokens: Some(0),
2621                output_tokens: Some(7),
2622                tokens_estimated: false,
2623            })
2624            .expect("translation should save");
2625
2626        let _ = fs::remove_file(input_path);
2627        (store, job, seg)
2628    }
2629
2630    #[test]
2631    fn job_config_snapshot_round_trips_through_store() {
2632        let db_path = temp_path("snapshot.sqlite");
2633        let input_path = temp_path("input.epub");
2634        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2635        let store = JobStore::open(&db_path).expect("store should open");
2636        let job = store
2637            .create_job(CreateJob {
2638                input: &input_path,
2639                output: &temp_path("output.epub"),
2640                source_lang: Some("English"),
2641                target_lang: "Italian",
2642                provider: "openrouter",
2643                model: "model",
2644                base_url: Some("https://example.test/v1"),
2645                api_key_env: Some("OPENROUTER_API_KEY"),
2646                book_id: None,
2647                series_id: None,
2648            })
2649            .expect("job should be created");
2650        let settings = bookforge_core::TranslationProfile::Balanced.resolve();
2651        let snapshot = RunConfigSnapshot {
2652            input_path: input_path.clone(),
2653            input_snapshot_path: Some(temp_path("input-snapshot.epub")),
2654            input_sha256: Some("abc123".to_string()),
2655            output_path: temp_path("translated.epub"),
2656            events_path: Some(temp_path("events.jsonl")),
2657            report_json_path: Some(temp_path("report.json")),
2658            report_markdown_path: Some(temp_path("report.md")),
2659            source_language: Some("English".to_string()),
2660            target_language: "Italian".to_string(),
2661            provider: "openrouter".to_string(),
2662            model: "model".to_string(),
2663            base_url: Some("https://example.test/v1".to_string()),
2664            api_key_env: Some("OPENROUTER_API_KEY".to_string()),
2665            profile: settings.profile,
2666            provider_preset: None,
2667            prompt_version: "batch_v1".to_string(),
2668            cache_namespace: "cache_ns".to_string(),
2669            book_id: None,
2670            series_id: None,
2671            glossary_budget_tokens: 800,
2672            glossary_format: bookforge_core::GlossaryFormat::Json,
2673            prompt_extra: None,
2674            glossary_fingerprint: String::new(),
2675            glossary_terms: Vec::new(),
2676            context_window: 0,
2677            context_budget_tokens: 1200,
2678            context_scope: bookforge_core::config::ContextScope::Chapter,
2679            style_fingerprint: String::new(),
2680            style_rendered_block: String::new(),
2681            entities_fingerprint: String::new(),
2682            entities_rendered_block: String::new(),
2683            settings: bookforge_core::ResolvedRunSettingsSnapshot::from_settings(&settings),
2684        };
2685
2686        store
2687            .update_job_config_snapshot(&job.id, &snapshot)
2688            .expect("snapshot should persist");
2689        let loaded = store
2690            .load_job_config_snapshot(&job.id)
2691            .expect("snapshot should load")
2692            .expect("snapshot should exist");
2693        assert_eq!(loaded, snapshot);
2694
2695        let reloaded_job = store
2696            .get_job(&job.id)
2697            .expect("job should load")
2698            .expect("job should exist");
2699        assert_eq!(reloaded_job.events_path, snapshot.events_path);
2700        assert_eq!(reloaded_job.report_json_path, snapshot.report_json_path);
2701        assert_eq!(
2702            reloaded_job.report_markdown_path,
2703            snapshot.report_markdown_path
2704        );
2705
2706        let _ = fs::remove_file(db_path);
2707        let _ = fs::remove_file(input_path);
2708    }
2709
2710    #[test]
2711    fn job_config_snapshot_does_not_store_api_key_value() {
2712        let db_path = temp_path("snapshot_secret.sqlite");
2713        let input_path = temp_path("input.epub");
2714        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2715        let api_key_env = "BOOKFORGE_TEST_API_KEY_VALUE_NOT_STORED";
2716        let api_key_value = "sk-live-secret-that-must-not-be-persisted";
2717        // This test uses a unique process-local env var and verifies snapshot
2718        // serialization never reads the value.
2719        unsafe {
2720            std::env::set_var(api_key_env, api_key_value);
2721        }
2722
2723        let store = JobStore::open(&db_path).expect("store should open");
2724        let job = store
2725            .create_job(CreateJob {
2726                input: &input_path,
2727                output: &temp_path("output.epub"),
2728                source_lang: Some("English"),
2729                target_lang: "Italian",
2730                provider: "openrouter",
2731                model: "model",
2732                base_url: Some("https://example.test/v1"),
2733                api_key_env: Some(api_key_env),
2734                book_id: None,
2735                series_id: None,
2736            })
2737            .expect("job should be created");
2738        let settings = bookforge_core::TranslationProfile::Balanced.resolve();
2739        let snapshot = RunConfigSnapshot {
2740            input_path: input_path.clone(),
2741            input_snapshot_path: None,
2742            input_sha256: None,
2743            output_path: temp_path("translated.epub"),
2744            events_path: Some(temp_path("events.jsonl")),
2745            report_json_path: Some(temp_path("report.json")),
2746            report_markdown_path: Some(temp_path("report.md")),
2747            source_language: Some("English".to_string()),
2748            target_language: "Italian".to_string(),
2749            provider: "openrouter".to_string(),
2750            model: "model".to_string(),
2751            base_url: Some("https://example.test/v1".to_string()),
2752            api_key_env: Some(api_key_env.to_string()),
2753            profile: settings.profile,
2754            provider_preset: None,
2755            prompt_version: "batch_v1".to_string(),
2756            cache_namespace: "cache_ns".to_string(),
2757            book_id: None,
2758            series_id: None,
2759            glossary_budget_tokens: 800,
2760            glossary_format: bookforge_core::GlossaryFormat::Json,
2761            prompt_extra: None,
2762            glossary_fingerprint: String::new(),
2763            glossary_terms: Vec::new(),
2764            context_window: 0,
2765            context_budget_tokens: 1200,
2766            context_scope: bookforge_core::config::ContextScope::Chapter,
2767            style_fingerprint: String::new(),
2768            style_rendered_block: String::new(),
2769            entities_fingerprint: String::new(),
2770            entities_rendered_block: String::new(),
2771            settings: bookforge_core::ResolvedRunSettingsSnapshot::from_settings(&settings),
2772        };
2773
2774        store
2775            .update_job_config_snapshot(&job.id, &snapshot)
2776            .expect("snapshot should persist");
2777        let raw_json = {
2778            let conn = store.conn.borrow();
2779            conn.query_row(
2780                "SELECT config_json FROM jobs WHERE id = ?1",
2781                params![job.id],
2782                |row| row.get::<_, String>(0),
2783            )
2784            .expect("raw snapshot JSON should load")
2785        };
2786
2787        assert!(raw_json.contains(api_key_env));
2788        assert!(!raw_json.contains(api_key_value));
2789
2790        unsafe {
2791            std::env::remove_var(api_key_env);
2792        }
2793        let _ = fs::remove_file(db_path);
2794        let _ = fs::remove_file(input_path);
2795    }
2796
2797    #[test]
2798    fn terminal_loading_and_resumable_ids_preserve_lifecycle_boundaries() {
2799        let db_path = temp_path("terminal_resume.sqlite");
2800        let input_path = temp_path("input.epub");
2801        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2802        let store = JobStore::open(&db_path).expect("store should open");
2803        let job = store
2804            .create_job(CreateJob {
2805                input: &input_path,
2806                output: &temp_path("output.epub"),
2807                source_lang: Some("English"),
2808                target_lang: "Italian",
2809                provider: "mock",
2810                model: "mock-prefix",
2811                base_url: None,
2812                api_key_env: None,
2813                book_id: None,
2814                series_id: None,
2815            })
2816            .expect("job should be created");
2817        let segments = vec![
2818            segment("seg_done", 0),
2819            segment("seg_cached", 1),
2820            segment("seg_review", 2),
2821            segment("seg_failed", 3),
2822            segment("seg_queued", 4),
2823        ];
2824        store
2825            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "ns")
2826            .expect("segments should insert");
2827        store
2828            .save_translation(SaveTranslation {
2829                job_id: &job.id,
2830                segment_id: "seg_done",
2831                translated_text: "Done",
2832                blocks: &[BlockTranslation {
2833                    block_id: BlockId("b_000000".to_string()),
2834                    text: "Done".to_string(),
2835                }],
2836                provider: "mock",
2837                model: "mock-prefix",
2838                prompt_version: "v1",
2839                input_tokens: None,
2840                input_cached_tokens: None,
2841                output_tokens: None,
2842                tokens_estimated: false,
2843            })
2844            .expect("done should save");
2845        store
2846            .save_cached_translation(SaveCachedTranslation {
2847                job_id: &job.id,
2848                segment_id: "seg_cached",
2849                translated_text: "Cached",
2850                blocks: &[BlockTranslation {
2851                    block_id: BlockId("b_000001".to_string()),
2852                    text: "Cached".to_string(),
2853                }],
2854                provider: "mock",
2855                model: "mock-prefix",
2856                prompt_version: "v1",
2857            })
2858            .expect("cached should save");
2859        store
2860            .save_needs_review(SaveNeedsReview {
2861                job_id: &job.id,
2862                segment_id: "seg_review",
2863                preserved_text: "Review",
2864                blocks: &[BlockTranslation {
2865                    block_id: BlockId("b_000002".to_string()),
2866                    text: "Review".to_string(),
2867                }],
2868                provider: "mock",
2869                model: "mock-prefix",
2870                prompt_version: "v1",
2871                error: "needs eyes",
2872                input_tokens: None,
2873                input_cached_tokens: None,
2874                output_tokens: None,
2875                tokens_estimated: false,
2876            })
2877            .expect("review should save");
2878        store
2879            .mark_segment_failed(&job.id, "seg_failed", "failed")
2880            .expect("failed should mark");
2881
2882        let terminal = store
2883            .load_terminal_segment_translations(&job.id)
2884            .expect("terminal records should load");
2885        let ids = terminal
2886            .iter()
2887            .map(|record| record.segment_id.as_str())
2888            .collect::<Vec<_>>();
2889        assert_eq!(ids, vec!["seg_done", "seg_cached", "seg_review"]);
2890        assert_eq!(terminal[0].blocks[0].block_id.0, "b_000000");
2891        assert_eq!(terminal[2].status, "needs_review");
2892
2893        let resumable = store
2894            .resumable_segment_ids(&job.id)
2895            .expect("resumable ids should load");
2896        assert_eq!(resumable, vec!["seg_failed", "seg_queued"]);
2897
2898        let _ = fs::remove_file(db_path);
2899        let _ = fs::remove_file(input_path);
2900    }
2901
2902    #[test]
2903    fn mark_unfinished_segments_failed_preserves_terminal_segments() {
2904        let db_path = temp_path("unfinished_preserve.sqlite");
2905        let input_path = temp_path("input.epub");
2906        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2907        let store = JobStore::open(&db_path).expect("store should open");
2908        let job = store
2909            .create_job(CreateJob {
2910                input: &input_path,
2911                output: &temp_path("output.epub"),
2912                source_lang: Some("English"),
2913                target_lang: "Italian",
2914                provider: "mock",
2915                model: "mock-prefix",
2916                base_url: None,
2917                api_key_env: None,
2918                book_id: None,
2919                series_id: None,
2920            })
2921            .expect("job should be created");
2922        let segments = vec![
2923            segment("seg_succeeded", 0),
2924            segment("seg_cached", 1),
2925            segment("seg_review", 2),
2926            segment("seg_queued", 3),
2927            segment("seg_retry", 4),
2928        ];
2929        store
2930            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
2931            .expect("segments should insert");
2932        store
2933            .save_translation(SaveTranslation {
2934                job_id: &job.id,
2935                segment_id: "seg_succeeded",
2936                translated_text: "Done",
2937                blocks: &[BlockTranslation {
2938                    block_id: BlockId("b_000000".to_string()),
2939                    text: "Done".to_string(),
2940                }],
2941                provider: "mock",
2942                model: "mock-prefix",
2943                prompt_version: "v1",
2944                input_tokens: None,
2945                input_cached_tokens: None,
2946                output_tokens: None,
2947                tokens_estimated: false,
2948            })
2949            .expect("succeeded segment should save");
2950        store
2951            .save_cached_translation(SaveCachedTranslation {
2952                job_id: &job.id,
2953                segment_id: "seg_cached",
2954                translated_text: "Cached",
2955                blocks: &[BlockTranslation {
2956                    block_id: BlockId("b_000001".to_string()),
2957                    text: "Cached".to_string(),
2958                }],
2959                provider: "mock",
2960                model: "mock-prefix",
2961                prompt_version: "v1",
2962            })
2963            .expect("cached segment should save");
2964        store
2965            .save_needs_review(SaveNeedsReview {
2966                job_id: &job.id,
2967                segment_id: "seg_review",
2968                preserved_text: "Needs review",
2969                blocks: &[BlockTranslation {
2970                    block_id: BlockId("b_000002".to_string()),
2971                    text: "Needs review".to_string(),
2972                }],
2973                provider: "mock",
2974                model: "mock-prefix",
2975                prompt_version: "v1",
2976                error: "qa issue",
2977                input_tokens: None,
2978                input_cached_tokens: None,
2979                output_tokens: None,
2980                tokens_estimated: false,
2981            })
2982            .expect("needs-review segment should save");
2983        store
2984            .retry_segments(&job.id, RetryScope::Failed)
2985            .expect("retry with no failed segments should be harmless");
2986        {
2987            let conn = store.conn.borrow();
2988            conn.execute(
2989                "UPDATE segments SET status = 'retry_pending' WHERE job_id = ?1 AND id = 'seg_retry'",
2990                params![job.id],
2991            )
2992            .expect("test status update should work");
2993        }
2994
2995        let candidate_ids = segments
2996            .iter()
2997            .map(|segment| segment.id.0.clone())
2998            .collect::<Vec<_>>();
2999        let changed = store
3000            .mark_unfinished_segments_failed(&job.id, &candidate_ids, "run failed")
3001            .expect("unfinished segments should be marked failed");
3002        assert_eq!(changed, 2);
3003
3004        let records = store.segment_records(&job.id).expect("records should load");
3005        let statuses = records
3006            .into_iter()
3007            .map(|record| (record.id, record.status))
3008            .collect::<HashMap<_, _>>();
3009        assert_eq!(statuses["seg_succeeded"], "succeeded");
3010        assert_eq!(statuses["seg_cached"], "skipped_cached");
3011        assert_eq!(statuses["seg_review"], "needs_review");
3012        assert_eq!(statuses["seg_queued"], "failed");
3013        assert_eq!(statuses["seg_retry"], "failed");
3014
3015        let _ = fs::remove_file(db_path);
3016        let _ = fs::remove_file(input_path);
3017    }
3018
3019    #[test]
3020    fn mark_unfinished_segments_failed_marks_only_resumable_segments() {
3021        let db_path = temp_path("unfinished_resumable_only.sqlite");
3022        let input_path = temp_path("input.epub");
3023        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
3024        let store = JobStore::open(&db_path).expect("store should open");
3025        let job = store
3026            .create_job(CreateJob {
3027                input: &input_path,
3028                output: &temp_path("output.epub"),
3029                source_lang: Some("English"),
3030                target_lang: "Italian",
3031                provider: "mock",
3032                model: "mock-prefix",
3033                base_url: None,
3034                api_key_env: None,
3035                book_id: None,
3036                series_id: None,
3037            })
3038            .expect("job should be created");
3039        let segments = vec![
3040            segment("seg_succeeded", 0),
3041            segment("seg_cached", 1),
3042            segment("seg_review", 2),
3043            segment("seg_failed", 3),
3044            segment("seg_retry", 4),
3045            segment("seg_queued", 5),
3046        ];
3047        store
3048            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
3049            .expect("segments should insert");
3050        {
3051            let conn = store.conn.borrow();
3052            for (id, status) in [
3053                ("seg_succeeded", "succeeded"),
3054                ("seg_cached", "skipped_cached"),
3055                ("seg_review", "needs_review"),
3056                ("seg_failed", "failed"),
3057                ("seg_retry", "retry_pending"),
3058                ("seg_queued", "queued"),
3059            ] {
3060                conn.execute(
3061                    "UPDATE segments SET status = ?1 WHERE job_id = ?2 AND id = ?3",
3062                    params![status, job.id, id],
3063                )
3064                .expect("status should update");
3065            }
3066        }
3067
3068        let candidate_ids = segments
3069            .iter()
3070            .map(|segment| segment.id.0.clone())
3071            .collect::<Vec<_>>();
3072        let changed = store
3073            .mark_unfinished_segments_failed(&job.id, &candidate_ids, "run failed")
3074            .expect("unfinished segments should be marked failed");
3075
3076        assert_eq!(changed, 3);
3077        let records = store.segment_records(&job.id).expect("records should load");
3078        let statuses = records
3079            .into_iter()
3080            .map(|record| (record.id, record.status))
3081            .collect::<HashMap<_, _>>();
3082        assert_eq!(statuses["seg_succeeded"], "succeeded");
3083        assert_eq!(statuses["seg_cached"], "skipped_cached");
3084        assert_eq!(statuses["seg_review"], "needs_review");
3085        assert_eq!(statuses["seg_failed"], "failed");
3086        assert_eq!(statuses["seg_retry"], "failed");
3087        assert_eq!(statuses["seg_queued"], "failed");
3088
3089        let _ = fs::remove_file(db_path);
3090        let _ = fs::remove_file(input_path);
3091    }
3092
3093    #[test]
3094    fn resumable_segment_ids_excludes_succeeded_cached_and_needs_review() {
3095        let db_path = temp_path("resumable_excludes.sqlite");
3096        let input_path = temp_path("input.epub");
3097        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
3098        let store = JobStore::open(&db_path).expect("store should open");
3099        let job = store
3100            .create_job(CreateJob {
3101                input: &input_path,
3102                output: &temp_path("output.epub"),
3103                source_lang: Some("English"),
3104                target_lang: "Italian",
3105                provider: "mock",
3106                model: "mock-prefix",
3107                base_url: None,
3108                api_key_env: None,
3109                book_id: None,
3110                series_id: None,
3111            })
3112            .expect("job should be created");
3113        let segments = vec![
3114            segment("seg_succeeded", 0),
3115            segment("seg_cached", 1),
3116            segment("seg_review", 2),
3117        ];
3118        store
3119            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
3120            .expect("segments should insert");
3121        {
3122            let conn = store.conn.borrow();
3123            for (id, status) in [
3124                ("seg_succeeded", "succeeded"),
3125                ("seg_cached", "skipped_cached"),
3126                ("seg_review", "needs_review"),
3127            ] {
3128                conn.execute(
3129                    "UPDATE segments SET status = ?1 WHERE job_id = ?2 AND id = ?3",
3130                    params![status, job.id, id],
3131                )
3132                .expect("status should update");
3133            }
3134        }
3135
3136        let ids = store
3137            .resumable_segment_ids(&job.id)
3138            .expect("resumable ids should load");
3139
3140        assert!(ids.is_empty());
3141        let _ = fs::remove_file(db_path);
3142        let _ = fs::remove_file(input_path);
3143    }
3144
3145    #[test]
3146    fn resumable_segment_ids_includes_failed_retry_pending_and_pending() {
3147        let db_path = temp_path("resumable_includes.sqlite");
3148        let input_path = temp_path("input.epub");
3149        fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
3150        let store = JobStore::open(&db_path).expect("store should open");
3151        let job = store
3152            .create_job(CreateJob {
3153                input: &input_path,
3154                output: &temp_path("output.epub"),
3155                source_lang: Some("English"),
3156                target_lang: "Italian",
3157                provider: "mock",
3158                model: "mock-prefix",
3159                base_url: None,
3160                api_key_env: None,
3161                book_id: None,
3162                series_id: None,
3163            })
3164            .expect("job should be created");
3165        let segments = vec![
3166            segment("seg_failed", 0),
3167            segment("seg_retry", 1),
3168            segment("seg_queued", 2),
3169        ];
3170        store
3171            .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
3172            .expect("segments should insert");
3173        {
3174            let conn = store.conn.borrow();
3175            for (id, status) in [
3176                ("seg_failed", "failed"),
3177                ("seg_retry", "retry_pending"),
3178                ("seg_queued", "queued"),
3179            ] {
3180                conn.execute(
3181                    "UPDATE segments SET status = ?1 WHERE job_id = ?2 AND id = ?3",
3182                    params![status, job.id, id],
3183                )
3184                .expect("status should update");
3185            }
3186        }
3187
3188        let ids = store
3189            .resumable_segment_ids(&job.id)
3190            .expect("resumable ids should load");
3191
3192        assert_eq!(ids, vec!["seg_failed", "seg_retry", "seg_queued"]);
3193        let _ = fs::remove_file(db_path);
3194        let _ = fs::remove_file(input_path);
3195    }
3196
3197    #[test]
3198    fn cached_translation_requires_matching_cache_namespace() {
3199        let db_path = temp_path("ns_match.sqlite");
3200        let (store, _job, seg) =
3201            build_seeded_store_with_translation(&db_path, "ns_one", &["b_000000"]);
3202
3203        let hit = store
3204            .find_cached_translation(
3205                &seg,
3206                "v1",
3207                "mock",
3208                "mock-prefix",
3209                Some("English"),
3210                "Italian",
3211                "ns_one",
3212            )
3213            .expect("query ok");
3214        assert!(hit.is_some(), "matching namespace should hit");
3215
3216        let miss = store
3217            .find_cached_translation(
3218                &seg,
3219                "v1",
3220                "mock",
3221                "mock-prefix",
3222                Some("English"),
3223                "Italian",
3224                "ns_two",
3225            )
3226            .expect("query ok");
3227        assert!(miss.is_none(), "different namespace must not hit");
3228
3229        let _ = fs::remove_file(db_path);
3230    }
3231
3232    #[test]
3233    fn cached_translation_rejects_mismatched_block_ids() {
3234        let db_path = temp_path("blockid_match.sqlite");
3235        let (store, _job, mut seg) =
3236            build_seeded_store_with_translation(&db_path, "ns_x", &["b_000000"]);
3237
3238        // Caller's segment now expects different block IDs than what was stored.
3239        seg.block_ids = vec![BlockId("b_999999".to_string())];
3240
3241        let miss = store
3242            .find_cached_translation(
3243                &seg,
3244                "v1",
3245                "mock",
3246                "mock-prefix",
3247                Some("English"),
3248                "Italian",
3249                "ns_x",
3250            )
3251            .expect("query ok");
3252        assert!(
3253            miss.is_none(),
3254            "mismatched block_ids must reject the cached row"
3255        );
3256
3257        let _ = fs::remove_file(db_path);
3258    }
3259
3260    #[test]
3261    fn cached_translation_prefers_repaired_succeeded_rows_over_cached_clones() {
3262        let db_path = temp_path("cache_prefers_repaired.sqlite");
3263        let (store, _stale_job, seg) =
3264            build_seeded_store_with_translation(&db_path, "cache_ns", &["b_000000"]);
3265        let cached_input = temp_path("cached-input.epub");
3266        let repaired_input = temp_path("repaired-input.epub");
3267        fs::write(&cached_input, b"cached input").expect("cached input should be writable");
3268        fs::write(&repaired_input, b"repaired input").expect("repaired input should be writable");
3269
3270        let cached_job = store
3271            .create_job(CreateJob {
3272                input: &cached_input,
3273                output: &temp_path("cached-output.epub"),
3274                source_lang: Some("English"),
3275                target_lang: "Italian",
3276                provider: "mock",
3277                model: "mock-prefix",
3278                base_url: None,
3279                api_key_env: None,
3280                book_id: None,
3281                series_id: None,
3282            })
3283            .expect("cached job should be created");
3284        store
3285            .insert_segments(
3286                &cached_job.id,
3287                std::slice::from_ref(&seg),
3288                "v1",
3289                "mock",
3290                "mock-prefix",
3291                "cache_ns",
3292            )
3293            .expect("cached job segment should insert");
3294        store
3295            .save_cached_translation(SaveCachedTranslation {
3296                job_id: &cached_job.id,
3297                segment_id: "seg_a",
3298                translated_text: "stale cached clone",
3299                blocks: &[BlockTranslation {
3300                    block_id: BlockId("b_000000".to_string()),
3301                    text: "stale cached clone".to_string(),
3302                }],
3303                provider: "mock",
3304                model: "mock-prefix",
3305                prompt_version: "v1",
3306            })
3307            .expect("cached clone should save");
3308
3309        let repaired_job = store
3310            .create_job(CreateJob {
3311                input: &repaired_input,
3312                output: &temp_path("repaired-output.epub"),
3313                source_lang: Some("English"),
3314                target_lang: "Italian",
3315                provider: "mock",
3316                model: "mock-prefix",
3317                base_url: None,
3318                api_key_env: None,
3319                book_id: None,
3320                series_id: None,
3321            })
3322            .expect("repaired job should be created");
3323        store
3324            .insert_segments(
3325                &repaired_job.id,
3326                std::slice::from_ref(&seg),
3327                "v1",
3328                "mock",
3329                "mock-prefix",
3330                "cache_ns",
3331            )
3332            .expect("repaired job segment should insert");
3333        store
3334            .save_translation(SaveTranslation {
3335                job_id: &repaired_job.id,
3336                segment_id: "seg_a",
3337                translated_text: "repaired translation",
3338                blocks: &[BlockTranslation {
3339                    block_id: BlockId("b_000000".to_string()),
3340                    text: "repaired translation".to_string(),
3341                }],
3342                provider: "mock",
3343                model: "mock-prefix",
3344                prompt_version: "v1",
3345                input_tokens: Some(12),
3346                input_cached_tokens: Some(0),
3347                output_tokens: Some(8),
3348                tokens_estimated: false,
3349            })
3350            .expect("repaired translation should save");
3351
3352        let request = CacheLookupRequest {
3353            prompt_version: "v1",
3354            provider: "mock",
3355            model: "mock-prefix",
3356            source_lang: Some("English"),
3357            target_lang: "Italian",
3358            cache_namespace: "cache_ns",
3359        };
3360        let single_hit = store
3361            .find_cached_translation(
3362                &seg,
3363                request.prompt_version,
3364                request.provider,
3365                request.model,
3366                request.source_lang,
3367                request.target_lang,
3368                request.cache_namespace,
3369            )
3370            .expect("single lookup should succeed")
3371            .expect("single lookup should hit");
3372        let batch_hit = store
3373            .find_cached_translations_batch(std::slice::from_ref(&seg), request)
3374            .expect("batch lookup should succeed")
3375            .remove(&seg.id.0)
3376            .expect("batch lookup should hit");
3377
3378        assert_eq!(single_hit.translated_text, "repaired translation");
3379        assert_eq!(single_hit.blocks[0].text, "repaired translation");
3380        assert_eq!(batch_hit.translated_text, "repaired translation");
3381        assert_eq!(batch_hit.blocks[0].text, "repaired translation");
3382
3383        let _ = fs::remove_file(db_path);
3384        let _ = fs::remove_file(cached_input);
3385        let _ = fs::remove_file(repaired_input);
3386    }
3387
3388    #[test]
3389    fn old_empty_cache_namespace_rows_do_not_match_new_runs() {
3390        let db_path = temp_path("legacy_ns.sqlite");
3391        // Simulate a row migrated from an older schema with the default empty namespace.
3392        let (store, _job, seg) = build_seeded_store_with_translation(&db_path, "", &["b_000000"]);
3393
3394        let miss = store
3395            .find_cached_translation(
3396                &seg,
3397                "v1",
3398                "mock",
3399                "mock-prefix",
3400                Some("English"),
3401                "Italian",
3402                "real_ns",
3403            )
3404            .expect("query ok");
3405        assert!(
3406            miss.is_none(),
3407            "legacy empty-namespace row must not satisfy a real namespace lookup"
3408        );
3409
3410        let _ = fs::remove_file(db_path);
3411    }
3412
3413    #[test]
3414    fn job_store_enables_wal_and_busy_timeout() {
3415        let db_path = temp_path("wal_busy.sqlite");
3416        let store = JobStore::open(&db_path).expect("store should open");
3417
3418        let conn = store.conn.borrow();
3419        let journal_mode: String = conn
3420            .pragma_query_value(None, "journal_mode", |row| row.get(0))
3421            .expect("pragma journal_mode should succeed");
3422        assert_eq!(
3423            journal_mode.to_lowercase(),
3424            "wal",
3425            "WAL journal mode must be enabled"
3426        );
3427
3428        let busy_timeout: i64 = conn
3429            .pragma_query_value(None, "busy_timeout", |row| row.get(0))
3430            .expect("pragma busy_timeout should succeed");
3431        assert!(
3432            busy_timeout >= 5000,
3433            "busy_timeout should be at least 5000ms, got {busy_timeout}"
3434        );
3435
3436        let wal_path = db_path.with_extension("sqlite-wal");
3437        let shm_path = db_path.with_extension("sqlite-shm");
3438        // WAL/shm may or may not exist depending on transactions, but the
3439        // journal_mode query confirms WAL is active.
3440
3441        let _ = fs::remove_file(db_path);
3442        let _ = fs::remove_file(wal_path);
3443        let _ = fs::remove_file(shm_path);
3444    }
3445
3446    #[test]
3447    fn job_store_enables_foreign_keys_on_every_connection() {
3448        let db_path = temp_path("fk.sqlite");
3449        let store = JobStore::open(&db_path).expect("store should open");
3450
3451        let conn = store.conn.borrow();
3452        let fk_enabled: i64 = conn
3453            .pragma_query_value(None, "foreign_keys", |row| row.get(0))
3454            .expect("pragma foreign_keys should succeed");
3455        assert_eq!(
3456            fk_enabled, 1,
3457            "foreign_keys pragma must be ON on every connection"
3458        );
3459
3460        let _ = fs::remove_file(db_path);
3461    }
3462
3463    #[test]
3464    fn doctor_reports_wal_sidecars_as_normal_when_integrity_check_passes() {
3465        let db_path = temp_path("doctor.sqlite");
3466        let store = JobStore::open(&db_path).expect("store should open");
3467
3468        // Perform a write to trigger WAL sidecar creation.
3469        let input_path = temp_path("input_doctor.epub");
3470        fs::write(&input_path, b"epub bytes").expect("test epub");
3471        let _job = store
3472            .create_job(CreateJob {
3473                input: &input_path,
3474                output: &temp_path("out_doctor.epub"),
3475                source_lang: Some("English"),
3476                target_lang: "Italian",
3477                provider: "mock",
3478                model: "mock-prefix",
3479                base_url: None,
3480                api_key_env: None,
3481                book_id: None,
3482                series_id: None,
3483            })
3484            .expect("job created");
3485        drop(store);
3486
3487        let doctor = run_doctor(Some(db_path.clone())).expect("doctor should run");
3488        assert!(doctor.database_exists, "database should exist");
3489        assert_eq!(
3490            doctor.journal_mode.to_lowercase(),
3491            "wal",
3492            "journal mode should be wal"
3493        );
3494        assert_eq!(doctor.integrity_check, "ok", "integrity check should pass");
3495        assert!(
3496            doctor.wal_sidecars_normal,
3497            "wal sidecars should be reported as normal"
3498        );
3499
3500        if doctor.wal_present || doctor.shm_present {
3501            assert!(
3502                !doctor.note.is_empty(),
3503                "doctor must explain WAL sidecars when they are present"
3504            );
3505        }
3506
3507        let wal_path = db_path.with_extension("sqlite-wal");
3508        let shm_path = db_path.with_extension("sqlite-shm");
3509        let _ = fs::remove_file(&db_path);
3510        let _ = fs::remove_file(input_path);
3511        let _ = fs::remove_file(wal_path);
3512        let _ = fs::remove_file(shm_path);
3513    }
3514
3515    #[test]
3516    fn checkpoint_writer_and_reader_do_not_immediately_busy_fail() {
3517        let db_path = temp_path("concurrent.sqlite");
3518        let input_path = temp_path("input_conc.epub");
3519        fs::write(&input_path, b"epub bytes").expect("test epub");
3520
3521        // Open writer store first and create a job.
3522        let store_w = JobStore::open(&db_path).expect("store_w open");
3523        let job = store_w
3524            .create_job(CreateJob {
3525                input: &input_path,
3526                output: &temp_path("out_conc.epub"),
3527                source_lang: Some("English"),
3528                target_lang: "Italian",
3529                provider: "mock",
3530                model: "mock-prefix",
3531                base_url: None,
3532                api_key_env: None,
3533                book_id: None,
3534                series_id: None,
3535            })
3536            .expect("job created");
3537        store_w
3538            .insert_segments(
3539                &job.id,
3540                &[segment("seg_conc", 0)],
3541                "v1",
3542                "mock",
3543                "mock-prefix",
3544                "ns",
3545            )
3546            .expect("segments inserted");
3547
3548        // Open a second reader store while the first is still active.
3549        let store_r = JobStore::open(&db_path).expect("store_r open");
3550        let summary = store_r
3551            .summary(&job.id)
3552            .expect("summary should load")
3553            .expect("job should exist");
3554        assert_eq!(summary.total_segments, 1);
3555
3556        let wal_path = db_path.with_extension("sqlite-wal");
3557        let shm_path = db_path.with_extension("sqlite-shm");
3558        let _ = fs::remove_file(&db_path);
3559        let _ = fs::remove_file(input_path);
3560        let _ = fs::remove_file(wal_path);
3561        let _ = fs::remove_file(shm_path);
3562    }
3563
3564    #[test]
3565    fn migrate_creates_glossary_terms_table() {
3566        let db_path = temp_path("glossary_migrate.sqlite");
3567        let store = JobStore::open(&db_path).expect("store opens");
3568        let conn = store.conn.borrow();
3569        let table: String = conn
3570            .query_row(
3571                "SELECT name FROM sqlite_master WHERE type = 'table' AND name = 'glossary_terms'",
3572                [],
3573                |row| row.get(0),
3574            )
3575            .expect("glossary_terms table exists");
3576        assert_eq!(table, "glossary_terms");
3577        let index: String = conn
3578            .query_row(
3579                "SELECT name FROM sqlite_master WHERE type = 'index' AND name = 'idx_glossary_lookup'",
3580                [],
3581                |row| row.get(0),
3582            )
3583            .expect("idx_glossary_lookup exists");
3584        assert_eq!(index, "idx_glossary_lookup");
3585        let version: i64 = conn
3586            .query_row(
3587                "SELECT version FROM _migrations WHERE name = 'v1_2_glossary_terms'",
3588                [],
3589                |row| row.get(0),
3590            )
3591            .expect("v1_2 migration recorded");
3592        assert_eq!(version, 4);
3593        drop(conn);
3594        let _ = fs::remove_file(&db_path);
3595    }
3596
3597    #[test]
3598    fn migrate_is_idempotent_v1_2() {
3599        let db_path = temp_path("glossary_idem.sqlite");
3600        // Open twice; second open re-runs migrate() and must not error.
3601        {
3602            let _store = JobStore::open(&db_path).expect("first open");
3603        }
3604        {
3605            let _store = JobStore::open(&db_path).expect("second open");
3606        }
3607        let _ = fs::remove_file(&db_path);
3608    }
3609
3610    #[test]
3611    fn migrate_rebuilds_glossary_terms_with_nullable_target_text() {
3612        let db_path = temp_path("glossary_nullable_target.sqlite");
3613        {
3614            let conn = Connection::open(&db_path).expect("legacy db opens");
3615            conn.execute_batch(
3616                "
3617                CREATE TABLE _migrations (
3618                  version INTEGER PRIMARY KEY,
3619                  name TEXT NOT NULL,
3620                  applied_at TEXT NOT NULL
3621                );
3622                CREATE TABLE glossary_terms (
3623                  id INTEGER PRIMARY KEY,
3624                  scope_kind TEXT NOT NULL CHECK(scope_kind IN ('global', 'series', 'book')),
3625                  scope_id TEXT,
3626                  source_text TEXT NOT NULL,
3627                  target_text TEXT NOT NULL,
3628                  category TEXT NOT NULL CHECK(category IN
3629                    ('person', 'place', 'object', 'invented', 'style', 'phrase', 'other')),
3630                  notes TEXT,
3631                  case_sensitive INTEGER NOT NULL DEFAULT 0,
3632                  always_active INTEGER NOT NULL DEFAULT 0,
3633                  status TEXT NOT NULL CHECK(status IN
3634                    ('user_seeded', 'auto_candidate', 'accepted', 'rejected'))
3635                    DEFAULT 'user_seeded',
3636                  source_language TEXT NOT NULL,
3637                  target_language TEXT NOT NULL,
3638                  source_count INTEGER DEFAULT 0,
3639                  created_at TEXT NOT NULL,
3640                  updated_at TEXT NOT NULL,
3641                  UNIQUE(scope_kind, scope_id, source_text, source_language, target_language)
3642                );
3643                CREATE INDEX idx_glossary_lookup
3644                  ON glossary_terms(source_language, target_language, scope_kind, scope_id, status);
3645                INSERT INTO glossary_terms
3646                  (id, scope_kind, scope_id, source_text, target_text, category, notes,
3647                   case_sensitive, always_active, status, source_language, target_language,
3648                   source_count, created_at, updated_at)
3649                VALUES
3650                  (42, 'book', 'ivan', 'Ivan Ilych', 'Ivan Il''ich', 'person', 'legacy',
3651                   1, 0, 'user_seeded', 'English', 'Italian', 9, 'created', 'updated');
3652                ",
3653            )
3654            .expect("legacy schema should initialize");
3655        }
3656
3657        let store = JobStore::open(&db_path).expect("store opens and migrates");
3658        let conn = store.conn.borrow();
3659        assert!(
3660            !table_column_is_not_null(&conn, "glossary_terms", "target_text")
3661                .expect("table info should load")
3662        );
3663        let row = conn
3664            .query_row(
3665                "SELECT id, target_text, created_at, updated_at FROM glossary_terms WHERE id = 42",
3666                [],
3667                |row| {
3668                    Ok((
3669                        row.get::<_, i64>(0)?,
3670                        row.get::<_, String>(1)?,
3671                        row.get::<_, String>(2)?,
3672                        row.get::<_, String>(3)?,
3673                    ))
3674                },
3675            )
3676            .expect("legacy glossary row should survive");
3677        assert_eq!(row.0, 42);
3678        assert_eq!(row.1, "Ivan Il'ich");
3679        assert_eq!(row.2, "created");
3680        assert_eq!(row.3, "updated");
3681        let version: i64 = conn
3682            .query_row(
3683                "SELECT version FROM _migrations WHERE name = 'v1_2_1_nullable_glossary_candidate_targets'",
3684                [],
3685                |row| row.get(0),
3686            )
3687            .expect("v1.2.1 migration recorded");
3688        assert_eq!(version, 5);
3689        let index: String = conn
3690            .query_row(
3691                "SELECT name FROM sqlite_master WHERE type = 'index' AND name = 'idx_glossary_lookup'",
3692                [],
3693                |row| row.get(0),
3694            )
3695            .expect("idx_glossary_lookup should be recreated");
3696        assert_eq!(index, "idx_glossary_lookup");
3697        let duplicate = conn.execute(
3698            "INSERT INTO glossary_terms
3699              (scope_kind, scope_id, source_text, target_text, category, notes,
3700               case_sensitive, always_active, status, source_language, target_language,
3701               source_count, created_at, updated_at)
3702             VALUES
3703              ('book', 'ivan', 'Ivan Ilych', 'duplicate', 'person', NULL,
3704               1, 0, 'user_seeded', 'English', 'Italian', 1, 'now', 'now')",
3705            [],
3706        );
3707        assert!(
3708            duplicate.is_err(),
3709            "unique constraint should survive the rebuild"
3710        );
3711        drop(conn);
3712        let _ = fs::remove_file(&db_path);
3713    }
3714
3715    #[test]
3716    fn glossary_candidate_upsert_updates_auto_and_skips_rejected() {
3717        let db_path = temp_path("glossary_candidates.sqlite");
3718        let store = JobStore::open(&db_path).expect("store opens");
3719        let first = store
3720            .upsert_glossary_candidates(
3721                "ivan",
3722                "English",
3723                "Italian",
3724                &[NewGlossaryCandidate {
3725                    source_text: "Ivan Ilych",
3726                    category: GlossaryCategory::Other,
3727                    source_count: 4,
3728                }],
3729            )
3730            .expect("candidate inserts");
3731        assert_eq!(first.inserted, 1);
3732
3733        let candidates = store
3734            .list_glossary_candidates("ivan", "English", "Italian")
3735            .expect("candidates should list");
3736        assert_eq!(candidates.len(), 1);
3737        assert_eq!(candidates[0].target_text, None);
3738        assert_eq!(candidates[0].source_count, 4);
3739
3740        let second = store
3741            .upsert_glossary_candidates(
3742                "ivan",
3743                "English",
3744                "Italian",
3745                &[NewGlossaryCandidate {
3746                    source_text: "Ivan Ilych",
3747                    category: GlossaryCategory::Other,
3748                    source_count: 7,
3749                }],
3750            )
3751            .expect("candidate updates");
3752        assert_eq!(second.updated, 1);
3753        assert_eq!(
3754            store
3755                .list_glossary_candidates("ivan", "English", "Italian")
3756                .expect("candidates should list")[0]
3757                .source_count,
3758            7
3759        );
3760
3761        assert!(
3762            store
3763                .reject_glossary_candidate(candidates[0].id)
3764                .expect("candidate rejects")
3765        );
3766        let third = store
3767            .upsert_glossary_candidates(
3768                "ivan",
3769                "English",
3770                "Italian",
3771                &[NewGlossaryCandidate {
3772                    source_text: "Ivan Ilych",
3773                    category: GlossaryCategory::Other,
3774                    source_count: 9,
3775                }],
3776            )
3777            .expect("rejected candidate is skipped");
3778        assert_eq!(third.skipped, 1);
3779        assert!(
3780            store
3781                .list_glossary_candidates("ivan", "English", "Italian")
3782                .expect("rejected candidates are not pending")
3783                .is_empty()
3784        );
3785
3786        let all = store
3787            .list_glossary_terms(GlossaryFilter {
3788                scope_kind: Some(GlossaryScopeKind::Book),
3789                scope_id: Some("ivan"),
3790                source_language: Some("English"),
3791                target_language: Some("Italian"),
3792                active_only: false,
3793            })
3794            .expect("terms should list");
3795        assert_eq!(all.len(), 1);
3796        assert_eq!(all[0].status, GlossaryStatus::Rejected);
3797        assert_eq!(all[0].source_count, 7);
3798
3799        let seeded = glossary_term(
3800            bookforge_core::GlossaryScopeKind::Book,
3801            Some("ivan"),
3802            "Aragorn",
3803            "Aragorn",
3804        );
3805        let mut accepted = glossary_term(
3806            bookforge_core::GlossaryScopeKind::Book,
3807            Some("ivan"),
3808            "Mount Doom",
3809            "Monte Fato",
3810        );
3811        accepted.status = GlossaryStatus::Accepted;
3812        store
3813            .upsert_glossary_terms(&[seeded, accepted])
3814            .expect("active terms should insert");
3815        let fourth = store
3816            .upsert_glossary_candidates(
3817                "ivan",
3818                "English",
3819                "Italian",
3820                &[
3821                    NewGlossaryCandidate {
3822                        source_text: "Aragorn",
3823                        category: GlossaryCategory::Other,
3824                        source_count: 12,
3825                    },
3826                    NewGlossaryCandidate {
3827                        source_text: "Mount Doom",
3828                        category: GlossaryCategory::Other,
3829                        source_count: 11,
3830                    },
3831                ],
3832            )
3833            .expect("active terms are skipped");
3834        assert_eq!(fourth.skipped, 2);
3835
3836        let _ = fs::remove_file(&db_path);
3837    }
3838
3839    #[test]
3840    fn glossary_terms_upsert_list_and_active_lookup() {
3841        let db_path = temp_path("glossary_terms.sqlite");
3842        let store = JobStore::open(&db_path).expect("store opens");
3843        let mut global = glossary_term(
3844            bookforge_core::GlossaryScopeKind::Global,
3845            None,
3846            "Aragorn",
3847            "Aragorn",
3848        );
3849        let book = glossary_term(
3850            bookforge_core::GlossaryScopeKind::Book,
3851            Some("fellowship"),
3852            "Aragorn",
3853            "Granpasso",
3854        );
3855
3856        assert_eq!(
3857            store
3858                .upsert_glossary_terms(&[global.clone(), book.clone()])
3859                .expect("terms upsert"),
3860            2
3861        );
3862        global.target_text = "Aragorn II".to_string();
3863        store
3864            .upsert_glossary_terms(&[global.clone()])
3865            .expect("global term updates instead of duplicating");
3866
3867        let all = store
3868            .list_glossary_terms(GlossaryFilter::default())
3869            .expect("terms list");
3870        assert_eq!(all.len(), 2);
3871
3872        let active = store
3873            .load_active_glossary_terms("English", "Italian", Some("fellowship"), Some("lotr"))
3874            .expect("active terms");
3875        assert_eq!(active.len(), 2);
3876        assert!(active.iter().any(|term| term.target_text == "Granpasso"));
3877        let active_by_target = store
3878            .load_active_glossary_terms_for_target("Italian", Some("fellowship"), Some("lotr"))
3879            .expect("target-only active terms");
3880        assert_eq!(active_by_target.len(), 2);
3881        assert!(
3882            active_by_target
3883                .iter()
3884                .any(|term| term.source_language == "English" && term.target_text == "Granpasso")
3885        );
3886
3887        let removed = store
3888            .clear_glossary_scope(bookforge_core::GlossaryScopeKind::Global, None)
3889            .expect("global clear");
3890        assert_eq!(removed, 1);
3891
3892        let _ = fs::remove_file(&db_path);
3893    }
3894
3895    #[test]
3896    fn create_job_persists_book_and_series_ids() {
3897        let db_path = temp_path("glossary_jobids.sqlite");
3898        let input_path = temp_path("input_jobids.epub");
3899        fs::write(&input_path, b"epub bytes").expect("input fixture");
3900        let store = JobStore::open(&db_path).expect("store opens");
3901        let job = store
3902            .create_job(CreateJob {
3903                input: &input_path,
3904                output: &temp_path("out_jobids.epub"),
3905                source_lang: Some("English"),
3906                target_lang: "Italian",
3907                provider: "mock",
3908                model: "mock",
3909                base_url: None,
3910                api_key_env: None,
3911                book_id: Some("fellowship"),
3912                series_id: Some("lord-of-the-rings"),
3913            })
3914            .expect("job created");
3915        assert_eq!(job.book_id.as_deref(), Some("fellowship"));
3916        assert_eq!(job.series_id.as_deref(), Some("lord-of-the-rings"));
3917        let loaded = store
3918            .get_job(&job.id)
3919            .expect("get_job ok")
3920            .expect("job present");
3921        assert_eq!(loaded.book_id.as_deref(), Some("fellowship"));
3922        assert_eq!(loaded.series_id.as_deref(), Some("lord-of-the-rings"));
3923        let _ = fs::remove_file(&db_path);
3924        let _ = fs::remove_file(input_path);
3925    }
3926
3927    fn glossary_term(
3928        scope_kind: bookforge_core::GlossaryScopeKind,
3929        scope_id: Option<&str>,
3930        source: &str,
3931        target: &str,
3932    ) -> bookforge_core::GlossaryTerm {
3933        bookforge_core::GlossaryTerm {
3934            id: None,
3935            scope_kind,
3936            scope_id: scope_id.map(ToOwned::to_owned),
3937            source_text: source.to_string(),
3938            target_text: target.to_string(),
3939            category: bookforge_core::GlossaryCategory::Person,
3940            notes: None,
3941            case_sensitive: true,
3942            always_active: false,
3943            status: bookforge_core::GlossaryStatus::UserSeeded,
3944            source_language: "English".to_string(),
3945            target_language: "Italian".to_string(),
3946            source_count: 0,
3947        }
3948    }
3949}