phostt 0.4.3

Local STT server powered by Zipformer-vi RNN-T — on-device Vietnamese speech recognition via ONNX Runtime
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
//! Minimal Prometheus text-exposition registry (replaces
//! `metrics-exporter-prometheus`).
//!
//! We only need counters and histograms, keyed by a small set of fixed
//! labels. Rolling our own drops ~40 transitive crates from `Cargo.lock`
//! (the `metrics`/`metrics-util`/`indexmap`/`atomic-waker`/…  stack) and
//! keeps the `/metrics` contract entirely in-tree. The emitted text
//! matches the Prometheus 0.0.4 exposition format documented at
//! <https://prometheus.io/docs/instrumenting/exposition_formats/>.
//!
//! ## Concurrency
//! `RwLock<HashMap<..>>` is fine for our workload — counters/histograms
//! are hit on every HTTP request (per-handler middleware), but the scrape
//! endpoint is typically polled every 15 s so reader contention is low.
//! If lock-free updates are ever needed, swap `RwLock` for a sharded map.
//!
//! ## Default histogram buckets
//! Matches `metrics-exporter-prometheus`'s defaults, which themselves come
//! from the Prometheus Go client library. Tweakable per-metric via the
//! `buckets` argument of `MetricsRegistry::register_histogram`.

use std::collections::HashMap;
use std::fmt::Write;
use std::sync::RwLock;

/// Default histogram bucket upper bounds (seconds-scaled), listed in
/// ascending order.
///
/// The trailing `+Inf` bucket is intentionally absent: it is appended
/// implicitly when rendering, so consumers do not need to supply it.
pub const DEFAULT_BUCKETS: &[f64] = &[
    0.005, 0.01, 0.025, 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0,
];

/// Label set of one sample, stored as `(name, value)` pairs.
///
/// The alias itself does not enforce any order. The registry sorts every
/// set by label name (see `sort_labels`) before using it as a map key;
/// sorting keeps the serialised label string stable regardless of the
/// insertion order, so the same counter + label combination always maps
/// to the same storage slot.
pub type Labels = Vec<(String, String)>;

/// Return `labels` ordered by label name.
///
/// The sort is stable, so pairs that share a name keep their relative
/// input order. Values never take part in the comparison.
fn sort_labels(labels: Labels) -> Labels {
    let mut ordered = labels;
    ordered.sort_by(|(lhs, _), (rhs, _)| lhs.cmp(rhs));
    ordered
}

/// Serialise a label set as the `{name="value",...}` block of a sample line.
///
/// Returns an empty string for an empty set, so unlabelled samples render
/// as a bare metric name. Pairs are emitted in the order given; callers
/// that need a canonical order sort beforehand. Label names are written
/// verbatim, label values are escaped.
///
/// Accepts any slice of pairs; a `&Labels` coerces automatically.
fn format_labels(labels: &[(String, String)]) -> String {
    if labels.is_empty() {
        return String::new();
    }
    // Lower bound on the output size: two braces plus `name="value",` per
    // pair. Escaped characters may push the real size slightly above this.
    let estimate = 2 + labels
        .iter()
        .map(|(name, value)| name.len() + value.len() + 4)
        .sum::<usize>();
    let mut out = String::with_capacity(estimate);
    out.push('{');
    for (i, (name, value)) in labels.iter().enumerate() {
        if i > 0 {
            out.push(',');
        }
        out.push_str(name);
        out.push_str("=\"");
        // Per the Prometheus text format, `\`, `"` and line feed must be
        // escaped inside a label value. Rare in practice, but it prevents
        // crafted values from breaking the exposition output.
        for ch in value.chars() {
            match ch {
                '\\' => out.push_str("\\\\"),
                '"' => out.push_str("\\\""),
                '\n' => out.push_str("\\n"),
                other => out.push(other),
            }
        }
        out.push('"');
    }
    out.push('}');
    out
}

/// All series of one counter metric.
#[derive(Debug, Default)]
struct CounterFamily {
    /// Text for the `# HELP` line; the line is omitted while this is empty.
    help: String,
    /// Running total per label set. Keys are label sets already sorted by
    /// name (`counter_inc` sorts before inserting).
    values: HashMap<Labels, u64>,
}

/// All series of one histogram metric; every series shares `buckets`.
#[derive(Debug, Default)]
struct HistogramFamily {
    /// Text for the `# HELP` line; the line is omitted while this is empty.
    help: String,
    /// Bucket upper bounds (the `le` values), excluding the implicit `+Inf`.
    /// Set by `register_histogram`; when still empty at the first
    /// observation, `histogram_record` fills in `DEFAULT_BUCKETS`.
    buckets: Vec<f64>,
    /// Observation state per label set. Keys are label sets already sorted
    /// by name (`histogram_record` sorts before inserting).
    series: HashMap<Labels, HistogramSeries>,
}

/// Observation state of one histogram series (one label set).
#[derive(Debug, Default, Clone)]
struct HistogramSeries {
    /// Cumulative bucket counts; index `i` is observations ≤ `buckets[i]`.
    /// Trailing `+Inf` bucket is the grand total (`count`), not stored here.
    counts: Vec<u64>,
    /// Sum of all observed values; rendered as the `_sum` sample.
    sum: f64,
    /// Number of observations; rendered as `_count` and as the `+Inf` bucket.
    count: u64,
}

/// Prometheus-compatible registry used by the server. Typically wrapped in
/// an `Arc` and stashed on `AppState` so every handler can record into it.
///
/// Counters and histograms live in separate maps, each behind its own
/// `RwLock`, so all methods take `&self`.
#[derive(Debug, Default)]
pub struct MetricsRegistry {
    /// Counter families keyed by metric name.
    counters: RwLock<HashMap<String, CounterFamily>>,
    /// Histogram families keyed by metric name.
    histograms: RwLock<HashMap<String, HistogramFamily>>,
}

impl MetricsRegistry {
    /// Create an empty registry. Families are declared lazily on first use
    /// via `counter_inc` / `histogram_record` — separate `register_*`
    /// methods exist for setting help text ahead of time.
    pub fn new() -> Self {
        Self::default()
    }

    /// Set the `# HELP` text for a counter family. Called during startup;
    /// overwrites any previously registered help text for the same name.
    pub fn register_counter(&self, name: &str, help: &str) {
        let mut map = self.counters.write().unwrap_or_else(|e| e.into_inner());
        map.entry(name.to_string()).or_default().help = help.to_string();
    }

    /// Set the `# HELP` text and bucket bounds for a histogram family.
    /// Buckets are sorted and deduplicated; callers may pass
    /// [`DEFAULT_BUCKETS`] for the Prometheus client default.
    pub fn register_histogram(&self, name: &str, help: &str, buckets: &[f64]) {
        let mut normalised: Vec<f64> = buckets.to_vec();
        normalised.sort_by(|a, b| a.partial_cmp(b).unwrap_or(std::cmp::Ordering::Equal));
        normalised.dedup();
        let mut map = self.histograms.write().unwrap_or_else(|e| e.into_inner());
        let family = map.entry(name.to_string()).or_default();
        family.help = help.to_string();
        family.buckets = normalised;
    }

    /// Increment a counter. Lazily creates the family if it didn't exist.
    pub fn counter_inc(&self, name: &str, labels: Labels, delta: u64) {
        let labels = sort_labels(labels);
        let mut map = self.counters.write().unwrap_or_else(|e| e.into_inner());
        let family = map.entry(name.to_string()).or_default();
        *family.values.entry(labels).or_insert(0) += delta;
    }

    /// Record one observation into a histogram. Lazily creates the family
    /// with [`DEFAULT_BUCKETS`] if it didn't exist.
    pub fn histogram_record(&self, name: &str, labels: Labels, value: f64) {
        let labels = sort_labels(labels);
        let mut map = self.histograms.write().unwrap_or_else(|e| e.into_inner());
        let family = map.entry(name.to_string()).or_default();
        if family.buckets.is_empty() {
            family.buckets = DEFAULT_BUCKETS.to_vec();
        }
        let series = family
            .series
            .entry(labels)
            .or_insert_with(|| HistogramSeries {
                counts: vec![0; family.buckets.len()],
                sum: 0.0,
                count: 0,
            });
        // Keep the cumulative-counts vector in sync with the (possibly
        // re-registered) bucket list. Extending with zeros is correct
        // because extra buckets haven't seen observations yet.
        if series.counts.len() < family.buckets.len() {
            series.counts.resize(family.buckets.len(), 0);
        }
        for (i, &upper) in family.buckets.iter().enumerate() {
            if value <= upper {
                series.counts[i] += 1;
            }
        }
        series.sum += value;
        series.count += 1;
    }

    /// Render the current snapshot as Prometheus text. Formatting follows
    /// the `0.0.4; charset=utf-8` content type: `# HELP` and `# TYPE`
    /// comments per family, then one sample per (name, labels) pair.
    pub fn render_prometheus(&self) -> String {
        let mut out = String::new();

        // Counters first — stable alphabetical order for reproducible
        // scrape output across invocations.
        let counters = self.counters.read().unwrap_or_else(|e| e.into_inner());
        let mut names: Vec<&String> = counters.keys().collect();
        names.sort();
        for name in names {
            let family = &counters[name];
            if !family.help.is_empty() {
                let _ = writeln!(out, "# HELP {name} {}", family.help);
            }
            let _ = writeln!(out, "# TYPE {name} counter");
            let mut label_keys: Vec<&Labels> = family.values.keys().collect();
            label_keys.sort();
            for labels in label_keys {
                let _ = writeln!(
                    out,
                    "{name}{} {}",
                    format_labels(labels),
                    family.values[labels]
                );
            }
            out.push('\n');
        }
        drop(counters);

        let histograms = self.histograms.read().unwrap_or_else(|e| e.into_inner());
        let mut names: Vec<&String> = histograms.keys().collect();
        names.sort();
        for name in names {
            let family = &histograms[name];
            if !family.help.is_empty() {
                let _ = writeln!(out, "# HELP {name} {}", family.help);
            }
            let _ = writeln!(out, "# TYPE {name} histogram");
            let mut label_keys: Vec<&Labels> = family.series.keys().collect();
            label_keys.sort();
            for labels in label_keys {
                let series = &family.series[labels];
                // Emit one `_bucket{le="<upper>"}` line per boundary plus
                // the implicit `+Inf` line carrying the grand total. When
                // the series has pre-existing labels we splice `le=` in as
                // another comma-separated entry; when it doesn't we emit
                // the `le=` label alone.
                let base = format_labels(labels);
                let inner = trim_outer_braces(&base);
                let le_prefix: &str = if inner.is_empty() { "" } else { "," };
                for (i, &upper) in family.buckets.iter().enumerate() {
                    let _ = writeln!(
                        out,
                        "{name}_bucket{{{inner}{le_prefix}le=\"{}\"}} {}",
                        fmt_f64_prom(upper),
                        series.counts[i],
                    );
                }
                let _ = writeln!(
                    out,
                    "{name}_bucket{{{inner}{le_prefix}le=\"+Inf\"}} {}",
                    series.count
                );
                let _ = writeln!(out, "{name}_sum{} {}", base, fmt_f64_prom(series.sum),);
                let _ = writeln!(out, "{name}_count{} {}", base, series.count,);
            }
            out.push('\n');
        }

        out
    }
}

/// Peel the enclosing `{` … `}` off a pre-formatted label block so the
/// `le=…` sample label can be spliced into the same comma-separated list.
///
/// Input that is not wrapped in both braces (including the empty string)
/// is handed back unchanged, so an unlabelled series yields `""`.
fn trim_outer_braces(formatted: &str) -> &str {
    match formatted.strip_prefix('{') {
        Some(rest) => rest.strip_suffix('}').unwrap_or(formatted),
        None => formatted,
    }
}

/// Spell a float for the exposition output: `NaN` for NaN, `+Inf` / `-Inf`
/// for the infinities, and Rust's default `{}` rendering for every finite
/// value.
fn fmt_f64_prom(v: f64) -> String {
    if v.is_nan() {
        "NaN".to_owned()
    } else if v == f64::INFINITY {
        "+Inf".to_owned()
    } else if v == f64::NEG_INFINITY {
        "-Inf".to_owned()
    } else {
        v.to_string()
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Registry pre-populated with the counter and histogram families the
    /// tests below record into, so `# HELP` lines are present in the output.
    fn registry() -> MetricsRegistry {
        let r = MetricsRegistry::new();
        r.register_counter(
            "phostt_http_requests_total",
            "Total HTTP requests processed",
        );
        r.register_histogram(
            "phostt_http_request_duration_seconds",
            "HTTP request duration",
            DEFAULT_BUCKETS,
        );
        r
    }

    // A registry with no families renders as the empty string.
    #[test]
    fn test_render_empty_registry() {
        let r = MetricsRegistry::new();
        assert_eq!(r.render_prometheus(), "");
    }

    // Two increments with the same label set accumulate into one sample.
    #[test]
    fn test_counter_increment_and_render() {
        let r = registry();
        r.counter_inc(
            "phostt_http_requests_total",
            vec![
                ("method".into(), "GET".into()),
                ("path".into(), "/health".into()),
                ("status".into(), "200".into()),
            ],
            1,
        );
        r.counter_inc(
            "phostt_http_requests_total",
            vec![
                ("method".into(), "GET".into()),
                ("path".into(), "/health".into()),
                ("status".into(), "200".into()),
            ],
            2,
        );
        let text = r.render_prometheus();
        assert!(text.contains("# HELP phostt_http_requests_total Total HTTP requests processed"));
        assert!(text.contains("# TYPE phostt_http_requests_total counter"));
        assert!(text.contains(
            "phostt_http_requests_total{method=\"GET\",path=\"/health\",status=\"200\"} 3"
        ));
    }

    // Bucket counts are cumulative and `+Inf` equals the observation count.
    #[test]
    fn test_histogram_bucket_cumulative() {
        let r = registry();
        let labels = vec![("method".into(), "GET".into())];
        for v in [0.001, 0.03, 0.3, 1.5] {
            r.histogram_record("phostt_http_request_duration_seconds", labels.clone(), v);
        }
        let text = r.render_prometheus();
        // 0.001 ≤ 0.005 → contributes to every bucket including 0.005+
        // 0.03  ≤ 0.05  → contributes to 0.05, 0.1, 0.25, 0.5, 1.0, 2.5, 5.0, 10.0
        // 0.3   ≤ 0.5   → contributes to 0.5, 1.0, 2.5, 5.0, 10.0
        // 1.5   ≤ 2.5   → contributes to 2.5, 5.0, 10.0
        assert!(text.contains(
            "phostt_http_request_duration_seconds_bucket{method=\"GET\",le=\"0.005\"} 1"
        ));
        assert!(
            text.contains(
                "phostt_http_request_duration_seconds_bucket{method=\"GET\",le=\"0.05\"} 2"
            )
        );
        assert!(
            text.contains(
                "phostt_http_request_duration_seconds_bucket{method=\"GET\",le=\"0.5\"} 3"
            )
        );
        assert!(
            text.contains(
                "phostt_http_request_duration_seconds_bucket{method=\"GET\",le=\"+Inf\"} 4"
            )
        );
        assert!(text.contains("phostt_http_request_duration_seconds_count{method=\"GET\"} 4"));
    }

    // Label order at the call site must not create a second series.
    #[test]
    fn test_label_ordering_stable() {
        let r = MetricsRegistry::new();
        r.counter_inc(
            "c",
            vec![("b".into(), "1".into()), ("a".into(), "2".into())],
            1,
        );
        r.counter_inc(
            "c",
            vec![("a".into(), "2".into()), ("b".into(), "1".into())],
            4,
        );
        let text = r.render_prometheus();
        // Same counter despite different insert order — totals to 5.
        assert!(text.contains("c{a=\"2\",b=\"1\"} 5"));
    }

    // `"`, `\` and newline inside a label value are escaped in the output.
    #[test]
    fn test_label_escaping() {
        let r = MetricsRegistry::new();
        r.counter_inc("c", vec![("l".into(), "a\"b\\c\nd".into())], 1);
        let text = r.render_prometheus();
        assert!(
            text.contains("c{l=\"a\\\"b\\\\c\\nd\"} 1"),
            "escape failed: {text}"
        );
    }

    // An unlabelled sample renders as the bare metric name, without `{}`.
    #[test]
    fn test_empty_labels_render() {
        let r = MetricsRegistry::new();
        r.counter_inc("c", vec![], 7);
        let text = r.render_prometheus();
        assert!(text.contains("c 7"));
    }

    // `_sum` and `_count` include observations above the largest bucket.
    #[test]
    fn test_histogram_sum_tracks_observations() {
        let r = MetricsRegistry::new();
        r.register_histogram("h", "H", &[1.0, 2.0]);
        r.histogram_record("h", vec![], 0.5);
        r.histogram_record("h", vec![], 1.5);
        r.histogram_record("h", vec![], 2.5);
        let text = r.render_prometheus();
        assert!(text.contains("h_sum 4.5"));
        assert!(text.contains("h_count 3"));
    }

    #[test]
    fn test_metrics_survive_poison() {
        // Spec 001: A panic while holding the metrics write lock must NOT
        // break all subsequent metric writes.
        let r = MetricsRegistry::new();

        // Poison the counters lock by panicking inside a write guard.
        let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
            let _guard = r.counters.write().expect("counters lock");
            panic!("simulated panic during metric write");
        }));

        // After poison, the next counter_inc should succeed (not panic).
        // This works because the registry recovers a poisoned guard with
        // `into_inner()` on every lock acquisition instead of unwrapping it.
        r.counter_inc("c", vec![], 1);
        let text = r.render_prometheus();
        assert!(
            text.contains("c 1"),
            "metrics should survive poison: {text}"
        );
    }

    #[test]
    fn test_metrics_multiple_poison_cycles() {
        // Spec 001: Metrics must survive repeated poison events.
        let r = MetricsRegistry::new();

        for i in 0..3 {
            // The lock stays poisoned after the first cycle, so from the
            // second iteration on the `expect` itself is what panics (still
            // inside `catch_unwind`), before the explicit `panic!` runs.
            let _ = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
                let _guard = r.counters.write().expect("counters lock");
                panic!("poison cycle {i}");
            }));
            r.counter_inc("c", vec![], 1);
        }

        let text = r.render_prometheus();
        assert!(
            text.contains("c 3"),
            "metrics should accumulate across poison cycles: {text}"
        );
    }
}