phostt 0.4.3

Local STT server powered by Zipformer-vi RNN-T — on-device Vietnamese speech recognition via ONNX Runtime
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
//! End-to-end WebSocket protocol tests.
//!
//! All tests require the Zipformer-vi ONNX model to be downloaded (~850MB).
//! Run with: `cargo test --test e2e_ws -- --ignored`

mod common;

use futures_util::{SinkExt, StreamExt};
use std::time::Duration;
use tokio_tungstenite::tungstenite::Message;

// ---------------------------------------------------------------------------
// 1. Ready message validation
// ---------------------------------------------------------------------------

/// Verifies the greeting a freshly connected client receives.
///
/// Starts a server with `common::start_server`, connects through
/// `common::ws_connect`, and checks the returned `ready` payload: its `type`,
/// `version`, default `sample_rate`, `model` name and `supported_rates` list.
#[tokio::test]
#[ignore] // Requires model download
async fn test_ws_connect_receives_ready() {
    let model_dir = common::model_dir();
    // `_shutdown` stays bound until the end of the test.
    // NOTE(review): presumably dropping it stops the server — confirm in `common`.
    let (port, _shutdown) = common::start_server(&model_dir).await;

    let (_sink, _stream, ready) = common::ws_connect(port).await;

    assert_eq!(ready["type"], "ready");
    assert_eq!(ready["version"], "1.0");
    assert_eq!(ready["sample_rate"], 48000);
    // A missing or non-string `model` must fail through the assertion below so
    // that the offending value is printed, instead of panicking on a bare
    // `unwrap()` before the diagnostic message can be produced.
    assert!(
        matches!(ready["model"].as_str(), Some(name) if name.contains("zipformer")),
        "model field should contain 'zipformer', got: {:?}",
        ready["model"]
    );

    let rates = ready["supported_rates"]
        .as_array()
        .expect("supported_rates should be an array");
    assert!(
        rates.len() >= 5,
        "supported_rates should have >=5 entries, got {}",
        rates.len()
    );
    // Spot-check two of the advertised rates: 8000 and 48000.
    assert!(
        rates.contains(&serde_json::json!(8000)),
        "supported_rates should contain 8000"
    );
    assert!(
        rates.contains(&serde_json::json!(48000)),
        "supported_rates should contain 48000"
    );
}

// ---------------------------------------------------------------------------
// 2. Audio → Final
// ---------------------------------------------------------------------------

/// Streams a bundled WAV recording, sends `stop`, and expects a `final`
/// message whose `text` is non-empty. Any `partial` messages received before
/// the `final` are skipped; every other message type fails the test.
#[tokio::test]
#[ignore] // Requires model download
async fn test_ws_audio_produces_final() {
    let models = common::model_dir();
    let (port, _shutdown) = common::start_server(&models).await;
    let (mut tx, mut rx, _greeting) = common::ws_connect(port).await;

    // Push the recording as a sequence of binary frames, 9600 buffer elements each.
    let pcm = common::pcm16_from_wav(&common::test_wav_path(0));
    for piece in pcm.chunks(9600) {
        let frame = Message::Binary(piece.to_vec().into());
        tx.send(frame).await.unwrap();
    }

    // Signal the end of the utterance.
    let stop = serde_json::json!({"type": "stop"}).to_string();
    tx.send(Message::Text(stop.into())).await.unwrap();

    // Each receive gets its own 30 s budget.
    let per_message = Duration::from_secs(30);
    loop {
        let incoming = tokio::time::timeout(per_message, rx.next())
            .await
            .expect("timeout waiting for Final")
            .expect("stream ended")
            .expect("ws error");

        let body = incoming.into_text().expect("expected text message");
        let reply: serde_json::Value = serde_json::from_str(&body).expect("expected JSON");

        let kind = reply["type"].as_str().unwrap_or("");
        if kind == "partial" {
            continue;
        }
        if kind != "final" {
            panic!("Unexpected message type: {kind}, full: {body}");
        }

        let transcript = reply["text"]
            .as_str()
            .expect("Final message should have a text field");
        assert!(
            !transcript.trim().is_empty(),
            "Expected non-empty Vietnamese transcription, got: {transcript}"
        );
        break;
    }
}

// ---------------------------------------------------------------------------
// 3. Stop without audio → Final with empty text
// ---------------------------------------------------------------------------

/// Sends `stop` on a fresh connection without any audio and expects the very
/// next message to be a `final` with empty text.
#[tokio::test]
#[ignore] // Requires model download
async fn test_ws_stop_without_audio() {
    let models = common::model_dir();
    let (port, _shutdown) = common::start_server(&models).await;
    let (mut tx, mut rx, _greeting) = common::ws_connect(port).await;

    let stop = serde_json::json!({"type": "stop"}).to_string();
    tx.send(Message::Text(stop.into())).await.unwrap();

    let wait = Duration::from_secs(10);
    let reply = tokio::time::timeout(wait, rx.next())
        .await
        .expect("timeout waiting for Final")
        .expect("stream ended")
        .expect("ws error");

    let parsed = common::assert_msg_type(reply, "final");
    // A `text` field that is absent or not a string is treated the same as "".
    let transcript = parsed["text"].as_str().unwrap_or("");
    assert_eq!(
        transcript, "",
        "Expected empty text for stop-without-audio"
    );
}

// ---------------------------------------------------------------------------
// 4. Configure with valid sample rate → Final (no error)
// ---------------------------------------------------------------------------

/// Configures the session for 16000 Hz, sends up to one second of audio and a
/// `stop`, then expects a `final` message. `partial` messages are skipped and
/// any other message type (for example an error) fails the test.
#[tokio::test]
#[ignore] // Requires model download
async fn test_ws_configure_valid_sample_rate() {
    let models = common::model_dir();
    let (port, _shutdown) = common::start_server(&models).await;
    let (mut tx, mut rx, _greeting) = common::ws_connect(port).await;

    // Switch the session to 16 kHz before any audio is sent.
    let configure = serde_json::json!({"type": "configure", "sample_rate": 16000}).to_string();
    tx.send(Message::Text(configure.into())).await.unwrap();

    // One second of 16 kHz PCM16 is 16000 samples * 2 bytes; send less if the
    // recording is shorter than that.
    let pcm = common::pcm16_from_wav(&common::test_wav_path(0));
    let one_sec_bytes = 16000usize * 2;
    let take = one_sec_bytes.min(pcm.len());
    let first_second = pcm[..take].to_vec();
    tx.send(Message::Binary(first_second.into())).await.unwrap();

    // Signal the end of the utterance.
    let stop = serde_json::json!({"type": "stop"}).to_string();
    tx.send(Message::Text(stop.into())).await.unwrap();

    // Each receive gets its own 20 s budget.
    let per_message = Duration::from_secs(20);
    loop {
        let incoming = tokio::time::timeout(per_message, rx.next())
            .await
            .expect("timeout waiting for Final")
            .expect("stream ended")
            .expect("ws error");

        let body = incoming.into_text().expect("expected text message");
        let reply: serde_json::Value = serde_json::from_str(&body).expect("expected JSON");

        let kind = reply["type"].as_str().unwrap_or("");
        if kind == "final" {
            break;
        }
        if kind != "partial" {
            panic!("Unexpected message type: {kind} (expected final, not error)");
        }
    }
}

// ---------------------------------------------------------------------------
// 5. Configure with invalid sample rate → Error
// ---------------------------------------------------------------------------

#[tokio::test]
#[ignore] // Requires model download
async fn test_ws_configure_invalid_sample_rate() {
    let model_dir = common::model_dir();
    let (port, _shutdown) = common::start_server(&model_dir).await;

    let (mut sink, mut stream, _ready) = common::ws_connect(port).await;

    sink.send(Message::Text(
        serde_json::to_string(&serde_json::json!({"type": "configure", "sample_rate": 7000}))
            .unwrap()
            .into(),
    ))
    .await
    .unwrap();

    let msg = tokio::time::timeout(Duration::from_secs(5), stream.next())
        .await
        .expect("timeout waiting for Error")
        .expect("stream ended")
        .expect("ws error");

    let v = common::assert_msg_type(msg, "error");
    assert_eq!(
        v["code"], "invalid_sample_rate",
        "Expected code=invalid_sample_rate, got: {:?}",
        v["code"]
    );
}

// ---------------------------------------------------------------------------
// 6. Configure after audio has been sent → Error
// ---------------------------------------------------------------------------

#[tokio::test]
#[ignore] // Requires model download
async fn test_ws_configure_after_audio() {
    let model_dir = common::model_dir();
    let (port, _shutdown) = common::start_server(&model_dir).await;

    let (mut sink, mut stream, _ready) = common::ws_connect(port).await;

    // Send some audio first
    let silence = common::generate_pcm16_silence(0.1, 48000);
    sink.send(Message::Binary(silence.into())).await.unwrap();

    // Now try to configure — should be rejected
    sink.send(Message::Text(
        serde_json::to_string(&serde_json::json!({"type": "configure", "sample_rate": 16000}))
            .unwrap()
            .into(),
    ))
    .await
    .unwrap();

    let msg = tokio::time::timeout(Duration::from_secs(5), stream.next())
        .await
        .expect("timeout waiting for Error")
        .expect("stream ended")
        .expect("ws error");

    let v = common::assert_msg_type(msg, "error");
    assert_eq!(
        v["code"], "configure_too_late",
        "Expected code=configure_too_late, got: {:?}",
        v["code"]
    );
}

// ---------------------------------------------------------------------------
// 7. Malformed JSON → connection stays alive, Stop still works
// ---------------------------------------------------------------------------

/// Sends a text frame that is not valid JSON, then a `stop`, and expects the
/// connection to still deliver a `final`. `partial` messages are skipped; any
/// other message type fails the test.
#[tokio::test]
#[ignore] // Requires model download
async fn test_ws_malformed_json() {
    let models = common::model_dir();
    let (port, _shutdown) = common::start_server(&models).await;
    let (mut tx, mut rx, _greeting) = common::ws_connect(port).await;

    // Deliberately unparseable payload.
    let garbage = String::from("not json at all {{");
    tx.send(Message::Text(garbage.into())).await.unwrap();

    // The connection is expected to survive, so a normal stop should still work.
    let stop = serde_json::json!({"type": "stop"}).to_string();
    tx.send(Message::Text(stop.into())).await.unwrap();

    // Each receive gets its own 10 s budget.
    let per_message = Duration::from_secs(10);
    loop {
        let incoming = tokio::time::timeout(per_message, rx.next())
            .await
            .expect("timeout — connection may have been closed by malformed JSON")
            .expect("stream ended unexpectedly after malformed JSON")
            .expect("ws error");

        let body = incoming.into_text().expect("expected text message");
        let reply: serde_json::Value = serde_json::from_str(&body).expect("expected JSON");

        let kind = reply["type"].as_str().unwrap_or("");
        if kind == "final" {
            break;
        }
        if kind != "partial" {
            panic!("Unexpected message type after malformed JSON: {kind}");
        }
    }
}

// ---------------------------------------------------------------------------
// 8. Client disconnect mid-stream → server remains healthy
// ---------------------------------------------------------------------------

/// Drops a client right after it has sent audio, waits briefly, and then
/// checks that a second client can still connect and receive `ready`.
#[tokio::test]
#[ignore] // Requires model download
async fn test_ws_client_disconnect_midstream() {
    let models = common::model_dir();
    let (port, _shutdown) = common::start_server(&models).await;

    {
        // Both halves of the first connection live only inside this scope, so
        // they are dropped at the closing brace with no Close frame sent by
        // this test.
        let (mut first_tx, _first_rx, _first_ready) = common::ws_connect(port).await;
        let quiet = common::generate_pcm16_silence(0.5, 48000);
        first_tx.send(Message::Binary(quiet.into())).await.unwrap();
    }

    // Grace period for the server to notice the dropped connection.
    let grace = Duration::from_millis(200);
    tokio::time::sleep(grace).await;

    // A brand-new client must still be greeted.
    let (_second_tx, _second_rx, second_ready) = common::ws_connect(port).await;
    assert_eq!(
        second_ready["type"], "ready",
        "Server should still be healthy after abrupt client disconnect"
    );
}

// ---------------------------------------------------------------------------
// 9. Four concurrent clients — all receive Ready and Final
// ---------------------------------------------------------------------------

/// Spawns four client tasks against one server. Each task connects, expects
/// `ready`, sends `stop`, expects `final`, and returns its index; the test
/// then joins every task with a 30 s limit per task.
#[tokio::test]
#[ignore] // Requires model download
async fn test_ws_concurrent_4_clients() {
    let models = common::model_dir();
    let (port, _shutdown) = common::start_server(&models).await;

    let url = format!("ws://127.0.0.1:{port}/ws");

    // Spawn all clients up front; `collect` drives the iterator so every task
    // is started before any of them is joined.
    let clients: Vec<_> = (0..4usize)
        .map(|i| {
            let url = url.clone();
            tokio::spawn(async move {
                let (socket, _) = tokio_tungstenite::connect_async(&url)
                    .await
                    .unwrap_or_else(|e| panic!("Client {i} failed to connect: {e}"));
                let (mut tx, mut rx) = socket.split();

                // First message on a new connection: Ready.
                let greeting = tokio::time::timeout(Duration::from_secs(10), rx.next())
                    .await
                    .expect("timeout waiting for Ready")
                    .expect("stream ended")
                    .expect("ws error");
                let greeting_body = greeting.into_text().unwrap();
                let greeting_json: serde_json::Value =
                    serde_json::from_str(&greeting_body).unwrap();
                assert_eq!(
                    greeting_json["type"], "ready",
                    "Client {i} did not receive Ready"
                );

                // Stop immediately, without sending any audio.
                let stop = serde_json::json!({"type": "stop"}).to_string();
                tx.send(Message::Text(stop.into())).await.unwrap();

                // The reply to Stop must be Final.
                let outcome = tokio::time::timeout(Duration::from_secs(10), rx.next())
                    .await
                    .expect("timeout waiting for Final")
                    .expect("stream ended")
                    .expect("ws error");
                let outcome_body = outcome.into_text().unwrap();
                let outcome_json: serde_json::Value =
                    serde_json::from_str(&outcome_body).unwrap();
                assert_eq!(
                    outcome_json["type"], "final",
                    "Client {i} did not receive Final after Stop"
                );

                i
            })
        })
        .collect();

    // Join in spawn order; a panic inside a task surfaces here as a join error.
    for task in clients {
        let finished = tokio::time::timeout(Duration::from_secs(30), task)
            .await
            .expect("client task timed out")
            .expect("client task panicked");
        assert!(finished < 4);
    }
}