// batch_mode_tts/run.rs

1crate::ix!();
2
3impl BatchModeTtsJob {
4
5    /// Execute the job with automatic chunking & ffmpeg concat.
6    ///
7    /// This operation is **idempotent** and _resume‑safe_:
8    /// * Already‑present, non‑empty `part_####.<ext>` files in the working
9    ///   directory are detected and **skipped**, allowing the job to continue
10    ///   from the first missing chunk after any interruption.
11    /// * Empty or corrupt part files are automatically regenerated.
12    #[tracing::instrument(
13        level = "info",
14        skip_all,
15        fields(
16            input  = %self.input_path().display(),
17            output = %self.output_path().display(),
18            chunk  = self.chunk_chars()
19        )
20    )]
21    pub async fn run(&self) -> Result<(), BatchModeTtsError> {
22        use std::io;
23
24        info!("Starting (resumable) chunked batch‑mode TTS job");
25
26        // -------------------------------------------------------------------
27        // 1) Load full input
28        // -------------------------------------------------------------------
29        let text = tokio::fs::read_to_string(&self.input_path()).await?;
30        debug!("Loaded {} UTF‑8 bytes", text.len());
31
32        // -------------------------------------------------------------------
33        // 2) Prep workspace + extension
34        // -------------------------------------------------------------------
35        let work_dir = self
36            .work_dir()
37            .clone()
38            .unwrap_or_else(|| self.output_path().with_extension("parts"));
39        tokio::fs::create_dir_all(&work_dir).await.ok();
40        debug!("Using work dir {:?}", work_dir);
41
42        let ext = Self::ext_for_format(*self.response_format());
43
44        // -------------------------------------------------------------------
45        // 3) Discover high‑water mark (HWM) of already generated parts
46        //    We will *not* regenerate anything ≤ HWM.
47        // -------------------------------------------------------------------
48        let hwm_opt = Self::find_high_water_mark(&work_dir, ext)?;
49        let hwm = hwm_opt.unwrap_or_else(|| usize::MAX.wrapping_sub(usize::MAX)); // 0 saturating trick
50        let anchored_count = if hwm_opt.is_some() { hwm + 1 } else { 0 };
51        if anchored_count > 0 {
52            info!("Found {anchored_count} existing contiguous part(s); high‑water mark is #{hwm:04}");
53        } else {
54            info!("No existing parts found; starting from scratch");
55        }
56
57        // -------------------------------------------------------------------
58        // 4) Reproduce *legacy* (byte‑biased) boundaries up to HWM,
59        //    then re‑chunk the *remainder* with the corrected char‑safe splitter.
60        // -------------------------------------------------------------------
61        let legacy_chunks = Self::old_chunk_text(&text, *self.chunk_chars());
62        info!(
63            "Legacy splitter would produce {} chunk(s) at size {}",
64            legacy_chunks.len(),
65            self.chunk_chars()
66        );
67
68        if anchored_count > legacy_chunks.len() {
69            warn!(
70                "Existing parts exceed legacy recomputed chunk count ({} > {}) — \
71                 seams likely changed since last run; clamping anchor to {}",
72                anchored_count,
73                legacy_chunks.len(),
74                legacy_chunks.len().saturating_sub(1)
75            );
76        }
77        let anchored_count = anchored_count.min(legacy_chunks.len());
78        let remainder_text = if anchored_count == 0 {
79            // Use normalized corpus produced by the legacy splitter for consistency
80            legacy_chunks.join("")
81        } else {
82            legacy_chunks[anchored_count..].join("")
83        };
84        debug!(
85            "Prepared remainder text after anchor: {} chars",
86            remainder_text.chars().count()
87        );
88
89        // New corrected splitter: strictly ≤ min(self.chunk_chars, 4096) characters
90        let max_chars = self.chunk_chars().min(&4096);
91        let new_chunks = Self::chunk_text(&remainder_text, *max_chars);
92        info!(
93            "Char‑safe splitter produced {} downstream chunk(s) at size ≤ {} chars",
94            new_chunks.len(),
95            max_chars
96        );
97
98        // Total expected parts = anchored (frozen) + newly planned
99        let expected_total = anchored_count + new_chunks.len();
100
101        // -------------------------------------------------------------------
102        // 5) Set up API client
103        // -------------------------------------------------------------------
104        let client = OpenAIClient::with_config(
105            OpenAIConfig::new().with_api_key(
106                std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env‑var missing"),
107            ),
108        );
109        let audio_api = async_openai::Audio::new(&client);
110
111        // We will accumulate all final part paths (frozen + newly generated)
112        let mut part_paths = Vec::<PathBuf>::with_capacity(expected_total);
113
114        // 5a) Register frozen parts [0 .. anchored_count)
115        for i in 0..anchored_count {
116            let p = work_dir.join(format!("part_{i:04}.{ext}"));
117            match tokio::fs::metadata(&p).await {
118                Ok(meta) if meta.len() > 0 => {
119                    info!("Skipping existing part #{i} ({:?})", p);
120                    part_paths.push(p);
121                }
122                Ok(_) => {
123                    warn!(
124                        "Existing part #{i} is empty/corrupt — will be regenerated using *new* chunking"
125                    );
126                    // Fallthrough: allow regeneration below by *injecting* this index into downstream flow.
127                }
128                Err(e) if e.kind() == io::ErrorKind::NotFound => {
129                    warn!(
130                        "Expected frozen part #{i} not found — will be regenerated using *new* chunking"
131                    );
132                    // Fallthrough to regeneration path below.
133                }
134                Err(e) => return Err(BatchModeTtsError::IoError(e)),
135            }
136        }
137
138        // 5b) Generate/skip downstream parts [anchored_count .. expected_total)
139        for (j, segment) in new_chunks.iter().enumerate() {
140            let idx = anchored_count + j;
141            let part_path = work_dir.join(format!("part_{idx:04}.{ext}"));
142
143            // If the part already exists and is non‑empty, skip regeneration.
144            match tokio::fs::metadata(&part_path).await {
145                Ok(meta) if meta.len() > 0 => {
146                    info!("Skipping existing part #{idx} ({:?})", part_path);
147                    part_paths.push(part_path);
148                    continue;
149                }
150                Ok(_) => {
151                    warn!("Existing part #{idx} is empty/corrupt → regenerating");
152                }
153                Err(e) if e.kind() == io::ErrorKind::NotFound => {
154                    trace!("Part #{idx} missing → generating");
155                }
156                Err(e) => return Err(BatchModeTtsError::IoError(e)),
157            }
158
159            // Guardrail (should never trigger because chunk_text enforces ≤max_chars)
160            let seg_chars = segment.chars().count();
161            if seg_chars > 4096 {
162                warn!(
163                    "Segment #{idx} unexpectedly exceeds 4096 chars ({}); applying clip guardrail",
164                    seg_chars
165                );
166            }
167            let safe_input = if seg_chars > 4096 {
168                clip_to_chars(segment, 4096)
169            } else {
170                segment.clone()
171            };
172
173            let req: CreateSpeechRequest = CreateSpeechRequestArgs::default()
174                .input(safe_input)
175                .model(self.model().clone())
176                .voice(self.voice().clone())
177                .response_format(*self.response_format())
178                .speed(*self.speed())
179                .build()?;
180
181            let resp = audio_api.speech(req).await?;
182            info!("Received {} bytes for chunk #{idx}", resp.bytes.len());
183
184            tokio::fs::write(&part_path, resp.bytes).await?;
185            debug!("Wrote chunk #{idx} → {:?}", part_path);
186
187            part_paths.push(part_path);
188        }
189
190        // Sanity check – ensure we have every expected part (frozen + new)
191        if part_paths.len() != expected_total {
192            error!(
193                "Mismatch after generation: have {} part files but expected {}",
194                part_paths.len(),
195                expected_total
196            );
197            return Err(BatchModeTtsError::IoError(io::Error::new(
198                io::ErrorKind::Other,
199                "missing part files after resume‑aware generation",
200            )));
201        }
202
203        // -------------------------------------------------------------------
204        // 6) Merge parts via ffmpeg
205        // -------------------------------------------------------------------
206        self.merge_parts_ffmpeg(&part_paths).await?;
207
208        info!("Wrote final audio {}", self.output_path().display());
209        Ok(())
210    }
211}