crate::ix!();
impl BatchModeTtsJob {
/// Execute the job with automatic chunking & ffmpeg concat.
///
/// This operation is **idempotent** and _resume‑safe_:
/// * Already‑present, non‑empty `part_####.<ext>` files in the working
/// directory are detected and **skipped**, allowing the job to continue
/// from the first missing chunk after any interruption.
/// * Empty or corrupt part files are automatically regenerated.
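    /// * Chunk boundaries of parts that already exist are frozen; only the remaining
    ///   text is re‑chunked with the corrected char‑safe splitter, so finished audio
    ///   never has to be re‑synthesized when the chunking algorithm changes.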
#[tracing::instrument(
level = "info",
skip_all,
fields(
input = %self.input_path().display(),
output = %self.output_path().display(),
chunk = self.chunk_chars()
)
)]
pub async fn run(&self) -> Result<(), BatchModeTtsError> {
use std::io;
info!("Starting (resumable) chunked batch‑mode TTS job");
// -------------------------------------------------------------------
// 1) Load full input
// -------------------------------------------------------------------
let text = tokio::fs::read_to_string(&self.input_path()).await?;
debug!("Loaded {} UTF‑8 bytes", text.len());
// -------------------------------------------------------------------
// 2) Prep workspace + extension
// -------------------------------------------------------------------
let work_dir = self
.work_dir()
.clone()
.unwrap_or_else(|| self.output_path().with_extension("parts"));
        tokio::fs::create_dir_all(&work_dir).await?;
debug!("Using work dir {:?}", work_dir);
let ext = Self::ext_for_format(*self.response_format());
// -------------------------------------------------------------------
// 3) Discover high‑water mark (HWM) of already generated parts
// We will *not* regenerate anything ≤ HWM.
// -------------------------------------------------------------------
let hwm_opt = Self::find_high_water_mark(&work_dir, ext)?;
        let hwm = hwm_opt.unwrap_or(0);
let anchored_count = if hwm_opt.is_some() { hwm + 1 } else { 0 };
if anchored_count > 0 {
info!("Found {anchored_count} existing contiguous part(s); high‑water mark is #{hwm:04}");
} else {
info!("No existing parts found; starting from scratch");
}
// -------------------------------------------------------------------
// 4) Reproduce *legacy* (byte‑biased) boundaries up to HWM,
// then re‑chunk the *remainder* with the corrected char‑safe splitter.
// -------------------------------------------------------------------
let legacy_chunks = Self::old_chunk_text(&text, *self.chunk_chars());
info!(
"Legacy splitter would produce {} chunk(s) at size {}",
legacy_chunks.len(),
self.chunk_chars()
);
        if anchored_count > legacy_chunks.len() {
            warn!(
                "Existing parts exceed legacy recomputed chunk count ({} > {}) — \
                 seams likely changed since last run; clamping anchor to {}",
                anchored_count,
                legacy_chunks.len(),
                legacy_chunks.len()
            );
        }
let anchored_count = anchored_count.min(legacy_chunks.len());
        // Join the un‑anchored tail of the legacy chunks. When nothing is anchored this is
        // the full corpus as normalized by the legacy splitter, kept for consistency.
        let remainder_text = legacy_chunks[anchored_count..].join("");
debug!(
"Prepared remainder text after anchor: {} chars",
remainder_text.chars().count()
);
        // New corrected splitter: strictly ≤ min(self.chunk_chars, 4096) characters per chunk.
        let max_chars = (*self.chunk_chars()).min(4096);
        let new_chunks = Self::chunk_text(&remainder_text, max_chars);
info!(
"Char‑safe splitter produced {} downstream chunk(s) at size ≤ {} chars",
new_chunks.len(),
max_chars
);
// Total expected parts = anchored (frozen) + newly planned
let expected_total = anchored_count + new_chunks.len();
// -------------------------------------------------------------------
// 5) Set up API client
// -------------------------------------------------------------------
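        // Credentials come from the environment; a missing OPENAI_API_KEY panics here,
        // aborting the job before any chunk is synthesized.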
let client = OpenAIClient::with_config(
OpenAIConfig::new().with_api_key(
std::env::var("OPENAI_API_KEY").expect("OPENAI_API_KEY env‑var missing"),
),
);
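        // One Audio handle is reused for every per‑chunk speech request in this run.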
        let audio_api = client.audio();
// We will accumulate all final part paths (frozen + newly generated)
let mut part_paths = Vec::<PathBuf>::with_capacity(expected_total);
        // 5a) Register frozen parts [0 .. anchored_count). A frozen part that is
        //     missing or empty is regenerated from its *legacy* chunk, so the original
        //     seams (and every downstream index) stay stable.
        for i in 0..anchored_count {
            let p = work_dir.join(format!("part_{i:04}.{ext}"));
            let needs_regen = match tokio::fs::metadata(&p).await {
                Ok(meta) if meta.len() > 0 => {
                    info!("Skipping existing part #{i} ({:?})", p);
                    false
                }
                Ok(_) => {
                    warn!("Existing part #{i} is empty/corrupt; regenerating it from its legacy chunk");
                    true
                }
                Err(e) if e.kind() == io::ErrorKind::NotFound => {
                    warn!("Expected frozen part #{i} not found; regenerating it from its legacy chunk");
                    true
                }
                Err(e) => return Err(BatchModeTtsError::IoError(e)),
            };
            if needs_regen {
                // Legacy chunks were byte‑biased, so clip defensively to the 4096‑char API limit.
                let seg = &legacy_chunks[i];
                let safe_input = if seg.chars().count() > 4096 {
                    clip_to_chars(seg, 4096)
                } else {
                    seg.clone()
                };
                let req: CreateSpeechRequest = CreateSpeechRequestArgs::default()
                    .input(safe_input)
                    .model(self.model().clone())
                    .voice(self.voice().clone())
                    .response_format(*self.response_format())
                    .speed(*self.speed())
                    .build()?;
                let resp = audio_api.speech(req).await?;
                info!("Received {} bytes for regenerated frozen part #{i}", resp.bytes.len());
                tokio::fs::write(&p, resp.bytes).await?;
            }
            part_paths.push(p);
        }
// 5b) Generate/skip downstream parts [anchored_count .. expected_total)
for (j, segment) in new_chunks.iter().enumerate() {
let idx = anchored_count + j;
let part_path = work_dir.join(format!("part_{idx:04}.{ext}"));
// If the part already exists and is non‑empty, skip regeneration.
match tokio::fs::metadata(&part_path).await {
Ok(meta) if meta.len() > 0 => {
info!("Skipping existing part #{idx} ({:?})", part_path);
part_paths.push(part_path);
continue;
}
Ok(_) => {
warn!("Existing part #{idx} is empty/corrupt → regenerating");
}
Err(e) if e.kind() == io::ErrorKind::NotFound => {
trace!("Part #{idx} missing → generating");
}
Err(e) => return Err(BatchModeTtsError::IoError(e)),
}
// Guardrail (should never trigger because chunk_text enforces ≤max_chars)
let seg_chars = segment.chars().count();
if seg_chars > 4096 {
warn!(
"Segment #{idx} unexpectedly exceeds 4096 chars ({}); applying clip guardrail",
seg_chars
);
}
let safe_input = if seg_chars > 4096 {
clip_to_chars(segment, 4096)
} else {
segment.clone()
};
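            // Build the per‑chunk speech request with the job's fixed model/voice/speed settings.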
let req: CreateSpeechRequest = CreateSpeechRequestArgs::default()
.input(safe_input)
.model(self.model().clone())
.voice(self.voice().clone())
.response_format(*self.response_format())
.speed(*self.speed())
.build()?;
let resp = audio_api.speech(req).await?;
info!("Received {} bytes for chunk #{idx}", resp.bytes.len());
tokio::fs::write(&part_path, resp.bytes).await?;
debug!("Wrote chunk #{idx} → {:?}", part_path);
part_paths.push(part_path);
}
// Sanity check – ensure we have every expected part (frozen + new)
if part_paths.len() != expected_total {
error!(
"Mismatch after generation: have {} part files but expected {}",
part_paths.len(),
expected_total
);
return Err(BatchModeTtsError::IoError(io::Error::new(
io::ErrorKind::Other,
"missing part files after resume‑aware generation",
)));
}
// -------------------------------------------------------------------
// 6) Merge parts via ffmpeg
// -------------------------------------------------------------------
self.merge_parts_ffmpeg(&part_paths).await?;
info!("Wrote final audio {}", self.output_path().display());
Ok(())
}
}