crate::ix!();

impl BatchModeTtsJob {

    #[tracing::instrument(
        level = "info",
        skip_all,
        fields(
            input = %self.input_path().display(),
            output = %self.output_path().display(),
            chunk = self.chunk_chars()
        )
    )]
    pub async fn run(&self) -> Result<(), BatchModeTtsError> {
        use std::io;

        info!("Starting (resumable) chunked batch-mode TTS job");

        let text = tokio::fs::read_to_string(self.input_path()).await?;
        debug!("Loaded {} UTF-8 bytes", text.len());

        let work_dir = self
            .work_dir()
            .clone()
            .unwrap_or_else(|| self.output_path().with_extension("parts"));
        tokio::fs::create_dir_all(&work_dir).await?;
        debug!("Using work dir {:?}", work_dir);

        let ext = Self::ext_for_format(*self.response_format());

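        // Resume support: scan the work dir for the longest contiguous run of
        // part_NNNN.<ext> files; everything up to that high-water mark is a
        // frozen prefix that is never re-synthesized.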
        let hwm_opt = Self::find_high_water_mark(&work_dir, ext)?;
        let anchored_count = hwm_opt.map_or(0, |hwm| hwm + 1);
        if let Some(hwm) = hwm_opt {
            info!("Found {anchored_count} existing contiguous part(s); high-water mark is #{hwm:04}");
        } else {
            info!("No existing parts found; starting from scratch");
        }

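        // Re-run the legacy splitter so the frozen parts line up with the
        // seams they were originally cut on; only text past the anchor is
        // re-chunked with the newer char-safe splitter.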
        let legacy_chunks = Self::old_chunk_text(&text, *self.chunk_chars());
        info!(
            "Legacy splitter would produce {} chunk(s) at size {}",
            legacy_chunks.len(),
            self.chunk_chars()
        );

        if anchored_count > legacy_chunks.len() {
            warn!(
                "Existing parts exceed legacy recomputed chunk count ({} > {}) — \
                 seams likely changed since last run; clamping anchor to {}",
                anchored_count,
                legacy_chunks.len(),
                legacy_chunks.len()
            );
        }
        let anchored_count = anchored_count.min(legacy_chunks.len());
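        // Join everything past the frozen prefix back into one string.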
        let remainder_text = legacy_chunks[anchored_count..].join("");
        debug!(
            "Prepared remainder text after anchor: {} chars",
            remainder_text.chars().count()
        );

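        // OpenAI's speech endpoint rejects inputs over 4096 characters, so the
        // configured chunk size is clamped to that limit.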
        let max_chars = (*self.chunk_chars()).min(4096);
        let new_chunks = Self::chunk_text(&remainder_text, max_chars);
        info!(
            "Char-safe splitter produced {} downstream chunk(s) at size ≤ {} chars",
            new_chunks.len(),
            max_chars
        );

        let expected_total = anchored_count + new_chunks.len();

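        // Note: a missing OPENAI_API_KEY panics here rather than surfacing as
        // a BatchModeTtsError.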
        let client = OpenAIClient::with_config(
            OpenAIConfig::new().with_api_key(
                std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env-var missing"),
            ),
        );
        let audio_api = async_openai::Audio::new(&client);

        let mut part_paths = Vec::<PathBuf>::with_capacity(expected_total);

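        // Pass 1: account for the frozen prefix. These parts are only
        // verified, never re-synthesized.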
        for i in 0..anchored_count {
            let p = work_dir.join(format!("part_{i:04}.{ext}"));
            match tokio::fs::metadata(&p).await {
                Ok(meta) if meta.len() > 0 => {
                    info!("Skipping existing part #{i} ({:?})", p);
                    part_paths.push(p);
                }
                Ok(_) => {
                    warn!(
                        "Existing part #{i} is empty/corrupt — it is not regenerated here, \
                         so the final part-count check below will fail"
                    );
                }
                Err(e) if e.kind() == io::ErrorKind::NotFound => {
                    warn!(
                        "Expected frozen part #{i} not found — it is not regenerated here, \
                         so the final part-count check below will fail"
                    );
                }
                Err(e) => return Err(BatchModeTtsError::IoError(e)),
            }
        }

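        // Pass 2: synthesize any chunk past the anchor whose part file is
        // missing or empty.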
        for (j, segment) in new_chunks.iter().enumerate() {
            let idx = anchored_count + j;
            let part_path = work_dir.join(format!("part_{idx:04}.{ext}"));

            match tokio::fs::metadata(&part_path).await {
                Ok(meta) if meta.len() > 0 => {
                    info!("Skipping existing part #{idx} ({:?})", part_path);
                    part_paths.push(part_path);
                    continue;
                }
                Ok(_) => {
                    warn!("Existing part #{idx} is empty/corrupt → regenerating");
                }
                Err(e) if e.kind() == io::ErrorKind::NotFound => {
                    trace!("Part #{idx} missing → generating");
                }
                Err(e) => return Err(BatchModeTtsError::IoError(e)),
            }

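            // Guardrail: the char-safe splitter should already respect the
            // 4096-char cap, so clipping here is defensive.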
            let seg_chars = segment.chars().count();
            let safe_input = if seg_chars > 4096 {
                warn!(
                    "Segment #{idx} unexpectedly exceeds 4096 chars ({}); applying clip guardrail",
                    seg_chars
                );
                clip_to_chars(segment, 4096)
            } else {
                segment.clone()
            };

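            // Build the speech request from the job's configured model, voice,
            // response format, and speed.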
            let req: CreateSpeechRequest = CreateSpeechRequestArgs::default()
                .input(safe_input)
                .model(self.model().clone())
                .voice(self.voice().clone())
                .response_format(*self.response_format())
                .speed(*self.speed())
                .build()?;

            let resp = audio_api.speech(req).await?;
            info!("Received {} bytes for chunk #{idx}", resp.bytes.len());

            tokio::fs::write(&part_path, resp.bytes).await?;
            debug!("Wrote chunk #{idx} → {:?}", part_path);

            part_paths.push(part_path);
        }

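        // Every expected part (frozen prefix + newly generated) must now be on
        // disk; a shortfall means an anchored part was missing or empty above.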
        if part_paths.len() != expected_total {
            error!(
                "Mismatch after generation: have {} part files but expected {}",
                part_paths.len(),
                expected_total
            );
            return Err(BatchModeTtsError::IoError(io::Error::new(
                io::ErrorKind::Other,
                "missing part files after resume-aware generation",
            )));
        }

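        // Concatenate all parts, in index order, into the final output via ffmpeg.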
        self.merge_parts_ffmpeg(&part_paths).await?;

        info!("Wrote final audio {}", self.output_path().display());
        Ok(())
    }
}