1use std::{
2 cell::RefCell,
3 collections::HashMap,
4 fs,
5 path::{Path, PathBuf},
6 time::{Duration, SystemTime, UNIX_EPOCH},
7};
8
9use bookforge_core::{
10 Result as CoreResult,
11 ir::BlockId,
12 run_snapshot::RunConfigSnapshot,
13 segment::{BlockTranslation, Segment},
14};
15use rusqlite::{Connection, OptionalExtension, params};
16use sha2::{Digest, Sha256};
17
/// Result type used throughout the job store; the error side is always [`StoreError`].
pub type Result<T> = std::result::Result<T, StoreError>;
19
/// Errors surfaced by the job store.
///
/// The `#[from]` conversions let `?` lift I/O, SQLite and core errors into this type.
#[derive(Debug, thiserror::Error)]
pub enum StoreError {
    /// Filesystem failure, e.g. while creating the database's parent directory.
    #[error("I/O error: {0}")]
    Io(#[from] std::io::Error),

    /// Failure reported by SQLite through `rusqlite`.
    #[error("SQLite error: {0}")]
    Sqlite(#[from] rusqlite::Error),

    /// Error propagated from `bookforge_core`.
    #[error("core error: {0}")]
    Core(#[from] bookforge_core::BookforgeError),

    /// JSON (de)serialization failure; carries the rendered `serde_json` error message.
    #[error("serialization error: {0}")]
    Serialization(String),
}
34
/// SQLite-backed persistence for translation jobs, their segments and stored translations.
///
/// The connection lives in a `RefCell` so `&self` methods can take the mutable borrow
/// that `Connection::transaction` needs; as a consequence the store is not `Sync`.
pub struct JobStore {
    // Connection configured by `JobStore::open` (busy timeout, WAL, foreign keys).
    conn: RefCell<Connection>,
    // Location of the database file, exposed through `JobStore::path`.
    path: PathBuf,
}
39
/// A row of the `jobs` table, as returned by [`JobStore::get_job`] and
/// [`JobStore::create_job`].
#[derive(Debug, Clone)]
pub struct JobRecord {
    /// Identifier of the form `job_<unix-nanos>_<first 12 chars of input_hash>`.
    pub id: String,
    /// Path of the source document as supplied when the job was created.
    pub input_path: PathBuf,
    /// Input snapshot path recorded after creation, if any.
    pub input_snapshot_path: Option<PathBuf>,
    /// Digest recorded together with `input_snapshot_path`, if any.
    pub input_sha256: Option<String>,
    /// Destination path for the translated output.
    pub output_path: PathBuf,
    /// Hash of the input file computed by `file_hash` when the job was created.
    pub input_hash: String,
    /// Source language, or `None` when it was not specified.
    pub source_lang: Option<String>,
    /// Target language.
    pub target_lang: String,
    /// Translation provider name.
    pub provider: String,
    /// Model name used with the provider.
    pub model: String,
    /// Optional custom API base URL.
    pub base_url: Option<String>,
    /// Name of the environment variable holding the API key, if configured.
    pub api_key_env: Option<String>,
    /// Job status; this store writes `running`, `succeeded`, `needs_review`,
    /// `retry_pending`, `interrupted` and `failed`.
    pub status: String,
    /// Path of the job's event log, if recorded.
    pub events_path: Option<PathBuf>,
    /// Path of the JSON report, if recorded.
    pub report_json_path: Option<PathBuf>,
    /// Path of the Markdown report, if recorded.
    pub report_markdown_path: Option<PathBuf>,
}
59
/// Aggregated segment statistics for one job, produced by [`JobStore::summary`].
#[derive(Debug, Clone, Default)]
pub struct JobSummary {
    /// Job identifier.
    pub id: String,
    /// Job status copied from the `jobs` row.
    pub status: String,
    /// Number of segments across all statuses.
    pub total_segments: usize,
    /// Segments with status `succeeded`.
    pub succeeded: usize,
    /// Segments with status `failed`.
    pub failed: usize,
    /// Segments with status `needs_review`.
    pub needs_review: usize,
    /// Segments with status `retry_pending`.
    pub retry_pending: usize,
    /// Segments served from the translation cache (status `skipped_cached`).
    pub cached: usize,
    /// Segments whose `attempts` counter is greater than one.
    pub retried: usize,
    /// Sum of input tokens (`tokens_input`, falling back to the `input_tokens` column).
    pub input_tokens: u64,
    /// Sum of cached input tokens (`tokens_input_cached`).
    pub input_cached_tokens: u64,
    /// Sum of output tokens (`tokens_output`, falling back to the `output_tokens` column).
    pub output_tokens: u64,
}
75
/// Parameters for [`JobStore::create_job`]; every field is borrowed from the caller.
#[derive(Debug, Clone, Copy)]
pub struct CreateJob<'a> {
    /// Source document; passed to `file_hash` to derive the job's input hash.
    pub input: &'a Path,
    /// Destination for the translated output.
    pub output: &'a Path,
    /// Source language, or `None` if unspecified.
    pub source_lang: Option<&'a str>,
    /// Target language.
    pub target_lang: &'a str,
    /// Translation provider name.
    pub provider: &'a str,
    /// Model name used with the provider.
    pub model: &'a str,
    /// Optional custom API base URL.
    pub base_url: Option<&'a str>,
    /// Name of the environment variable holding the API key (only the name is stored).
    pub api_key_env: Option<&'a str>,
}
87
/// Per-segment progress row returned by [`JobStore::segment_records`].
#[derive(Debug, Clone)]
pub struct SegmentRecord {
    /// Segment identifier.
    pub id: String,
    /// Segment status, e.g. `queued`, `retry_pending`, `succeeded`, `skipped_cached`,
    /// `needs_review` or `failed`.
    pub status: String,
    /// Number of attempts recorded so far.
    pub attempts: usize,
    /// Last stored error message, if any.
    pub error: Option<String>,
    /// Input tokens, if recorded.
    pub input_tokens: Option<u64>,
    /// Cached input tokens, if recorded.
    pub input_cached_tokens: Option<u64>,
    /// Output tokens, if recorded.
    pub output_tokens: Option<u64>,
    /// Whether the saved token counts were flagged as estimates.
    pub tokens_estimated: bool,
}
99
/// One row of `translation_blocks`, as returned by [`JobStore::load_block_translations`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoredBlockTranslation {
    /// Segment the block belongs to.
    pub segment_id: String,
    /// Identifier of the translated block.
    pub block_id: String,
    /// Translated text of the block.
    pub text: String,
}
106
/// A segment in a terminal state (`succeeded`, `skipped_cached` or `needs_review`) together
/// with its stored translation; see [`JobStore::load_terminal_segment_translations`].
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct StoredSegmentTranslation {
    /// Segment identifier.
    pub segment_id: String,
    /// Position of the segment within the job.
    pub ordinal: usize,
    /// Terminal status of the segment.
    pub status: String,
    /// Stored error message (set for `needs_review` segments), if any.
    pub error: Option<String>,
    /// Full translated text of the segment.
    pub translated_text: String,
    /// Per-block translations stored for the segment.
    pub blocks: Vec<BlockTranslation>,
}
116
/// A reusable translation found by the cache lookups. [`JobStore::find_cached_translation`]
/// orders `blocks` to match the requesting segment's block ids.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct CachedTranslation {
    /// Full translated text of the cached segment.
    pub translated_text: String,
    /// Per-block translations of the cached segment.
    pub blocks: Vec<BlockTranslation>,
}
122
/// Arguments for [`JobStore::save_translation`].
#[derive(Debug, Clone, Copy)]
pub struct SaveTranslation<'a> {
    /// Job that owns the segment.
    pub job_id: &'a str,
    /// Segment being saved.
    pub segment_id: &'a str,
    /// Full translated text; also hashed into the segment's `translated_hash`.
    pub translated_text: &'a str,
    /// Per-block translations that replace the segment's stored blocks.
    pub blocks: &'a [BlockTranslation],
    /// Provider recorded on the `translations` row.
    pub provider: &'a str,
    /// Model recorded on the `translations` row.
    pub model: &'a str,
    /// Prompt version recorded on the `translations` row.
    pub prompt_version: &'a str,
    /// Input tokens to record, if known.
    pub input_tokens: Option<u64>,
    /// Cached input tokens to record, if known.
    pub input_cached_tokens: Option<u64>,
    /// Output tokens to record, if known.
    pub output_tokens: Option<u64>,
    /// Whether the token counts are estimates; stored as 1/0.
    pub tokens_estimated: bool,
}
137
/// Arguments for [`JobStore::save_needs_review`].
#[derive(Debug, Clone, Copy)]
pub struct SaveNeedsReview<'a> {
    /// Job that owns the segment.
    pub job_id: &'a str,
    /// Segment being saved.
    pub segment_id: &'a str,
    /// Text stored as the segment's translation while it awaits review.
    pub preserved_text: &'a str,
    /// Per-block translations that replace the segment's stored blocks.
    pub blocks: &'a [BlockTranslation],
    /// Provider recorded on the `translations` row.
    pub provider: &'a str,
    /// Model recorded on the `translations` row.
    pub model: &'a str,
    /// Prompt version recorded on the `translations` row.
    pub prompt_version: &'a str,
    /// Reason stored in the segment's `error` column.
    pub error: &'a str,
    /// Input tokens to record, if known.
    pub input_tokens: Option<u64>,
    /// Cached input tokens to record, if known.
    pub input_cached_tokens: Option<u64>,
    /// Output tokens to record, if known.
    pub output_tokens: Option<u64>,
    /// Whether the token counts are estimates; stored as 1/0.
    pub tokens_estimated: bool,
}
153
/// Arguments for [`JobStore::save_cached_translation`], which records a cache hit as
/// status `skipped_cached` without any token usage.
#[derive(Debug, Clone, Copy)]
pub struct SaveCachedTranslation<'a> {
    /// Job that owns the segment.
    pub job_id: &'a str,
    /// Segment being saved.
    pub segment_id: &'a str,
    /// Translated text reused from the cache.
    pub translated_text: &'a str,
    /// Per-block translations reused from the cache.
    pub blocks: &'a [BlockTranslation],
    /// Provider recorded on the `translations` row.
    pub provider: &'a str,
    /// Model recorded on the `translations` row.
    pub model: &'a str,
    /// Prompt version recorded on the `translations` row.
    pub prompt_version: &'a str,
}
164
/// Cache-key components shared by every segment in one
/// [`JobStore::find_cached_translations_batch`] call.
#[derive(Debug, Clone, Copy)]
pub struct CacheLookupRequest<'a> {
    /// Must equal the candidate segment's `prompt_version`.
    pub prompt_version: &'a str,
    /// Must equal the candidate segment's `provider`.
    pub provider: &'a str,
    /// Must equal the candidate segment's `model`.
    pub model: &'a str,
    /// Matched against the candidate job's `source_lang`; `None` only matches NULL.
    pub source_lang: Option<&'a str>,
    /// Must equal the candidate job's `target_lang`.
    pub target_lang: &'a str,
    /// Must equal the candidate segment's `cache_namespace`.
    pub cache_namespace: &'a str,
}
174
/// A row to insert into `segment_flags` via [`JobStore::insert_segment_flags`].
#[derive(Debug, Clone, Copy)]
pub struct NewSegmentFlag<'a> {
    /// Job the flag belongs to.
    pub job_id: &'a str,
    /// Segment being flagged.
    pub segment_id: &'a str,
    /// Flag category, stored verbatim.
    pub kind: &'a str,
    /// Optional free-text note.
    pub note: Option<&'a str>,
    /// Optional suggested source text.
    pub suggested_source: Option<&'a str>,
    /// Optional suggested target text.
    pub suggested_target: Option<&'a str>,
    /// Initial value of the `consumed` column (stored as 1/0).
    pub consumed: bool,
}
185
/// Storage health report produced by [`run_doctor`].
#[derive(Debug, Clone)]
pub struct StorageDoctor {
    /// Database file that was inspected.
    pub database_path: PathBuf,
    /// Whether the database file existed when inspected.
    pub database_exists: bool,
    /// Whether a WAL sidecar file was found before the database was opened.
    pub wal_present: bool,
    /// Whether a shared-memory sidecar file was found before the database was opened.
    pub shm_present: bool,
    /// Result of `PRAGMA journal_mode`, or `"unknown"` if unreadable or the database is missing.
    pub journal_mode: String,
    /// First row of `PRAGMA integrity_check` (`"ok"` when healthy), `"error"` if the pragma
    /// failed, or empty when the database is missing.
    pub integrity_check: String,
    /// True when no sidecars are present, or when they are present and the integrity check is `"ok"`.
    pub wal_sidecars_normal: bool,
    /// Human-readable advice; empty when there is nothing to report.
    pub note: String,
}
197
198pub fn run_doctor(db_path: Option<PathBuf>) -> Result<StorageDoctor> {
199 let path = db_path.unwrap_or_else(|| PathBuf::from(".bookforge/jobs.sqlite"));
200 let database_exists = path.exists();
201 let wal_path = path.with_extension("sqlite-wal");
202 let shm_path = path.with_extension("sqlite-shm");
203 let wal_present = wal_path.exists();
204 let shm_present = shm_path.exists();
205
206 let (journal_mode, integrity_check, wal_sidecars_normal, note) = if database_exists {
207 let conn = Connection::open(&path)?;
208 let journal_mode: String = conn
209 .pragma_query_value(None, "journal_mode", |row| row.get(0))
210 .unwrap_or_else(|_| "unknown".to_string());
211 let integrity_check: String = conn
212 .pragma_query_value(None, "integrity_check", |row| row.get(0))
213 .unwrap_or_else(|_| "error".to_string());
214 let _ = conn.execute_batch("PRAGMA wal_checkpoint(PASSIVE);");
215
216 let wal_sidecars_normal = if wal_present || shm_present {
217 integrity_check == "ok"
218 } else {
219 true
220 };
221
222 let note = if wal_present || shm_present {
223 "WAL sidecar files are normal. SQLite will recover them automatically. \
224 Do not delete them manually while BookForge is running."
225 .to_string()
226 } else {
227 String::new()
228 };
229
230 (journal_mode, integrity_check, wal_sidecars_normal, note)
231 } else {
232 ("unknown".to_string(), String::new(), true, String::new())
233 };
234
235 Ok(StorageDoctor {
236 database_path: path,
237 database_exists,
238 wal_present,
239 shm_present,
240 journal_mode,
241 integrity_check,
242 wal_sidecars_normal,
243 note,
244 })
245}
246
247impl JobStore {
248 pub fn open_default() -> Result<Self> {
249 Self::open(".bookforge/jobs.sqlite")
250 }
251
252 pub fn open(path: impl Into<PathBuf>) -> Result<Self> {
253 let path = path.into();
254 if let Some(parent) = path.parent() {
255 fs::create_dir_all(parent)?;
256 }
257 let conn = Connection::open(&path)?;
258
259 conn.busy_timeout(Duration::from_secs(5))?;
260 conn.pragma_update(None, "journal_mode", "WAL")?;
261 conn.pragma_update(None, "synchronous", "NORMAL")?;
262 conn.pragma_update(None, "foreign_keys", "ON")?;
263
264 let store = Self {
265 conn: RefCell::new(conn),
266 path,
267 };
268 store.migrate()?;
269 Ok(store)
270 }
271
272 pub fn path(&self) -> &Path {
273 &self.path
274 }
275
276 pub fn create_job(&self, request: CreateJob<'_>) -> Result<JobRecord> {
277 let input_hash = file_hash(request.input)?;
278 let id = format!("job_{}_{}", unix_timestamp_nanos(), &input_hash[..12]);
279 let now = timestamp_string();
280 let input_path = request.input.to_path_buf();
281 let output_path = request.output.to_path_buf();
282 let conn = self.conn.borrow();
283 conn.execute(
284 "INSERT INTO jobs
285 (id, input_path, output_path, input_hash, source_lang, target_lang, provider, model, base_url, api_key_env, status, created_at, updated_at)
286 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, ?9, ?10, 'running', ?11, ?11)",
287 params![
288 id,
289 input_path.to_string_lossy(),
290 output_path.to_string_lossy(),
291 input_hash,
292 request.source_lang,
293 request.target_lang,
294 request.provider,
295 request.model,
296 request.base_url,
297 request.api_key_env,
298 now,
299 ],
300 )?;
301
302 Ok(JobRecord {
303 id,
304 input_path,
305 input_snapshot_path: None,
306 input_sha256: None,
307 output_path,
308 input_hash,
309 source_lang: request.source_lang.map(ToOwned::to_owned),
310 target_lang: request.target_lang.to_string(),
311 provider: request.provider.to_string(),
312 model: request.model.to_string(),
313 base_url: request.base_url.map(ToOwned::to_owned),
314 api_key_env: request.api_key_env.map(ToOwned::to_owned),
315 status: "running".to_string(),
316 events_path: None,
317 report_json_path: None,
318 report_markdown_path: None,
319 })
320 }
321
322 pub fn update_job_config_snapshot(
323 &self,
324 job_id: &str,
325 snapshot: &RunConfigSnapshot,
326 ) -> Result<()> {
327 let json = serde_json::to_string(snapshot)
328 .map_err(|e| StoreError::Serialization(e.to_string()))?;
329 let conn = self.conn.borrow();
330 conn.execute(
331 "UPDATE jobs
332 SET config_json = ?1,
333 events_path = ?2,
334 report_json_path = ?3,
335 report_markdown_path = ?4,
336 input_snapshot_path = ?5,
337 input_sha256 = ?6,
338 updated_at = ?7
339 WHERE id = ?8",
340 params![
341 json,
342 snapshot
343 .events_path
344 .as_ref()
345 .map(|path| path.to_string_lossy().to_string()),
346 snapshot
347 .report_json_path
348 .as_ref()
349 .map(|path| path.to_string_lossy().to_string()),
350 snapshot
351 .report_markdown_path
352 .as_ref()
353 .map(|path| path.to_string_lossy().to_string()),
354 snapshot
355 .input_snapshot_path
356 .as_ref()
357 .map(|path| path.to_string_lossy().to_string()),
358 snapshot.input_sha256.as_deref(),
359 timestamp_string(),
360 job_id,
361 ],
362 )?;
363 Ok(())
364 }
365
366 pub fn update_job_input_snapshot(
367 &self,
368 job_id: &str,
369 snapshot_path: &Path,
370 input_sha256: &str,
371 ) -> Result<()> {
372 let conn = self.conn.borrow();
373 conn.execute(
374 "UPDATE jobs
375 SET input_snapshot_path = ?1,
376 input_sha256 = ?2,
377 updated_at = ?3
378 WHERE id = ?4",
379 params![
380 snapshot_path.to_string_lossy(),
381 input_sha256,
382 timestamp_string(),
383 job_id
384 ],
385 )?;
386 Ok(())
387 }
388
389 pub fn load_job_config_snapshot(&self, job_id: &str) -> Result<Option<RunConfigSnapshot>> {
390 let conn = self.conn.borrow();
391 let Some(json) = conn
392 .query_row(
393 "SELECT config_json FROM jobs WHERE id = ?1",
394 params![job_id],
395 |row| row.get::<_, Option<String>>(0),
396 )
397 .optional()?
398 .flatten()
399 else {
400 return Ok(None);
401 };
402
403 serde_json::from_str(&json)
404 .map(Some)
405 .map_err(|e| StoreError::Serialization(e.to_string()))
406 }
407
408 pub fn update_job_event_path(&self, job_id: &str, path: &Path) -> Result<()> {
409 let conn = self.conn.borrow();
410 conn.execute(
411 "UPDATE jobs SET events_path = ?1, updated_at = ?2 WHERE id = ?3",
412 params![path.to_string_lossy(), timestamp_string(), job_id],
413 )?;
414 Ok(())
415 }
416
417 pub fn update_job_report_paths(
418 &self,
419 job_id: &str,
420 json_path: &Path,
421 markdown_path: &Path,
422 ) -> Result<()> {
423 let conn = self.conn.borrow();
424 conn.execute(
425 "UPDATE jobs
426 SET report_json_path = ?1, report_markdown_path = ?2, updated_at = ?3
427 WHERE id = ?4",
428 params![
429 json_path.to_string_lossy(),
430 markdown_path.to_string_lossy(),
431 timestamp_string(),
432 job_id
433 ],
434 )?;
435 Ok(())
436 }
437
438 pub fn update_job_output_path(&self, job_id: &str, path: &Path) -> Result<()> {
439 let conn = self.conn.borrow();
440 conn.execute(
441 "UPDATE jobs SET output_path = ?1, updated_at = ?2 WHERE id = ?3",
442 params![path.to_string_lossy(), timestamp_string(), job_id],
443 )?;
444 Ok(())
445 }
446
447 pub fn insert_segments(
448 &self,
449 job_id: &str,
450 segments: &[Segment],
451 prompt_version: &str,
452 provider: &str,
453 model: &str,
454 cache_namespace: &str,
455 ) -> Result<()> {
456 let mut conn = self.conn.borrow_mut();
457 let tx = conn.transaction()?;
458 for segment in segments {
459 tx.execute(
460 "INSERT OR IGNORE INTO segments
461 (id, job_id, section_id, ordinal, source_hash, prompt_version, provider, model, status, attempts, cache_namespace)
462 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8, 'queued', 0, ?9)",
463 params![
464 segment.id.0,
465 job_id,
466 segment.section_id.0,
467 segment.ordinal as i64,
468 segment.checksum,
469 prompt_version,
470 provider,
471 model,
472 cache_namespace,
473 ],
474 )?;
475 }
476 tx.commit()?;
477 Ok(())
478 }
479
480 pub fn save_translation(&self, request: SaveTranslation<'_>) -> Result<()> {
481 let now = timestamp_string();
482 let translated_hash = stable_hash(request.translated_text);
483 {
484 let conn = self.conn.borrow();
485 conn.execute(
486 "INSERT OR REPLACE INTO translations
487 (segment_id, job_id, translated_text, provider, model, prompt_version, created_at)
488 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
489 params![
490 request.segment_id,
491 request.job_id,
492 request.translated_text,
493 request.provider,
494 request.model,
495 request.prompt_version,
496 now
497 ],
498 )?;
499 replace_block_translations(&conn, request.job_id, request.segment_id, request.blocks)?;
500 conn.execute(
501 "UPDATE segments
502 SET status = 'succeeded',
503 attempts = attempts + 1,
504 tokens_input = ?1,
505 tokens_input_cached = ?2,
506 tokens_output = ?3,
507 tokens_estimated = ?4,
508 translated_hash = ?5,
509 error = NULL
510 WHERE job_id = ?6 AND id = ?7",
511 params![
512 request.input_tokens.map(|value| value as i64),
513 request.input_cached_tokens.map(|value| value as i64),
514 request.output_tokens.map(|value| value as i64),
515 if request.tokens_estimated {
516 1_i64
517 } else {
518 0_i64
519 },
520 translated_hash,
521 request.job_id,
522 request.segment_id,
523 ],
524 )?;
525 }
526 self.touch_job(request.job_id, "running")?;
527 Ok(())
528 }
529
530 pub fn save_needs_review(&self, request: SaveNeedsReview<'_>) -> Result<()> {
531 let now = timestamp_string();
532 let translated_hash = stable_hash(request.preserved_text);
533 {
534 let conn = self.conn.borrow();
535 conn.execute(
536 "INSERT OR REPLACE INTO translations
537 (segment_id, job_id, translated_text, provider, model, prompt_version, created_at)
538 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
539 params![
540 request.segment_id,
541 request.job_id,
542 request.preserved_text,
543 request.provider,
544 request.model,
545 request.prompt_version,
546 now
547 ],
548 )?;
549 replace_block_translations(&conn, request.job_id, request.segment_id, request.blocks)?;
550 conn.execute(
551 "UPDATE segments
552 SET status = 'needs_review',
553 attempts = attempts + 1,
554 tokens_input = ?1,
555 tokens_input_cached = ?2,
556 tokens_output = ?3,
557 tokens_estimated = ?4,
558 translated_hash = ?5,
559 error = ?6
560 WHERE job_id = ?7 AND id = ?8",
561 params![
562 request.input_tokens.map(|value| value as i64),
563 request.input_cached_tokens.map(|value| value as i64),
564 request.output_tokens.map(|value| value as i64),
565 if request.tokens_estimated {
566 1_i64
567 } else {
568 0_i64
569 },
570 translated_hash,
571 request.error,
572 request.job_id,
573 request.segment_id
574 ],
575 )?;
576 }
577 self.touch_job(request.job_id, "needs_review")?;
578 Ok(())
579 }
580
581 pub fn save_cached_translation(&self, request: SaveCachedTranslation<'_>) -> Result<()> {
582 let now = timestamp_string();
583 let translated_hash = stable_hash(request.translated_text);
584 {
585 let conn = self.conn.borrow();
586 conn.execute(
587 "INSERT OR REPLACE INTO translations
588 (segment_id, job_id, translated_text, provider, model, prompt_version, created_at)
589 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7)",
590 params![
591 request.segment_id,
592 request.job_id,
593 request.translated_text,
594 request.provider,
595 request.model,
596 request.prompt_version,
597 now
598 ],
599 )?;
600 replace_block_translations(&conn, request.job_id, request.segment_id, request.blocks)?;
601 conn.execute(
602 "UPDATE segments
603 SET status = 'skipped_cached',
604 tokens_input = NULL,
605 tokens_input_cached = NULL,
606 tokens_output = NULL,
607 tokens_estimated = 0,
608 translated_hash = ?1,
609 error = NULL
610 WHERE job_id = ?2 AND id = ?3",
611 params![translated_hash, request.job_id, request.segment_id],
612 )?;
613 }
614 self.touch_job(request.job_id, "running")?;
615 Ok(())
616 }
617
618 pub fn mark_job_complete(&self, job_id: &str) -> Result<()> {
619 self.touch_job(job_id, "succeeded")
620 }
621
622 pub fn mark_job_running(&self, job_id: &str) -> Result<()> {
623 self.touch_job(job_id, "running")
624 }
625
626 pub fn mark_job_succeeded(&self, job_id: &str) -> Result<()> {
627 self.mark_job_complete(job_id)
628 }
629
630 pub fn mark_job_needs_review(&self, job_id: &str) -> Result<()> {
631 self.touch_job(job_id, "needs_review")
632 }
633
634 pub fn mark_job_interrupted(&self, job_id: &str) -> Result<()> {
635 self.touch_job(job_id, "interrupted")
636 }
637
638 pub fn mark_job_failed(&self, job_id: &str) -> Result<()> {
639 self.touch_job(job_id, "failed")
640 }
641
642 pub fn mark_segment_failed(&self, job_id: &str, segment_id: &str, error: &str) -> Result<()> {
643 {
644 let conn = self.conn.borrow();
645 conn.execute(
646 "UPDATE segments SET status = 'failed', attempts = attempts + 1, error = ?1 WHERE job_id = ?2 AND id = ?3",
647 params![error, job_id, segment_id],
648 )?;
649 }
650 self.touch_job(job_id, "failed")?;
651 Ok(())
652 }
653
654 pub fn mark_segment_failed_if_unfinished(
655 &self,
656 job_id: &str,
657 segment_id: &str,
658 error: &str,
659 ) -> Result<()> {
660 {
661 let conn = self.conn.borrow();
662 conn.execute(
663 "UPDATE segments
664 SET status = 'failed', attempts = attempts + 1, error = ?1
665 WHERE job_id = ?2
666 AND id = ?3
667 AND status NOT IN ('succeeded', 'skipped_cached', 'needs_review')",
668 params![error, job_id, segment_id],
669 )?;
670 }
671 self.touch_job(job_id, "failed")?;
672 Ok(())
673 }
674
675 pub fn mark_unfinished_segments_failed(
676 &self,
677 job_id: &str,
678 candidate_segment_ids: &[String],
679 error: &str,
680 ) -> Result<usize> {
681 const SQLITE_IN_CHUNK_SIZE: usize = 900;
682 let mut updated = 0;
683
684 for chunk in candidate_segment_ids.chunks(SQLITE_IN_CHUNK_SIZE) {
685 if chunk.is_empty() {
686 continue;
687 }
688
689 let placeholders = std::iter::repeat_n("?", chunk.len())
690 .collect::<Vec<_>>()
691 .join(", ");
692 let sql = format!(
693 "UPDATE segments
694 SET status = 'failed', attempts = attempts + 1, error = ?
695 WHERE job_id = ?
696 AND id IN ({placeholders})
697 AND status NOT IN ('succeeded', 'skipped_cached', 'needs_review')"
698 );
699
700 let conn = self.conn.borrow();
701 let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 2);
702 params.push(&error);
703 params.push(&job_id);
704 for id in chunk {
705 params.push(id);
706 }
707 updated += conn.execute(&sql, params.as_slice())?;
708 }
709
710 if updated > 0 {
711 self.touch_job(job_id, "failed")?;
712 }
713 Ok(updated)
714 }
715
716 pub fn get_job(&self, job_id: &str) -> Result<Option<JobRecord>> {
717 let conn = self.conn.borrow();
718 conn.query_row(
719 "SELECT id, input_path, input_snapshot_path, input_sha256, output_path, input_hash, source_lang, target_lang, provider, model, base_url, api_key_env, status,
720 events_path, report_json_path, report_markdown_path
721 FROM jobs WHERE id = ?1",
722 params![job_id],
723 |row| {
724 Ok(JobRecord {
725 id: row.get(0)?,
726 input_path: PathBuf::from(row.get::<_, String>(1)?),
727 input_snapshot_path: row.get::<_, Option<String>>(2)?.map(PathBuf::from),
728 input_sha256: row.get(3)?,
729 output_path: PathBuf::from(row.get::<_, String>(4)?),
730 input_hash: row.get(5)?,
731 source_lang: row.get(6)?,
732 target_lang: row.get(7)?,
733 provider: row.get(8)?,
734 model: row.get(9)?,
735 base_url: row.get(10)?,
736 api_key_env: row.get(11)?,
737 status: row.get(12)?,
738 events_path: row.get::<_, Option<String>>(13)?.map(PathBuf::from),
739 report_json_path: row.get::<_, Option<String>>(14)?.map(PathBuf::from),
740 report_markdown_path: row.get::<_, Option<String>>(15)?.map(PathBuf::from),
741 })
742 },
743 )
744 .optional()
745 .map_err(StoreError::from)
746 }
747
748 pub fn summary(&self, job_id: &str) -> Result<Option<JobSummary>> {
749 let Some(job) = self.get_job(job_id)? else {
750 return Ok(None);
751 };
752 let conn = self.conn.borrow();
753 let mut summary = JobSummary {
754 id: job.id,
755 status: job.status,
756 ..JobSummary::default()
757 };
758
759 let mut stmt = conn.prepare(
760 "SELECT status,
761 COUNT(*),
762 COALESCE(SUM(COALESCE(tokens_input, input_tokens)), 0),
763 COALESCE(SUM(tokens_input_cached), 0),
764 COALESCE(SUM(COALESCE(tokens_output, output_tokens)), 0)
765 FROM segments WHERE job_id = ?1 GROUP BY status",
766 )?;
767 let rows = stmt.query_map(params![job_id], |row| {
768 Ok((
769 row.get::<_, String>(0)?,
770 row.get::<_, i64>(1)?,
771 row.get::<_, i64>(2)?,
772 row.get::<_, i64>(3)?,
773 row.get::<_, i64>(4)?,
774 ))
775 })?;
776
777 for row in rows {
778 let (status, count, input_tokens, input_cached_tokens, output_tokens) = row?;
779 let count = count as usize;
780 summary.total_segments += count;
781 summary.input_tokens += input_tokens as u64;
782 summary.input_cached_tokens += input_cached_tokens as u64;
783 summary.output_tokens += output_tokens as u64;
784 match status.as_str() {
785 "succeeded" => summary.succeeded += count,
786 "failed" => summary.failed += count,
787 "needs_review" => summary.needs_review += count,
788 "retry_pending" => summary.retry_pending += count,
789 "skipped_cached" => summary.cached += count,
790 _ => {}
791 }
792 }
793
794 summary.retried = conn.query_row(
795 "SELECT COUNT(*) FROM segments WHERE job_id = ?1 AND attempts > 1",
796 params![job_id],
797 |row| row.get::<_, i64>(0),
798 )? as usize;
799
800 Ok(Some(summary))
801 }
802
803 pub fn retry_segments(&self, job_id: &str, scope: RetryScope) -> Result<usize> {
804 let where_status = match scope {
805 RetryScope::Failed => "status = 'failed'",
806 RetryScope::NeedsReview => "status = 'needs_review'",
807 RetryScope::All => "status IN ('failed', 'needs_review')",
808 };
809 let sql = format!(
810 "UPDATE segments SET status = 'retry_pending', error = NULL WHERE job_id = ?1 AND {where_status}"
811 );
812 let count = {
813 let conn = self.conn.borrow();
814 conn.execute(&sql, params![job_id])?
815 };
816 self.touch_job(job_id, "retry_pending")?;
817 Ok(count)
818 }
819
820 pub fn insert_segment_flags(&self, flags: &[NewSegmentFlag<'_>]) -> Result<usize> {
821 let mut conn = self.conn.borrow_mut();
822 let tx = conn.transaction()?;
823 let ingested_at = timestamp_string();
824 let mut inserted = 0usize;
825 for flag in flags {
826 inserted += tx.execute(
827 "INSERT INTO segment_flags
828 (job_id, segment_id, kind, note, suggested_source, suggested_target, ingested_at, consumed)
829 VALUES (?1, ?2, ?3, ?4, ?5, ?6, ?7, ?8)",
830 params![
831 flag.job_id,
832 flag.segment_id,
833 flag.kind,
834 flag.note,
835 flag.suggested_source,
836 flag.suggested_target,
837 ingested_at,
838 if flag.consumed { 1_i64 } else { 0_i64 },
839 ],
840 )?;
841 }
842 tx.commit()?;
843 Ok(inserted)
844 }
845
846 pub fn mark_segments_needs_review(
847 &self,
848 job_id: &str,
849 segment_ids: &[String],
850 reason: &str,
851 ) -> Result<usize> {
852 const SQLITE_IN_CHUNK_SIZE: usize = 900;
853 let mut updated = 0usize;
854 for chunk in segment_ids.chunks(SQLITE_IN_CHUNK_SIZE) {
855 if chunk.is_empty() {
856 continue;
857 }
858 let placeholders = std::iter::repeat_n("?", chunk.len())
859 .collect::<Vec<_>>()
860 .join(", ");
861 let sql = format!(
862 "UPDATE segments
863 SET status = 'needs_review',
864 error = ?
865 WHERE job_id = ?
866 AND id IN ({placeholders})"
867 );
868 let conn = self.conn.borrow();
869 let mut params: Vec<&dyn rusqlite::types::ToSql> = Vec::with_capacity(chunk.len() + 2);
870 params.push(&reason);
871 params.push(&job_id);
872 for id in chunk {
873 params.push(id);
874 }
875 updated += conn.execute(&sql, params.as_slice())?;
876 }
877 if updated > 0 {
878 self.touch_job(job_id, "needs_review")?;
879 }
880 Ok(updated)
881 }
882
883 pub fn segment_flag_count(&self, job_id: &str) -> Result<usize> {
884 let conn = self.conn.borrow();
885 let count = conn.query_row(
886 "SELECT COUNT(*) FROM segment_flags WHERE job_id = ?1",
887 params![job_id],
888 |row| row.get::<_, i64>(0),
889 )?;
890 Ok(count as usize)
891 }
892
893 pub fn pending_segment_ids(&self, job_id: &str) -> Result<Vec<String>> {
894 let conn = self.conn.borrow();
895 let mut stmt = conn.prepare(
896 "SELECT id FROM segments
897 WHERE job_id = ?1 AND status IN ('queued', 'retry_pending')
898 ORDER BY ordinal",
899 )?;
900 let rows = stmt.query_map(params![job_id], |row| row.get::<_, String>(0))?;
901 rows.collect::<std::result::Result<Vec<_>, _>>()
902 .map_err(StoreError::from)
903 }
904
905 pub fn segment_records(&self, job_id: &str) -> Result<Vec<SegmentRecord>> {
906 let conn = self.conn.borrow();
907 let mut stmt = conn.prepare(
908 "SELECT id,
909 status,
910 attempts,
911 error,
912 COALESCE(tokens_input, input_tokens),
913 tokens_input_cached,
914 COALESCE(tokens_output, output_tokens),
915 tokens_estimated
916 FROM segments WHERE job_id = ?1 ORDER BY ordinal",
917 )?;
918 let rows = stmt.query_map(params![job_id], |row| {
919 Ok(SegmentRecord {
920 id: row.get(0)?,
921 status: row.get(1)?,
922 attempts: row.get::<_, i64>(2)? as usize,
923 error: row.get(3)?,
924 input_tokens: row.get::<_, Option<i64>>(4)?.map(|value| value as u64),
925 input_cached_tokens: row.get::<_, Option<i64>>(5)?.map(|value| value as u64),
926 output_tokens: row.get::<_, Option<i64>>(6)?.map(|value| value as u64),
927 tokens_estimated: row.get::<_, i64>(7)? != 0,
928 })
929 })?;
930 rows.collect::<std::result::Result<Vec<_>, _>>()
931 .map_err(StoreError::from)
932 }
933
934 pub fn load_block_translations(&self, job_id: &str) -> Result<Vec<StoredBlockTranslation>> {
935 let conn = self.conn.borrow();
936 let mut stmt = conn.prepare(
937 "SELECT segment_id, block_id, translated_text
938 FROM translation_blocks WHERE job_id = ?1 ORDER BY segment_id, block_id",
939 )?;
940 let rows = stmt.query_map(params![job_id], |row| {
941 Ok(StoredBlockTranslation {
942 segment_id: row.get(0)?,
943 block_id: row.get(1)?,
944 text: row.get(2)?,
945 })
946 })?;
947 rows.collect::<std::result::Result<Vec<_>, _>>()
948 .map_err(StoreError::from)
949 }
950
951 pub fn load_terminal_segment_translations(
952 &self,
953 job_id: &str,
954 ) -> Result<Vec<StoredSegmentTranslation>> {
955 let conn = self.conn.borrow();
956 let mut stmt = conn.prepare(
957 "SELECT s.id, s.ordinal, s.status, s.error, t.translated_text
958 FROM segments s
959 JOIN translations t ON t.job_id = s.job_id AND t.segment_id = s.id
960 WHERE s.job_id = ?1 AND s.status IN ('succeeded', 'skipped_cached', 'needs_review')
961 ORDER BY s.ordinal",
962 )?;
963 let rows = stmt.query_map(params![job_id], |row| {
964 Ok((
965 row.get::<_, String>(0)?,
966 row.get::<_, i64>(1)?,
967 row.get::<_, String>(2)?,
968 row.get::<_, Option<String>>(3)?,
969 row.get::<_, String>(4)?,
970 ))
971 })?;
972
973 let mut records = Vec::new();
974 for row in rows {
975 let (segment_id, ordinal, status, error, translated_text) = row?;
976 let mut block_stmt = conn.prepare(
977 "SELECT block_id, translated_text
978 FROM translation_blocks
979 WHERE job_id = ?1 AND segment_id = ?2
980 ORDER BY block_id",
981 )?;
982 let blocks = block_stmt
983 .query_map(params![job_id, segment_id.as_str()], |row| {
984 Ok(BlockTranslation {
985 block_id: BlockId(row.get::<_, String>(0)?),
986 text: row.get(1)?,
987 })
988 })?
989 .collect::<std::result::Result<Vec<_>, _>>()?;
990 records.push(StoredSegmentTranslation {
991 segment_id,
992 ordinal: ordinal as usize,
993 status,
994 error,
995 translated_text,
996 blocks,
997 });
998 }
999
1000 Ok(records)
1001 }
1002
1003 pub fn resumable_segment_ids(&self, job_id: &str) -> Result<Vec<String>> {
1004 let conn = self.conn.borrow();
1005 let mut stmt = conn.prepare(
1006 "SELECT id FROM segments
1007 WHERE job_id = ?1 AND status IN ('queued', 'retry_pending', 'failed')
1008 ORDER BY ordinal",
1009 )?;
1010 let rows = stmt.query_map(params![job_id], |row| row.get::<_, String>(0))?;
1011 rows.collect::<std::result::Result<Vec<_>, _>>()
1012 .map_err(StoreError::from)
1013 }
1014
1015 #[allow(clippy::too_many_arguments)]
1016 pub fn find_cached_translation(
1017 &self,
1018 segment: &Segment,
1019 prompt_version: &str,
1020 provider: &str,
1021 model: &str,
1022 source_lang: Option<&str>,
1023 target_lang: &str,
1024 cache_namespace: &str,
1025 ) -> Result<Option<CachedTranslation>> {
1026 let conn = self.conn.borrow();
1027 let cached = conn
1028 .query_row(
1029 "SELECT t.job_id, t.segment_id, t.translated_text
1030 FROM translations t
1031 JOIN segments s ON s.job_id = t.job_id AND s.id = t.segment_id
1032 JOIN jobs j ON j.id = t.job_id
1033 WHERE s.source_hash = ?1
1034 AND s.prompt_version = ?2
1035 AND s.provider = ?3
1036 AND s.model = ?4
1037 AND ((?5 IS NULL AND j.source_lang IS NULL) OR j.source_lang = ?5)
1038 AND j.target_lang = ?6
1039 AND s.cache_namespace = ?7
1040 AND s.status IN ('succeeded', 'skipped_cached')
1041 ORDER BY t.created_at DESC
1042 LIMIT 1",
1043 params![
1044 segment.checksum,
1045 prompt_version,
1046 provider,
1047 model,
1048 source_lang,
1049 target_lang,
1050 cache_namespace,
1051 ],
1052 |row| {
1053 Ok((
1054 row.get::<_, String>(0)?,
1055 row.get::<_, String>(1)?,
1056 row.get::<_, String>(2)?,
1057 ))
1058 },
1059 )
1060 .optional()?;
1061
1062 let Some((job_id, segment_id, translated_text)) = cached else {
1063 return Ok(None);
1064 };
1065
1066 let mut stmt = conn.prepare(
1067 "SELECT block_id, translated_text
1068 FROM translation_blocks
1069 WHERE job_id = ?1 AND segment_id = ?2
1070 ORDER BY block_id",
1071 )?;
1072 let rows = stmt.query_map(params![job_id, segment_id], |row| {
1073 Ok(BlockTranslation {
1074 block_id: BlockId(row.get::<_, String>(0)?),
1075 text: row.get(1)?,
1076 })
1077 })?;
1078 let blocks = rows.collect::<std::result::Result<Vec<_>, _>>()?;
1079
1080 let mut by_id = blocks
1081 .into_iter()
1082 .map(|block| (block.block_id.0.clone(), block))
1083 .collect::<HashMap<_, _>>();
1084
1085 let mut ordered = Vec::with_capacity(segment.block_ids.len());
1086 for id in &segment.block_ids {
1087 let Some(block) = by_id.remove(&id.0) else {
1088 return Ok(None);
1089 };
1090 ordered.push(block);
1091 }
1092 if !by_id.is_empty() {
1093 return Ok(None);
1094 }
1095
1096 Ok(Some(CachedTranslation {
1097 translated_text,
1098 blocks: ordered,
1099 }))
1100 }
1101
1102 pub fn find_cached_translations_batch(
1103 &self,
1104 segments: &[Segment],
1105 request: CacheLookupRequest<'_>,
1106 ) -> Result<HashMap<String, CachedTranslation>> {
1107 let mut results = HashMap::new();
1108 if segments.is_empty() {
1109 return Ok(results);
1110 }
1111
1112 const SQLITE_IN_CHUNK_SIZE: usize = 900;
1113
1114 for chunk in segments.chunks(SQLITE_IN_CHUNK_SIZE) {
1115 let hashes: Vec<&str> = chunk.iter().map(|s| s.checksum.as_str()).collect();
1116 let placeholders: Vec<String> =
1117 (0..hashes.len()).map(|i| format!("?{}", i + 1)).collect();
1118 let placeholders_sql = placeholders.join(", ");
1119
1120 let sql = format!(
1121 "SELECT t.job_id, t.segment_id, t.translated_text, s.source_hash
1122 FROM translations t
1123 JOIN segments s ON s.job_id = t.job_id AND s.id = t.segment_id
1124 JOIN jobs j ON j.id = t.job_id
1125 WHERE s.source_hash IN ({placeholders_sql})
1126 AND s.prompt_version = ?{}
1127 AND s.provider = ?{}
1128 AND s.model = ?{}
1129 AND ((?{} IS NULL AND j.source_lang IS NULL) OR j.source_lang = ?{})
1130 AND j.target_lang = ?{}
1131 AND s.cache_namespace = ?{}
1132 AND s.status IN ('succeeded', 'skipped_cached')
1133 ORDER BY t.created_at DESC",
1134 hashes.len() + 1,
1135 hashes.len() + 2,
1136 hashes.len() + 3,
1137 hashes.len() + 4,
1138 hashes.len() + 5,
1139 hashes.len() + 6,
1140 hashes.len() + 7,
1141 );
1142
1143 let mut params: Vec<Box<dyn rusqlite::types::ToSql>> = Vec::new();
1144 for hash in &hashes {
1145 params.push(Box::new(hash.to_string()));
1146 }
1147 params.push(Box::new(request.prompt_version.to_string()));
1148 params.push(Box::new(request.provider.to_string()));
1149 params.push(Box::new(request.model.to_string()));
1150 params.push(Box::new(request.source_lang.map(|s| s.to_string())));
1151 params.push(Box::new(request.source_lang.map(|s| s.to_string())));
1152 params.push(Box::new(request.target_lang.to_string()));
1153 params.push(Box::new(request.cache_namespace.to_string()));
1154
1155 let conn = self.conn.borrow();
1156 let mut stmt = conn.prepare(&sql)?;
1157 let param_refs: Vec<&dyn rusqlite::types::ToSql> =
1158 params.iter().map(|p| p.as_ref()).collect();
1159
1160 let rows = stmt.query_map(param_refs.as_slice(), |row| {
1161 Ok((
1162 row.get::<_, String>(0)?,
1163 row.get::<_, String>(1)?,
1164 row.get::<_, String>(2)?,
1165 row.get::<_, String>(3)?,
1166 ))
1167 })?;
1168
1169 let mut hash_to_hit: HashMap<String, (String, String, String)> = HashMap::new();
1170 for row in rows {
1171 let (job_id, segment_id, translated_text, source_hash) = row?;
1172 hash_to_hit
1173 .entry(source_hash)
1174 .or_insert((job_id, segment_id, translated_text));
1175 }
1176
1177 for segment in chunk {
1178 if let Some((job_id, segment_id, translated_text)) =
1179 hash_to_hit.get(&segment.checksum)
1180 {
1181 let mut block_stmt = conn.prepare(
1182 "SELECT block_id, translated_text
1183 FROM translation_blocks
1184 WHERE job_id = ?1 AND segment_id = ?2
1185 ORDER BY block_id",
1186 )?;
1187 let block_rows = block_stmt.query_map(params![job_id, segment_id], |row| {
1188 Ok(BlockTranslation {
1189 block_id: BlockId(row.get::<_, String>(0)?),
1190 text: row.get(1)?,
1191 })
1192 })?;
1193 let blocks = block_rows.collect::<std::result::Result<Vec<_>, _>>()?;
1194
1195 let mut by_id = blocks
1196 .into_iter()
1197 .map(|block| (block.block_id.0.clone(), block))
1198 .collect::<HashMap<_, _>>();
1199
1200 let mut ordered = Vec::with_capacity(segment.block_ids.len());
1201 let mut valid = true;
1202 for id in &segment.block_ids {
1203 let Some(block) = by_id.remove(&id.0) else {
1204 valid = false;
1205 break;
1206 };
1207 ordered.push(block);
1208 }
1209 if !valid || !by_id.is_empty() {
1210 continue;
1211 }
1212
1213 results.insert(
1214 segment.id.0.clone(),
1215 CachedTranslation {
1216 translated_text: translated_text.clone(),
1217 blocks: ordered,
1218 },
1219 );
1220 }
1221 }
1222 }
1223
1224 Ok(results)
1225 }
1226
1227 fn migrate(&self) -> Result<()> {
1228 let conn = self.conn.borrow();
1229 if table_exists(&conn, "translations")?
1230 && !table_has_column(&conn, "translations", "job_id")?
1231 {
1232 let suffix = unix_timestamp_nanos();
1233 rename_table_if_exists(&conn, "qa_findings", suffix)?;
1234 rename_table_if_exists(&conn, "translation_blocks", suffix)?;
1235 rename_table_if_exists(&conn, "translations", suffix)?;
1236 rename_table_if_exists(&conn, "segments", suffix)?;
1237 rename_table_if_exists(&conn, "jobs", suffix)?;
1238 }
1239 conn.execute_batch(
1240 "
1241 PRAGMA foreign_keys = ON;
1242 CREATE TABLE IF NOT EXISTS _migrations (
1243 version INTEGER PRIMARY KEY,
1244 name TEXT NOT NULL,
1245 applied_at TEXT NOT NULL
1246 );
1247
1248 CREATE TABLE IF NOT EXISTS jobs (
1249 id TEXT PRIMARY KEY,
1250 input_path TEXT NOT NULL DEFAULT '',
1251 input_snapshot_path TEXT,
1252 input_sha256 TEXT,
1253 output_path TEXT NOT NULL DEFAULT '',
1254 input_hash TEXT NOT NULL,
1255 source_lang TEXT,
1256 target_lang TEXT NOT NULL,
1257 provider TEXT NOT NULL,
1258 model TEXT NOT NULL,
1259 base_url TEXT,
1260 api_key_env TEXT,
1261 status TEXT NOT NULL,
1262 config_json TEXT,
1263 events_path TEXT,
1264 report_json_path TEXT,
1265 report_markdown_path TEXT,
1266 created_at TEXT NOT NULL,
1267 updated_at TEXT NOT NULL
1268 );
1269
1270 CREATE TABLE IF NOT EXISTS segments (
1271 id TEXT NOT NULL,
1272 job_id TEXT NOT NULL,
1273 section_id TEXT NOT NULL,
1274 ordinal INTEGER NOT NULL,
1275 source_hash TEXT NOT NULL,
1276 prompt_version TEXT NOT NULL,
1277 provider TEXT NOT NULL,
1278 model TEXT NOT NULL,
1279 status TEXT NOT NULL,
1280 attempts INTEGER NOT NULL DEFAULT 0,
1281 input_tokens INTEGER,
1282 output_tokens INTEGER,
1283 tokens_input INTEGER,
1284 tokens_input_cached INTEGER,
1285 tokens_output INTEGER,
1286 tokens_estimated INTEGER NOT NULL DEFAULT 0,
1287 cost_estimate REAL,
1288 error TEXT,
1289 translated_hash TEXT,
1290 PRIMARY KEY (job_id, id),
1291 FOREIGN KEY(job_id) REFERENCES jobs(id)
1292 );
1293
1294 CREATE TABLE IF NOT EXISTS translations (
1295 segment_id TEXT NOT NULL,
1296 job_id TEXT NOT NULL,
1297 translated_text TEXT NOT NULL,
1298 provider TEXT NOT NULL,
1299 model TEXT NOT NULL,
1300 prompt_version TEXT NOT NULL,
1301 created_at TEXT NOT NULL,
1302 PRIMARY KEY (job_id, segment_id),
1303 FOREIGN KEY(job_id, segment_id) REFERENCES segments(job_id, id)
1304 );
1305
1306 CREATE TABLE IF NOT EXISTS translation_blocks (
1307 segment_id TEXT NOT NULL,
1308 job_id TEXT NOT NULL,
1309 block_id TEXT NOT NULL,
1310 translated_text TEXT NOT NULL,
1311 PRIMARY KEY (job_id, segment_id, block_id),
1312 FOREIGN KEY(job_id, segment_id) REFERENCES segments(job_id, id)
1313 );
1314
1315 CREATE TABLE IF NOT EXISTS qa_findings (
1316 id TEXT PRIMARY KEY,
1317 segment_id TEXT NOT NULL,
1318 job_id TEXT NOT NULL,
1319 severity TEXT NOT NULL,
1320 kind TEXT NOT NULL,
1321 message TEXT NOT NULL,
1322 FOREIGN KEY(job_id, segment_id) REFERENCES segments(job_id, id)
1323 );
1324
1325 CREATE TABLE IF NOT EXISTS segment_flags (
1326 id INTEGER PRIMARY KEY,
1327 job_id TEXT NOT NULL,
1328 segment_id TEXT NOT NULL,
1329 kind TEXT NOT NULL,
1330 note TEXT,
1331 suggested_source TEXT,
1332 suggested_target TEXT,
1333 ingested_at TEXT NOT NULL,
1334 consumed INTEGER NOT NULL DEFAULT 0,
1335 FOREIGN KEY(job_id) REFERENCES jobs(id) ON DELETE CASCADE
1336 );
1337 ",
1338 )?;
1339 ensure_column(&conn, "jobs", "input_path", "TEXT NOT NULL DEFAULT ''")?;
1340 ensure_column(&conn, "jobs", "input_snapshot_path", "TEXT")?;
1341 ensure_column(&conn, "jobs", "input_sha256", "TEXT")?;
1342 ensure_column(&conn, "jobs", "output_path", "TEXT NOT NULL DEFAULT ''")?;
1343 ensure_column(&conn, "jobs", "base_url", "TEXT")?;
1344 ensure_column(&conn, "jobs", "api_key_env", "TEXT")?;
1345 ensure_column(&conn, "jobs", "config_json", "TEXT")?;
1346 ensure_column(&conn, "jobs", "events_path", "TEXT")?;
1347 ensure_column(&conn, "jobs", "report_json_path", "TEXT")?;
1348 ensure_column(&conn, "jobs", "report_markdown_path", "TEXT")?;
1349 ensure_column(
1350 &conn,
1351 "segments",
1352 "cache_namespace",
1353 "TEXT NOT NULL DEFAULT ''",
1354 )?;
1355 ensure_column(&conn, "segments", "tokens_input", "INTEGER")?;
1356 ensure_column(&conn, "segments", "tokens_input_cached", "INTEGER")?;
1357 ensure_column(&conn, "segments", "tokens_output", "INTEGER")?;
1358 ensure_column(
1359 &conn,
1360 "segments",
1361 "tokens_estimated",
1362 "INTEGER NOT NULL DEFAULT 0",
1363 )?;
1364 conn.execute_batch(
1365 "CREATE INDEX IF NOT EXISTS idx_segments_cache_lookup
1366 ON segments(source_hash, cache_namespace, prompt_version, provider, model, status);
1367 CREATE INDEX IF NOT EXISTS idx_segment_flags_job
1368 ON segment_flags(job_id, consumed);",
1369 )?;
1370 record_migration(&conn, 1, "initial")?;
1371 record_migration(&conn, 2, "v1_0_1_input_snapshot")?;
1372 record_migration(&conn, 3, "v1_1_segment_flags")?;
1373 Ok(())
1374 }
1375
1376 fn touch_job(&self, job_id: &str, status: &str) -> Result<()> {
1377 let conn = self.conn.borrow();
1378 conn.execute(
1379 "UPDATE jobs SET status = ?1, updated_at = ?2 WHERE id = ?3",
1380 params![status, timestamp_string(), job_id],
1381 )?;
1382 Ok(())
1383 }
1384}
1385
1386fn table_exists(conn: &Connection, table: &str) -> rusqlite::Result<bool> {
1387 conn.query_row(
1388 "SELECT EXISTS(SELECT 1 FROM sqlite_master WHERE type = 'table' AND name = ?1)",
1389 params![table],
1390 |row| row.get::<_, bool>(0),
1391 )
1392}
1393
1394fn table_has_column(conn: &Connection, table: &str, column: &str) -> rusqlite::Result<bool> {
1395 let mut stmt = conn.prepare(&format!("PRAGMA table_info({table})"))?;
1396 let rows = stmt.query_map([], |row| row.get::<_, String>(1))?;
1397 for row in rows {
1398 if row? == column {
1399 return Ok(true);
1400 }
1401 }
1402 Ok(false)
1403}
1404
1405fn rename_table_if_exists(conn: &Connection, table: &str, suffix: u128) -> rusqlite::Result<()> {
1406 if table_exists(conn, table)? {
1407 conn.execute(
1408 &format!("ALTER TABLE {table} RENAME TO {table}_legacy_{suffix}"),
1409 [],
1410 )?;
1411 }
1412 Ok(())
1413}
1414
1415fn ensure_column(
1416 conn: &Connection,
1417 table: &str,
1418 column: &str,
1419 definition: &str,
1420) -> rusqlite::Result<()> {
1421 if !table_has_column(conn, table, column)? {
1422 conn.execute(
1423 &format!("ALTER TABLE {table} ADD COLUMN {column} {definition}"),
1424 [],
1425 )?;
1426 }
1427 Ok(())
1428}
1429
1430fn record_migration(conn: &Connection, version: i64, name: &str) -> rusqlite::Result<()> {
1431 conn.execute(
1432 "INSERT OR IGNORE INTO _migrations (version, name, applied_at)
1433 VALUES (?1, ?2, ?3)",
1434 params![version, name, timestamp_string()],
1435 )?;
1436 Ok(())
1437}
1438
1439fn replace_block_translations(
1440 conn: &Connection,
1441 job_id: &str,
1442 segment_id: &str,
1443 blocks: &[BlockTranslation],
1444) -> rusqlite::Result<()> {
1445 conn.execute(
1446 "DELETE FROM translation_blocks WHERE job_id = ?1 AND segment_id = ?2",
1447 params![job_id, segment_id],
1448 )?;
1449 for block in blocks {
1450 conn.execute(
1451 "INSERT INTO translation_blocks (segment_id, job_id, block_id, translated_text)
1452 VALUES (?1, ?2, ?3, ?4)",
1453 params![
1454 segment_id,
1455 job_id,
1456 block.block_id.0.as_str(),
1457 block.text.as_str()
1458 ],
1459 )?;
1460 }
1461 Ok(())
1462}
1463
/// Selects which segments a retry operation applies to.
///
/// NOTE(review): variant meanings are inferred from their names. Presumably
/// `Failed` targets `failed` segments, `NeedsReview` targets `needs_review`
/// segments and `All` covers both. Confirm against `retry_segments`.
#[derive(Clone, Copy, Debug)]
pub enum RetryScope {
    /// Presumably segments whose status is `failed`.
    Failed,
    /// Presumably segments whose status is `needs_review`.
    NeedsReview,
    /// Presumably every retryable segment.
    All,
}
1470
1471fn file_hash(path: &Path) -> CoreResult<String> {
1472 let bytes = fs::read(path)?;
1473 Ok(stable_hash_bytes(&bytes))
1474}
1475
1476fn stable_hash(text: &str) -> String {
1477 stable_hash_bytes(text.as_bytes())
1478}
1479
1480fn stable_hash_bytes(bytes: &[u8]) -> String {
1481 let digest = Sha256::digest(bytes);
1482 let mut output = String::with_capacity(digest.len() * 2);
1483 for byte in digest {
1484 use std::fmt::Write as _;
1485 write!(&mut output, "{byte:02x}").expect("writing to string should not fail");
1486 }
1487 output
1488}
1489
/// Whole seconds since the Unix epoch; 0 if the system clock reads before the epoch.
fn unix_timestamp() -> u64 {
    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| elapsed.as_secs())
}
1496
/// Nanoseconds since the Unix epoch; 0 if the system clock reads before the epoch.
fn unix_timestamp_nanos() -> u128 {
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_nanos(),
        Err(_) => 0,
    }
}
1503
1504fn timestamp_string() -> String {
1505 unix_timestamp().to_string()
1506}
1507
1508#[cfg(test)]
1509mod tests {
1510 use super::*;
1511 use bookforge_core::{
1512 ir::{BlockId, SectionId},
1513 segment::{
1514 Segment, SegmentBlock, SegmentConstraints, SegmentContext, SegmentId, SegmentMetadata,
1515 SegmentSource, SegmentTextRun,
1516 },
1517 };
1518
1519 #[test]
1520 fn store_reuses_connection_across_job_operations() {
1521 let db_path = temp_path("jobs.sqlite");
1522 let input_path = temp_path("input.epub");
1523 fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1524
1525 let store = JobStore::open(&db_path).expect("store should open");
1526 let job = store
1527 .create_job(CreateJob {
1528 input: &input_path,
1529 output: &temp_path("output.epub"),
1530 source_lang: Some("English"),
1531 target_lang: "Italian",
1532 provider: "mock",
1533 model: "mock-prefix",
1534 base_url: None,
1535 api_key_env: None,
1536 })
1537 .expect("job should be created");
1538 let segments = vec![segment("seg_a", 0), segment("seg_b", 1)];
1539 store
1540 .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
1541 .expect("segments should insert");
1542
1543 store
1544 .save_translation(SaveTranslation {
1545 job_id: &job.id,
1546 segment_id: "seg_a",
1547 translated_text: "Tradotto",
1548 blocks: &[BlockTranslation {
1549 block_id: BlockId("b_000000".to_string()),
1550 text: "Tradotto".to_string(),
1551 }],
1552 provider: "mock",
1553 model: "mock-prefix",
1554 prompt_version: "v1",
1555 input_tokens: Some(11),
1556 input_cached_tokens: Some(0),
1557 output_tokens: Some(7),
1558 tokens_estimated: false,
1559 })
1560 .expect("translation should save");
1561 store
1562 .mark_segment_failed(&job.id, "seg_b", "provider unavailable")
1563 .expect("segment should be marked failed");
1564
1565 let summary = store
1566 .summary(&job.id)
1567 .expect("summary should load")
1568 .expect("job should exist");
1569 assert_eq!(summary.total_segments, 2);
1570 assert_eq!(summary.succeeded, 1);
1571 assert_eq!(summary.failed, 1);
1572 assert_eq!(summary.input_tokens, 11);
1573 assert_eq!(summary.output_tokens, 7);
1574 let blocks = store
1575 .load_block_translations(&job.id)
1576 .expect("block translations should load");
1577 assert_eq!(blocks.len(), 1);
1578 assert_eq!(blocks[0].text, "Tradotto");
1579
1580 let _ = fs::remove_file(db_path);
1581 let _ = fs::remove_file(input_path);
1582 }
1583
1584 fn segment(id: &str, ordinal: usize) -> Segment {
1585 let block_id = BlockId(format!("b_{ordinal:06}"));
1586 Segment {
1587 id: SegmentId(id.to_string()),
1588 section_id: SectionId("sec_000000".to_string()),
1589 ordinal,
1590 block_ids: vec![block_id.clone()],
1591 source: SegmentSource {
1592 text: format!("Source {ordinal}"),
1593 blocks: vec![SegmentBlock {
1594 block_id,
1595 kind: "paragraph".to_string(),
1596 text: format!("Source {ordinal}"),
1597 text_runs: vec![SegmentTextRun {
1598 id: format!("r{ordinal}"),
1599 text: format!("Source {ordinal}"),
1600 }],
1601 protected_spans: Vec::new(),
1602 }],
1603 token_estimate: 2,
1604 },
1605 context: SegmentContext::default(),
1606 metadata: SegmentMetadata::default(),
1607 constraints: SegmentConstraints::default(),
1608 checksum: format!("checksum_{ordinal}"),
1609 }
1610 }
1611
1612 fn temp_path(name: &str) -> PathBuf {
1613 std::env::temp_dir().join(format!(
1614 "bookforge-store-test-{}-{}-{name}",
1615 std::process::id(),
1616 unix_timestamp_nanos()
1617 ))
1618 }
1619
1620 fn build_seeded_store_with_translation(
1621 db_path: &PathBuf,
1622 cache_namespace: &str,
1623 block_ids: &[&str],
1624 ) -> (JobStore, JobRecord, Segment) {
1625 let input_path = temp_path("input.epub");
1626 fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1627
1628 let store = JobStore::open(db_path).expect("store should open");
1629 let job = store
1630 .create_job(CreateJob {
1631 input: &input_path,
1632 output: &temp_path("output.epub"),
1633 source_lang: Some("English"),
1634 target_lang: "Italian",
1635 provider: "mock",
1636 model: "mock-prefix",
1637 base_url: None,
1638 api_key_env: None,
1639 })
1640 .expect("job should be created");
1641
1642 let mut seg = segment("seg_a", 0);
1643 let blocks: Vec<BlockTranslation> = block_ids
1644 .iter()
1645 .map(|id| BlockTranslation {
1646 block_id: BlockId(id.to_string()),
1647 text: format!("Tradotto {id}"),
1648 })
1649 .collect();
1650 seg.block_ids = block_ids.iter().map(|id| BlockId(id.to_string())).collect();
1651
1652 store
1653 .insert_segments(
1654 &job.id,
1655 &[seg.clone()],
1656 "v1",
1657 "mock",
1658 "mock-prefix",
1659 cache_namespace,
1660 )
1661 .expect("segments should insert");
1662 store
1663 .save_translation(SaveTranslation {
1664 job_id: &job.id,
1665 segment_id: "seg_a",
1666 translated_text: "Tradotto",
1667 blocks: &blocks,
1668 provider: "mock",
1669 model: "mock-prefix",
1670 prompt_version: "v1",
1671 input_tokens: Some(11),
1672 input_cached_tokens: Some(0),
1673 output_tokens: Some(7),
1674 tokens_estimated: false,
1675 })
1676 .expect("translation should save");
1677
1678 let _ = fs::remove_file(input_path);
1679 (store, job, seg)
1680 }
1681
1682 #[test]
1683 fn job_config_snapshot_round_trips_through_store() {
1684 let db_path = temp_path("snapshot.sqlite");
1685 let input_path = temp_path("input.epub");
1686 fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1687 let store = JobStore::open(&db_path).expect("store should open");
1688 let job = store
1689 .create_job(CreateJob {
1690 input: &input_path,
1691 output: &temp_path("output.epub"),
1692 source_lang: Some("English"),
1693 target_lang: "Italian",
1694 provider: "openrouter",
1695 model: "model",
1696 base_url: Some("https://example.test/v1"),
1697 api_key_env: Some("OPENROUTER_API_KEY"),
1698 })
1699 .expect("job should be created");
1700 let settings = bookforge_core::TranslationProfile::Balanced.resolve();
1701 let snapshot = RunConfigSnapshot {
1702 input_path: input_path.clone(),
1703 input_snapshot_path: Some(temp_path("input-snapshot.epub")),
1704 input_sha256: Some("abc123".to_string()),
1705 output_path: temp_path("translated.epub"),
1706 events_path: Some(temp_path("events.jsonl")),
1707 report_json_path: Some(temp_path("report.json")),
1708 report_markdown_path: Some(temp_path("report.md")),
1709 source_language: Some("English".to_string()),
1710 target_language: "Italian".to_string(),
1711 provider: "openrouter".to_string(),
1712 model: "model".to_string(),
1713 base_url: Some("https://example.test/v1".to_string()),
1714 api_key_env: Some("OPENROUTER_API_KEY".to_string()),
1715 profile: settings.profile,
1716 provider_preset: None,
1717 prompt_version: "batch_v1".to_string(),
1718 cache_namespace: "cache_ns".to_string(),
1719 settings: bookforge_core::ResolvedRunSettingsSnapshot::from_settings(&settings),
1720 };
1721
1722 store
1723 .update_job_config_snapshot(&job.id, &snapshot)
1724 .expect("snapshot should persist");
1725 let loaded = store
1726 .load_job_config_snapshot(&job.id)
1727 .expect("snapshot should load")
1728 .expect("snapshot should exist");
1729 assert_eq!(loaded, snapshot);
1730
1731 let reloaded_job = store
1732 .get_job(&job.id)
1733 .expect("job should load")
1734 .expect("job should exist");
1735 assert_eq!(reloaded_job.events_path, snapshot.events_path);
1736 assert_eq!(reloaded_job.report_json_path, snapshot.report_json_path);
1737 assert_eq!(
1738 reloaded_job.report_markdown_path,
1739 snapshot.report_markdown_path
1740 );
1741
1742 let _ = fs::remove_file(db_path);
1743 let _ = fs::remove_file(input_path);
1744 }
1745
1746 #[test]
1747 fn job_config_snapshot_does_not_store_api_key_value() {
1748 let db_path = temp_path("snapshot_secret.sqlite");
1749 let input_path = temp_path("input.epub");
1750 fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1751 let api_key_env = "BOOKFORGE_TEST_API_KEY_VALUE_NOT_STORED";
1752 let api_key_value = "sk-live-secret-that-must-not-be-persisted";
1753 unsafe {
1756 std::env::set_var(api_key_env, api_key_value);
1757 }
1758
1759 let store = JobStore::open(&db_path).expect("store should open");
1760 let job = store
1761 .create_job(CreateJob {
1762 input: &input_path,
1763 output: &temp_path("output.epub"),
1764 source_lang: Some("English"),
1765 target_lang: "Italian",
1766 provider: "openrouter",
1767 model: "model",
1768 base_url: Some("https://example.test/v1"),
1769 api_key_env: Some(api_key_env),
1770 })
1771 .expect("job should be created");
1772 let settings = bookforge_core::TranslationProfile::Balanced.resolve();
1773 let snapshot = RunConfigSnapshot {
1774 input_path: input_path.clone(),
1775 input_snapshot_path: None,
1776 input_sha256: None,
1777 output_path: temp_path("translated.epub"),
1778 events_path: Some(temp_path("events.jsonl")),
1779 report_json_path: Some(temp_path("report.json")),
1780 report_markdown_path: Some(temp_path("report.md")),
1781 source_language: Some("English".to_string()),
1782 target_language: "Italian".to_string(),
1783 provider: "openrouter".to_string(),
1784 model: "model".to_string(),
1785 base_url: Some("https://example.test/v1".to_string()),
1786 api_key_env: Some(api_key_env.to_string()),
1787 profile: settings.profile,
1788 provider_preset: None,
1789 prompt_version: "batch_v1".to_string(),
1790 cache_namespace: "cache_ns".to_string(),
1791 settings: bookforge_core::ResolvedRunSettingsSnapshot::from_settings(&settings),
1792 };
1793
1794 store
1795 .update_job_config_snapshot(&job.id, &snapshot)
1796 .expect("snapshot should persist");
1797 let raw_json = {
1798 let conn = store.conn.borrow();
1799 conn.query_row(
1800 "SELECT config_json FROM jobs WHERE id = ?1",
1801 params![job.id],
1802 |row| row.get::<_, String>(0),
1803 )
1804 .expect("raw snapshot JSON should load")
1805 };
1806
1807 assert!(raw_json.contains(api_key_env));
1808 assert!(!raw_json.contains(api_key_value));
1809
1810 unsafe {
1811 std::env::remove_var(api_key_env);
1812 }
1813 let _ = fs::remove_file(db_path);
1814 let _ = fs::remove_file(input_path);
1815 }
1816
1817 #[test]
1818 fn terminal_loading_and_resumable_ids_preserve_lifecycle_boundaries() {
1819 let db_path = temp_path("terminal_resume.sqlite");
1820 let input_path = temp_path("input.epub");
1821 fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1822 let store = JobStore::open(&db_path).expect("store should open");
1823 let job = store
1824 .create_job(CreateJob {
1825 input: &input_path,
1826 output: &temp_path("output.epub"),
1827 source_lang: Some("English"),
1828 target_lang: "Italian",
1829 provider: "mock",
1830 model: "mock-prefix",
1831 base_url: None,
1832 api_key_env: None,
1833 })
1834 .expect("job should be created");
1835 let segments = vec![
1836 segment("seg_done", 0),
1837 segment("seg_cached", 1),
1838 segment("seg_review", 2),
1839 segment("seg_failed", 3),
1840 segment("seg_queued", 4),
1841 ];
1842 store
1843 .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "ns")
1844 .expect("segments should insert");
1845 store
1846 .save_translation(SaveTranslation {
1847 job_id: &job.id,
1848 segment_id: "seg_done",
1849 translated_text: "Done",
1850 blocks: &[BlockTranslation {
1851 block_id: BlockId("b_000000".to_string()),
1852 text: "Done".to_string(),
1853 }],
1854 provider: "mock",
1855 model: "mock-prefix",
1856 prompt_version: "v1",
1857 input_tokens: None,
1858 input_cached_tokens: None,
1859 output_tokens: None,
1860 tokens_estimated: false,
1861 })
1862 .expect("done should save");
1863 store
1864 .save_cached_translation(SaveCachedTranslation {
1865 job_id: &job.id,
1866 segment_id: "seg_cached",
1867 translated_text: "Cached",
1868 blocks: &[BlockTranslation {
1869 block_id: BlockId("b_000001".to_string()),
1870 text: "Cached".to_string(),
1871 }],
1872 provider: "mock",
1873 model: "mock-prefix",
1874 prompt_version: "v1",
1875 })
1876 .expect("cached should save");
1877 store
1878 .save_needs_review(SaveNeedsReview {
1879 job_id: &job.id,
1880 segment_id: "seg_review",
1881 preserved_text: "Review",
1882 blocks: &[BlockTranslation {
1883 block_id: BlockId("b_000002".to_string()),
1884 text: "Review".to_string(),
1885 }],
1886 provider: "mock",
1887 model: "mock-prefix",
1888 prompt_version: "v1",
1889 error: "needs eyes",
1890 input_tokens: None,
1891 input_cached_tokens: None,
1892 output_tokens: None,
1893 tokens_estimated: false,
1894 })
1895 .expect("review should save");
1896 store
1897 .mark_segment_failed(&job.id, "seg_failed", "failed")
1898 .expect("failed should mark");
1899
1900 let terminal = store
1901 .load_terminal_segment_translations(&job.id)
1902 .expect("terminal records should load");
1903 let ids = terminal
1904 .iter()
1905 .map(|record| record.segment_id.as_str())
1906 .collect::<Vec<_>>();
1907 assert_eq!(ids, vec!["seg_done", "seg_cached", "seg_review"]);
1908 assert_eq!(terminal[0].blocks[0].block_id.0, "b_000000");
1909 assert_eq!(terminal[2].status, "needs_review");
1910
1911 let resumable = store
1912 .resumable_segment_ids(&job.id)
1913 .expect("resumable ids should load");
1914 assert_eq!(resumable, vec!["seg_failed", "seg_queued"]);
1915
1916 let _ = fs::remove_file(db_path);
1917 let _ = fs::remove_file(input_path);
1918 }
1919
1920 #[test]
1921 fn mark_unfinished_segments_failed_preserves_terminal_segments() {
1922 let db_path = temp_path("unfinished_preserve.sqlite");
1923 let input_path = temp_path("input.epub");
1924 fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
1925 let store = JobStore::open(&db_path).expect("store should open");
1926 let job = store
1927 .create_job(CreateJob {
1928 input: &input_path,
1929 output: &temp_path("output.epub"),
1930 source_lang: Some("English"),
1931 target_lang: "Italian",
1932 provider: "mock",
1933 model: "mock-prefix",
1934 base_url: None,
1935 api_key_env: None,
1936 })
1937 .expect("job should be created");
1938 let segments = vec![
1939 segment("seg_succeeded", 0),
1940 segment("seg_cached", 1),
1941 segment("seg_review", 2),
1942 segment("seg_queued", 3),
1943 segment("seg_retry", 4),
1944 ];
1945 store
1946 .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
1947 .expect("segments should insert");
1948 store
1949 .save_translation(SaveTranslation {
1950 job_id: &job.id,
1951 segment_id: "seg_succeeded",
1952 translated_text: "Done",
1953 blocks: &[BlockTranslation {
1954 block_id: BlockId("b_000000".to_string()),
1955 text: "Done".to_string(),
1956 }],
1957 provider: "mock",
1958 model: "mock-prefix",
1959 prompt_version: "v1",
1960 input_tokens: None,
1961 input_cached_tokens: None,
1962 output_tokens: None,
1963 tokens_estimated: false,
1964 })
1965 .expect("succeeded segment should save");
1966 store
1967 .save_cached_translation(SaveCachedTranslation {
1968 job_id: &job.id,
1969 segment_id: "seg_cached",
1970 translated_text: "Cached",
1971 blocks: &[BlockTranslation {
1972 block_id: BlockId("b_000001".to_string()),
1973 text: "Cached".to_string(),
1974 }],
1975 provider: "mock",
1976 model: "mock-prefix",
1977 prompt_version: "v1",
1978 })
1979 .expect("cached segment should save");
1980 store
1981 .save_needs_review(SaveNeedsReview {
1982 job_id: &job.id,
1983 segment_id: "seg_review",
1984 preserved_text: "Needs review",
1985 blocks: &[BlockTranslation {
1986 block_id: BlockId("b_000002".to_string()),
1987 text: "Needs review".to_string(),
1988 }],
1989 provider: "mock",
1990 model: "mock-prefix",
1991 prompt_version: "v1",
1992 error: "qa issue",
1993 input_tokens: None,
1994 input_cached_tokens: None,
1995 output_tokens: None,
1996 tokens_estimated: false,
1997 })
1998 .expect("needs-review segment should save");
1999 store
2000 .retry_segments(&job.id, RetryScope::Failed)
2001 .expect("retry with no failed segments should be harmless");
2002 {
2003 let conn = store.conn.borrow();
2004 conn.execute(
2005 "UPDATE segments SET status = 'retry_pending' WHERE job_id = ?1 AND id = 'seg_retry'",
2006 params![job.id],
2007 )
2008 .expect("test status update should work");
2009 }
2010
2011 let candidate_ids = segments
2012 .iter()
2013 .map(|segment| segment.id.0.clone())
2014 .collect::<Vec<_>>();
2015 let changed = store
2016 .mark_unfinished_segments_failed(&job.id, &candidate_ids, "run failed")
2017 .expect("unfinished segments should be marked failed");
2018 assert_eq!(changed, 2);
2019
2020 let records = store.segment_records(&job.id).expect("records should load");
2021 let statuses = records
2022 .into_iter()
2023 .map(|record| (record.id, record.status))
2024 .collect::<HashMap<_, _>>();
2025 assert_eq!(statuses["seg_succeeded"], "succeeded");
2026 assert_eq!(statuses["seg_cached"], "skipped_cached");
2027 assert_eq!(statuses["seg_review"], "needs_review");
2028 assert_eq!(statuses["seg_queued"], "failed");
2029 assert_eq!(statuses["seg_retry"], "failed");
2030
2031 let _ = fs::remove_file(db_path);
2032 let _ = fs::remove_file(input_path);
2033 }
2034
2035 #[test]
2036 fn mark_unfinished_segments_failed_marks_only_resumable_segments() {
2037 let db_path = temp_path("unfinished_resumable_only.sqlite");
2038 let input_path = temp_path("input.epub");
2039 fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2040 let store = JobStore::open(&db_path).expect("store should open");
2041 let job = store
2042 .create_job(CreateJob {
2043 input: &input_path,
2044 output: &temp_path("output.epub"),
2045 source_lang: Some("English"),
2046 target_lang: "Italian",
2047 provider: "mock",
2048 model: "mock-prefix",
2049 base_url: None,
2050 api_key_env: None,
2051 })
2052 .expect("job should be created");
2053 let segments = vec![
2054 segment("seg_succeeded", 0),
2055 segment("seg_cached", 1),
2056 segment("seg_review", 2),
2057 segment("seg_failed", 3),
2058 segment("seg_retry", 4),
2059 segment("seg_queued", 5),
2060 ];
2061 store
2062 .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
2063 .expect("segments should insert");
2064 {
2065 let conn = store.conn.borrow();
2066 for (id, status) in [
2067 ("seg_succeeded", "succeeded"),
2068 ("seg_cached", "skipped_cached"),
2069 ("seg_review", "needs_review"),
2070 ("seg_failed", "failed"),
2071 ("seg_retry", "retry_pending"),
2072 ("seg_queued", "queued"),
2073 ] {
2074 conn.execute(
2075 "UPDATE segments SET status = ?1 WHERE job_id = ?2 AND id = ?3",
2076 params![status, job.id, id],
2077 )
2078 .expect("status should update");
2079 }
2080 }
2081
2082 let candidate_ids = segments
2083 .iter()
2084 .map(|segment| segment.id.0.clone())
2085 .collect::<Vec<_>>();
2086 let changed = store
2087 .mark_unfinished_segments_failed(&job.id, &candidate_ids, "run failed")
2088 .expect("unfinished segments should be marked failed");
2089
2090 assert_eq!(changed, 3);
2091 let records = store.segment_records(&job.id).expect("records should load");
2092 let statuses = records
2093 .into_iter()
2094 .map(|record| (record.id, record.status))
2095 .collect::<HashMap<_, _>>();
2096 assert_eq!(statuses["seg_succeeded"], "succeeded");
2097 assert_eq!(statuses["seg_cached"], "skipped_cached");
2098 assert_eq!(statuses["seg_review"], "needs_review");
2099 assert_eq!(statuses["seg_failed"], "failed");
2100 assert_eq!(statuses["seg_retry"], "failed");
2101 assert_eq!(statuses["seg_queued"], "failed");
2102
2103 let _ = fs::remove_file(db_path);
2104 let _ = fs::remove_file(input_path);
2105 }
2106
2107 #[test]
2108 fn resumable_segment_ids_excludes_succeeded_cached_and_needs_review() {
2109 let db_path = temp_path("resumable_excludes.sqlite");
2110 let input_path = temp_path("input.epub");
2111 fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2112 let store = JobStore::open(&db_path).expect("store should open");
2113 let job = store
2114 .create_job(CreateJob {
2115 input: &input_path,
2116 output: &temp_path("output.epub"),
2117 source_lang: Some("English"),
2118 target_lang: "Italian",
2119 provider: "mock",
2120 model: "mock-prefix",
2121 base_url: None,
2122 api_key_env: None,
2123 })
2124 .expect("job should be created");
2125 let segments = vec![
2126 segment("seg_succeeded", 0),
2127 segment("seg_cached", 1),
2128 segment("seg_review", 2),
2129 ];
2130 store
2131 .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
2132 .expect("segments should insert");
2133 {
2134 let conn = store.conn.borrow();
2135 for (id, status) in [
2136 ("seg_succeeded", "succeeded"),
2137 ("seg_cached", "skipped_cached"),
2138 ("seg_review", "needs_review"),
2139 ] {
2140 conn.execute(
2141 "UPDATE segments SET status = ?1 WHERE job_id = ?2 AND id = ?3",
2142 params![status, job.id, id],
2143 )
2144 .expect("status should update");
2145 }
2146 }
2147
2148 let ids = store
2149 .resumable_segment_ids(&job.id)
2150 .expect("resumable ids should load");
2151
2152 assert!(ids.is_empty());
2153 let _ = fs::remove_file(db_path);
2154 let _ = fs::remove_file(input_path);
2155 }
2156
2157 #[test]
2158 fn resumable_segment_ids_includes_failed_retry_pending_and_pending() {
2159 let db_path = temp_path("resumable_includes.sqlite");
2160 let input_path = temp_path("input.epub");
2161 fs::write(&input_path, b"epub bytes").expect("input fixture should be writable");
2162 let store = JobStore::open(&db_path).expect("store should open");
2163 let job = store
2164 .create_job(CreateJob {
2165 input: &input_path,
2166 output: &temp_path("output.epub"),
2167 source_lang: Some("English"),
2168 target_lang: "Italian",
2169 provider: "mock",
2170 model: "mock-prefix",
2171 base_url: None,
2172 api_key_env: None,
2173 })
2174 .expect("job should be created");
2175 let segments = vec![
2176 segment("seg_failed", 0),
2177 segment("seg_retry", 1),
2178 segment("seg_queued", 2),
2179 ];
2180 store
2181 .insert_segments(&job.id, &segments, "v1", "mock", "mock-prefix", "test_ns")
2182 .expect("segments should insert");
2183 {
2184 let conn = store.conn.borrow();
2185 for (id, status) in [
2186 ("seg_failed", "failed"),
2187 ("seg_retry", "retry_pending"),
2188 ("seg_queued", "queued"),
2189 ] {
2190 conn.execute(
2191 "UPDATE segments SET status = ?1 WHERE job_id = ?2 AND id = ?3",
2192 params![status, job.id, id],
2193 )
2194 .expect("status should update");
2195 }
2196 }
2197
2198 let ids = store
2199 .resumable_segment_ids(&job.id)
2200 .expect("resumable ids should load");
2201
2202 assert_eq!(ids, vec!["seg_failed", "seg_retry", "seg_queued"]);
2203 let _ = fs::remove_file(db_path);
2204 let _ = fs::remove_file(input_path);
2205 }
2206
2207 #[test]
2208 fn cached_translation_requires_matching_cache_namespace() {
2209 let db_path = temp_path("ns_match.sqlite");
2210 let (store, _job, seg) =
2211 build_seeded_store_with_translation(&db_path, "ns_one", &["b_000000"]);
2212
2213 let hit = store
2214 .find_cached_translation(
2215 &seg,
2216 "v1",
2217 "mock",
2218 "mock-prefix",
2219 Some("English"),
2220 "Italian",
2221 "ns_one",
2222 )
2223 .expect("query ok");
2224 assert!(hit.is_some(), "matching namespace should hit");
2225
2226 let miss = store
2227 .find_cached_translation(
2228 &seg,
2229 "v1",
2230 "mock",
2231 "mock-prefix",
2232 Some("English"),
2233 "Italian",
2234 "ns_two",
2235 )
2236 .expect("query ok");
2237 assert!(miss.is_none(), "different namespace must not hit");
2238
2239 let _ = fs::remove_file(db_path);
2240 }
2241
2242 #[test]
2243 fn cached_translation_rejects_mismatched_block_ids() {
2244 let db_path = temp_path("blockid_match.sqlite");
2245 let (store, _job, mut seg) =
2246 build_seeded_store_with_translation(&db_path, "ns_x", &["b_000000"]);
2247
2248 seg.block_ids = vec![BlockId("b_999999".to_string())];
2250
2251 let miss = store
2252 .find_cached_translation(
2253 &seg,
2254 "v1",
2255 "mock",
2256 "mock-prefix",
2257 Some("English"),
2258 "Italian",
2259 "ns_x",
2260 )
2261 .expect("query ok");
2262 assert!(
2263 miss.is_none(),
2264 "mismatched block_ids must reject the cached row"
2265 );
2266
2267 let _ = fs::remove_file(db_path);
2268 }
2269
2270 #[test]
2271 fn old_empty_cache_namespace_rows_do_not_match_new_runs() {
2272 let db_path = temp_path("legacy_ns.sqlite");
2273 let (store, _job, seg) = build_seeded_store_with_translation(&db_path, "", &["b_000000"]);
2275
2276 let miss = store
2277 .find_cached_translation(
2278 &seg,
2279 "v1",
2280 "mock",
2281 "mock-prefix",
2282 Some("English"),
2283 "Italian",
2284 "real_ns",
2285 )
2286 .expect("query ok");
2287 assert!(
2288 miss.is_none(),
2289 "legacy empty-namespace row must not satisfy a real namespace lookup"
2290 );
2291
2292 let _ = fs::remove_file(db_path);
2293 }
2294
2295 #[test]
2296 fn job_store_enables_wal_and_busy_timeout() {
2297 let db_path = temp_path("wal_busy.sqlite");
2298 let store = JobStore::open(&db_path).expect("store should open");
2299
2300 let conn = store.conn.borrow();
2301 let journal_mode: String = conn
2302 .pragma_query_value(None, "journal_mode", |row| row.get(0))
2303 .expect("pragma journal_mode should succeed");
2304 assert_eq!(
2305 journal_mode.to_lowercase(),
2306 "wal",
2307 "WAL journal mode must be enabled"
2308 );
2309
2310 let busy_timeout: i64 = conn
2311 .pragma_query_value(None, "busy_timeout", |row| row.get(0))
2312 .expect("pragma busy_timeout should succeed");
2313 assert!(
2314 busy_timeout >= 5000,
2315 "busy_timeout should be at least 5000ms, got {busy_timeout}"
2316 );
2317
2318 let wal_path = db_path.with_extension("sqlite-wal");
2319 let shm_path = db_path.with_extension("sqlite-shm");
2320 let _ = fs::remove_file(db_path);
2324 let _ = fs::remove_file(wal_path);
2325 let _ = fs::remove_file(shm_path);
2326 }
2327
2328 #[test]
2329 fn job_store_enables_foreign_keys_on_every_connection() {
2330 let db_path = temp_path("fk.sqlite");
2331 let store = JobStore::open(&db_path).expect("store should open");
2332
2333 let conn = store.conn.borrow();
2334 let fk_enabled: i64 = conn
2335 .pragma_query_value(None, "foreign_keys", |row| row.get(0))
2336 .expect("pragma foreign_keys should succeed");
2337 assert_eq!(
2338 fk_enabled, 1,
2339 "foreign_keys pragma must be ON on every connection"
2340 );
2341
2342 let _ = fs::remove_file(db_path);
2343 }
2344
2345 #[test]
2346 fn doctor_reports_wal_sidecars_as_normal_when_integrity_check_passes() {
2347 let db_path = temp_path("doctor.sqlite");
2348 let store = JobStore::open(&db_path).expect("store should open");
2349
2350 let input_path = temp_path("input_doctor.epub");
2352 fs::write(&input_path, b"epub bytes").expect("test epub");
2353 let _job = store
2354 .create_job(CreateJob {
2355 input: &input_path,
2356 output: &temp_path("out_doctor.epub"),
2357 source_lang: Some("English"),
2358 target_lang: "Italian",
2359 provider: "mock",
2360 model: "mock-prefix",
2361 base_url: None,
2362 api_key_env: None,
2363 })
2364 .expect("job created");
2365 drop(store);
2366
2367 let doctor = run_doctor(Some(db_path.clone())).expect("doctor should run");
2368 assert!(doctor.database_exists, "database should exist");
2369 assert_eq!(
2370 doctor.journal_mode.to_lowercase(),
2371 "wal",
2372 "journal mode should be wal"
2373 );
2374 assert_eq!(doctor.integrity_check, "ok", "integrity check should pass");
2375 assert!(
2376 doctor.wal_sidecars_normal,
2377 "wal sidecars should be reported as normal"
2378 );
2379
2380 if doctor.wal_present || doctor.shm_present {
2381 assert!(
2382 !doctor.note.is_empty(),
2383 "doctor must explain WAL sidecars when they are present"
2384 );
2385 }
2386
2387 let wal_path = db_path.with_extension("sqlite-wal");
2388 let shm_path = db_path.with_extension("sqlite-shm");
2389 let _ = fs::remove_file(&db_path);
2390 let _ = fs::remove_file(input_path);
2391 let _ = fs::remove_file(wal_path);
2392 let _ = fs::remove_file(shm_path);
2393 }
2394
2395 #[test]
2396 fn checkpoint_writer_and_reader_do_not_immediately_busy_fail() {
2397 let db_path = temp_path("concurrent.sqlite");
2398 let input_path = temp_path("input_conc.epub");
2399 fs::write(&input_path, b"epub bytes").expect("test epub");
2400
2401 let store_w = JobStore::open(&db_path).expect("store_w open");
2403 let job = store_w
2404 .create_job(CreateJob {
2405 input: &input_path,
2406 output: &temp_path("out_conc.epub"),
2407 source_lang: Some("English"),
2408 target_lang: "Italian",
2409 provider: "mock",
2410 model: "mock-prefix",
2411 base_url: None,
2412 api_key_env: None,
2413 })
2414 .expect("job created");
2415 store_w
2416 .insert_segments(
2417 &job.id,
2418 &[segment("seg_conc", 0)],
2419 "v1",
2420 "mock",
2421 "mock-prefix",
2422 "ns",
2423 )
2424 .expect("segments inserted");
2425
2426 let store_r = JobStore::open(&db_path).expect("store_r open");
2428 let summary = store_r
2429 .summary(&job.id)
2430 .expect("summary should load")
2431 .expect("job should exist");
2432 assert_eq!(summary.total_segments, 1);
2433
2434 let wal_path = db_path.with_extension("sqlite-wal");
2435 let shm_path = db_path.with_extension("sqlite-shm");
2436 let _ = fs::remove_file(&db_path);
2437 let _ = fs::remove_file(input_path);
2438 let _ = fs::remove_file(wal_path);
2439 let _ = fs::remove_file(shm_path);
2440 }
2441}