use super::*;
use crate::{
audio::stt::streaming::{encoder::StreamingEncoderBackend, types::DelayPreset},
error::InvariantViolationPayload,
};
use std::sync::Mutex;
struct ScriptedEncoder {
window_size: usize,
err_script: Mutex<Vec<bool>>,
calls: Mutex<usize>,
fingerprints: Mutex<Vec<f32>>,
}
impl ScriptedEncoder {
fn new(window_size: usize, err_script: Vec<bool>) -> Self {
Self {
window_size,
err_script: Mutex::new(err_script),
calls: Mutex::new(0),
fingerprints: Mutex::new(Vec::new()),
}
}
fn call_count(&self) -> usize {
*self.calls.lock().unwrap()
}
fn fingerprints(&self) -> Vec<f32> {
self.fingerprints.lock().unwrap().clone()
}
}
impl StreamingEncoderBackend for ScriptedEncoder {
fn window_size(&self) -> usize {
self.window_size
}
fn encode_window(&self, mel_window: &Array, _valid_frames: usize) -> Result<Array> {
*self.calls.lock().unwrap() += 1;
let fingerprint = mel_window
.try_clone()
.and_then(|mut a| a.to_vec::<f32>())
.ok()
.and_then(|v| v.first().copied())
.unwrap_or(f32::NAN);
self.fingerprints.lock().unwrap().push(fingerprint);
let mut script = self.err_script.lock().unwrap();
let should_err = if script.is_empty() {
false
} else {
script.remove(0)
};
if should_err {
return Err(crate::error::Error::InvariantViolation(
crate::error::InvariantViolationPayload::new(
"ScriptedEncoder::encode_window",
"scripted failure",
),
));
}
let rows = mel_window.shape().first().copied().unwrap_or(0);
let buf = vec![0.0_f32; rows * 2];
Array::from_slice::<f32>(&buf, &[rows as i32, 2i32])
}
}
struct ScriptedDecoder {
results: Mutex<Vec<Result<Vec<u32>>>>,
calls: Mutex<usize>,
}
impl ScriptedDecoder {
fn with_results(results: Vec<Result<Vec<u32>>>) -> Self {
Self {
results: Mutex::new(results),
calls: Mutex::new(0),
}
}
fn call_count(&self) -> usize {
*self.calls.lock().unwrap()
}
}
impl StreamingDecoderBackend for ScriptedDecoder {
fn decode_all_tokens(
&self,
_audio_features: &Array,
_confirmed_token_ids: &[u32],
_config: &StreamingConfig,
_max_tokens: usize,
) -> Result<Vec<u32>> {
*self.calls.lock().unwrap() += 1;
let mut q = self.results.lock().unwrap();
if q.is_empty() {
Ok(Vec::new())
} else {
q.remove(0)
}
}
}
struct MockTokenizer;
impl StreamingTokenizer for MockTokenizer {
fn decode_ids(&self, ids: &[u32]) -> String {
ids
.iter()
.map(|id| format!("t{id}"))
.collect::<Vec<_>>()
.join(" ")
}
}
fn nonfinalize_session(
encoder: ScriptedEncoder,
decoder: ScriptedDecoder,
) -> StreamingInferenceSession<ScriptedEncoder, ScriptedDecoder, MockTokenizer> {
let cfg = StreamingConfig::default()
.with_decode_interval_seconds(0.0)
.with_boundary_decode_interval_seconds(0.0)
.with_boundary_boost_seconds(0.0)
.with_max_cached_windows(8)
.with_finalize_completed_windows(false)
.with_min_agreement_passes(1)
.with_boundary_min_agreement_passes(1)
.with_delay_preset(DelayPreset::Custom(0));
StreamingInferenceSession::new(decoder, MockTokenizer, cfg, encoder, 16_000, 8, 0).unwrap()
}
fn finalize_session(
encoder: ScriptedEncoder,
decoder: ScriptedDecoder,
) -> StreamingInferenceSession<ScriptedEncoder, ScriptedDecoder, MockTokenizer> {
let cfg = StreamingConfig::default()
.with_decode_interval_seconds(0.0)
.with_boundary_decode_interval_seconds(0.0)
.with_boundary_boost_seconds(0.0)
.with_max_cached_windows(8)
.with_finalize_completed_windows(true)
.with_min_agreement_passes(1)
.with_boundary_min_agreement_passes(1)
.with_delay_preset(DelayPreset::Custom(0));
StreamingInferenceSession::new(decoder, MockTokenizer, cfg, encoder, 16_000, 8, 0).unwrap()
}
fn drive_two_phase(
session: &mut StreamingInferenceSession<ScriptedEncoder, ScriptedDecoder, MockTokenizer>,
) -> (
Result<Vec<TranscriptionEvent>>,
Result<Vec<TranscriptionEvent>>,
) {
let partial: Vec<f32> = (0..800).map(|i| (i as f32 * 0.001).sin()).collect();
let topup: Vec<f32> = (800..2_000).map(|i| (i as f32 * 0.001).sin()).collect();
let partial_events = session.feed_audio(&partial);
let boundary_events = session.feed_audio(&topup);
(partial_events, boundary_events)
}
#[test]
fn feed_audio_short_input_yields_no_events_until_mel_emits() {
let encoder = ScriptedEncoder::new(16, vec![]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![10, 11, 12])]);
let mut session = nonfinalize_session(encoder, decoder);
let events = session.feed_audio(&[0.0_f32; 1]).unwrap();
assert!(events.is_empty(), "events={events:?}");
}
#[test]
fn feed_audio_long_input_drives_partial_decode() {
let encoder = ScriptedEncoder::new(16, vec![]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![10, 11, 12])]);
let mut session = nonfinalize_session(encoder, decoder);
let samples: Vec<f32> = (0..2400).map(|i| (i as f32 * 0.001).sin()).collect();
let events = session.feed_audio(&samples).unwrap();
assert_eq!(session.decoder.call_count(), 1);
assert!(
matches!(events.first(), Some(TranscriptionEvent::Confirmed(_))),
"events[0]={:?}",
events.first()
);
assert!(
events
.iter()
.any(|e| matches!(e, TranscriptionEvent::Stats(_)))
);
}
#[test]
fn stop_emits_ended_event() {
let encoder = ScriptedEncoder::new(16, vec![]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![10])]);
let mut session = nonfinalize_session(encoder, decoder);
let samples: Vec<f32> = (0..2400).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples).unwrap();
let stop_events = session.stop().unwrap();
assert!(
matches!(stop_events.last(), Some(TranscriptionEvent::Ended(_))),
"stop events: {stop_events:?}"
);
assert!(!session.is_active());
}
#[test]
fn cancel_marks_inactive_and_drops_state() {
let encoder = ScriptedEncoder::new(16, vec![]);
let decoder = ScriptedDecoder::with_results(vec![]);
let mut session = nonfinalize_session(encoder, decoder);
let samples: Vec<f32> = (0..2400).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples).unwrap();
session.cancel();
assert!(!session.is_active());
let after = session.feed_audio(&samples).unwrap();
assert!(after.is_empty());
}
#[test]
fn append_text_basic_concatenation_and_trim() {
let mut base = String::new();
append_text("hello", &mut base);
assert_eq!(base, "hello");
append_text("world", &mut base);
assert_eq!(base, "hello world");
append_text(" ", &mut base);
assert_eq!(base, "hello world");
append_text("!", &mut base);
assert_eq!(base, "hello world !");
}
#[test]
fn streaming_session_first_window_finalization_runs_full_decode_not_streamed_fallback() {
let encoder = ScriptedEncoder::new(8, vec![]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![1, 2, 3]), Ok(vec![9, 8, 7])]);
let mut session = finalize_session(encoder, decoder);
let (partial_events, boundary_events) = drive_two_phase(&mut session);
partial_events.unwrap();
boundary_events.unwrap();
assert!(
session.decoder.call_count() >= 2,
"expected >= 2 decoder calls (pending + finalize), got {}",
session.decoder.call_count()
);
}
#[test]
fn streaming_session_first_window_finalization_appends_full_decode_not_partial_text() {
let encoder = ScriptedEncoder::new(8, vec![]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![1, 2, 3]), Ok(vec![90, 91, 92])]);
let mut session = finalize_session(encoder, decoder);
let (partial_events, boundary_events) = drive_two_phase(&mut session);
partial_events.unwrap();
boundary_events.unwrap();
let stop_events = session.stop().unwrap();
let TranscriptionEvent::Ended(full_text) = stop_events
.last()
.expect("expected Ended event at stop()")
.clone()
else {
panic!("last stop event was not Ended: {stop_events:?}");
};
assert!(
full_text.contains("t90"),
"Ended.full_text must include the full-decode text, got {full_text:?}"
);
}
#[test]
fn streaming_session_decoder_error_keeps_window_for_retry_then_feed_audio_drains() {
use crate::error::Error;
let encoder = ScriptedEncoder::new(8, vec![]);
let decoder = ScriptedDecoder::with_results(vec![
Ok(vec![1]),
Err(Error::InvariantViolation(InvariantViolationPayload::new(
"ScriptedDecoder",
"scripted finalize failure",
))),
Ok(vec![42]),
]);
let mut session = finalize_session(encoder, decoder);
let (partial_events, boundary_events) = drive_two_phase(&mut session);
partial_events.unwrap();
assert!(boundary_events.is_err());
assert_eq!(
session.retry_state().finalize_queue().len(),
1,
"errored finalize must leave the window in the retry queue"
);
let retry_events = session.feed_audio(&[0.0_f32; 200]).unwrap();
assert_eq!(
session.retry_state().finalize_queue().len(),
0,
"successful retry must pop the previously-failed window"
);
assert!(
!retry_events.is_empty(),
"retry decode must emit at least the Stats event"
);
}
#[test]
fn streaming_session_decoder_error_does_not_advance_frozen_window_count() {
use crate::error::Error;
let encoder = ScriptedEncoder::new(8, vec![]);
let decoder = ScriptedDecoder::with_results(vec![
Ok(vec![1]),
Err(Error::InvariantViolation(InvariantViolationPayload::new(
"ScriptedDecoder",
"scripted finalize failure",
))),
]);
let mut session = finalize_session(encoder, decoder);
let (partial_events, boundary_events) = drive_two_phase(&mut session);
partial_events.unwrap();
assert!(boundary_events.is_err());
assert_eq!(session.encoded_window_count(), 1);
assert_eq!(session.frozen_window_count, 0);
assert_eq!(session.retry_state().finalize_queue().len(), 1);
}
#[test]
fn streaming_session_stop_with_finalize_err_can_be_retried_with_second_stop() {
use crate::error::Error;
let encoder = ScriptedEncoder::new(8, vec![]);
let decoder = ScriptedDecoder::with_results(vec![
Ok(vec![1]),
Err(Error::InvariantViolation(InvariantViolationPayload::new(
"ScriptedDecoder",
"scripted boundary finalize Err",
))),
Err(Error::InvariantViolation(InvariantViolationPayload::new(
"ScriptedDecoder",
"scripted stop-retry finalize Err",
))),
Ok(vec![42]),
]);
let mut session = finalize_session(encoder, decoder);
let (partial_events, boundary_events) = drive_two_phase(&mut session);
partial_events.unwrap();
assert!(boundary_events.is_err(), "boundary feed must Err");
assert_eq!(session.retry_state().finalize_queue().len(), 1);
let stop_first = session.stop();
assert!(
stop_first.is_err(),
"first stop() must propagate the scripted finalize Err"
);
let stop_second = session.stop().expect("second stop() must succeed");
assert!(
matches!(stop_second.last(), Some(TranscriptionEvent::Ended(_))),
"second stop() must emit terminal Ended, got {stop_second:?}"
);
assert!(!session.is_active());
assert_eq!(session.retry_state().finalize_queue().len(), 0);
}
#[test]
fn streaming_session_pending_retry_finalizes_on_empty_feed_audio() {
use crate::error::Error;
let encoder = ScriptedEncoder::new(8, vec![]);
let decoder = ScriptedDecoder::with_results(vec![
Ok(vec![1]),
Err(Error::InvariantViolation(InvariantViolationPayload::new(
"ScriptedDecoder",
"scripted boundary finalize Err",
))),
Ok(vec![77]),
]);
let mut session = finalize_session(encoder, decoder);
let (partial_events, boundary_events) = drive_two_phase(&mut session);
partial_events.unwrap();
assert!(boundary_events.is_err());
assert_eq!(session.retry_state().finalize_queue().len(), 1);
let calls_before = session.decoder.call_count();
let retry_events = session
.feed_audio(&[])
.expect("empty feed_audio retry must succeed");
assert_eq!(session.retry_state().finalize_queue().len(), 0);
assert!(session.decoder.call_count() > calls_before);
assert!(!retry_events.is_empty());
}
#[test]
fn streaming_session_fallback_not_reapplied_on_retry_after_err() {
use crate::error::Error;
let encoder = ScriptedEncoder::new(8, vec![]);
let decoder = ScriptedDecoder::with_results(vec![
Ok(vec![123]),
Err(Error::InvariantViolation(InvariantViolationPayload::new(
"ScriptedDecoder",
"scripted boundary finalize Err",
))),
Ok(vec![]),
]);
let mut session = finalize_session(encoder, decoder);
let (partial_events, boundary_events) = drive_two_phase(&mut session);
partial_events.unwrap();
assert!(boundary_events.is_err());
assert_eq!(session.retry_state().finalize_queue().len(), 1);
let _ = session
.feed_audio(&[])
.expect("retry feed_audio must succeed");
assert_eq!(session.retry_state().finalize_queue().len(), 0);
assert!(
!session.shared.completed_text.contains("t123"),
"stale streamed fallback must NOT be frozen, got {:?}",
session.shared.completed_text
);
}
#[test]
fn session_retry_state_stop_with_encoder_feed_err_can_be_retried() {
let encoder = ScriptedEncoder::new(8, vec![false, true, false, false]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![1]), Ok(vec![2, 3, 4])]);
let mut session = nonfinalize_session(encoder, decoder);
let samples: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples).unwrap();
let overlap_before = session.mel_processor.overlap_buffer_len();
assert!(
overlap_before > 0,
"test precondition: overlap must be populated"
);
let stop_first = session.stop();
assert!(stop_first.is_err());
assert_eq!(session.mel_processor.overlap_buffer_len(), 0);
assert!(
session.retry_state().has_pending_stop_encoder_feed(),
"SessionRetryState StopEncoderFeed MUST hold the tail mel"
);
assert!(
session.is_active(),
"errored stop() must leave session active"
);
let stop_second = session.stop().expect("retry stop() must succeed");
assert!(matches!(
stop_second.last(),
Some(TranscriptionEvent::Ended(_))
));
assert!(!session.is_active());
assert!(!session.retry_state().has_obligation());
}
#[test]
fn session_retry_state_feed_audio_drains_stop_tail_before_new_audio() {
let encoder = ScriptedEncoder::new(8, vec![false, true, false, false, false]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![5]), Ok(vec![6, 7])]);
let mut session = nonfinalize_session(encoder, decoder);
let samples_a: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples_a).unwrap();
let stop_first = session.stop();
assert!(stop_first.is_err());
assert!(session.retry_state().has_pending_stop_encoder_feed());
let stop_err_call_idx = session.encoder.backend().call_count() - 1;
let stop_err_fingerprint = session.encoder.backend().fingerprints()[stop_err_call_idx];
let samples_b: Vec<f32> = (0..1200)
.map(|i| ((i as f32 + 7.0) * 0.013).cos())
.collect();
let _events = session
.feed_audio(&samples_b)
.expect("feed_audio after staged-tail stop Err MUST succeed");
assert!(
!session.retry_state().has_pending_stop_encoder_feed(),
"feed_audio MUST clear StopEncoderFeed after a successful drain"
);
let fingerprints = session.encoder.backend().fingerprints();
let bridge_drain_idx = stop_err_call_idx + 1;
assert!(fingerprints.len() > bridge_drain_idx);
assert_eq!(
fingerprints[bridge_drain_idx].to_bits(),
stop_err_fingerprint.to_bits(),
"ORDER: the call immediately after stop-Err MUST be the bridge \
drain (bit-identical fingerprint). fingerprints={fingerprints:?}"
);
}
#[test]
fn session_retry_state_feed_audio_empty_samples_drains_bridge_and_decodes() {
let encoder = ScriptedEncoder::new(8, vec![false, true, false, false]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![5]), Ok(vec![6, 7, 8])]);
let mut session = nonfinalize_session(encoder, decoder);
let samples_a: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples_a).unwrap();
let _ = session.stop();
assert!(session.retry_state().has_pending_stop_encoder_feed());
let decoder_calls_before = session.decoder.call_count();
session.last_decode_time = None;
let _events = session
.feed_audio(&[])
.expect("empty feed_audio MUST succeed by draining the bridge");
assert!(!session.retry_state().has_pending_stop_encoder_feed());
assert_eq!(
session.decoder.call_count(),
decoder_calls_before + 1,
"drained window MUST be decoded in the same call"
);
assert!(!session.retry_state().has_obligation());
}
#[test]
fn session_retry_state_drained_windows_decoded_in_same_call() {
let encoder = ScriptedEncoder::new(8, vec![false, true, false, false]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![5]), Ok(vec![6, 7, 8])]);
let mut session = nonfinalize_session(encoder, decoder);
let samples_a: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples_a).unwrap();
let decoder_calls_after_initial = session.decoder.call_count();
let _ = session.stop();
assert!(session.retry_state().has_pending_stop_encoder_feed());
let encoder_windows_before_drain = session.encoder.encoded_window_count();
session.last_decode_time = None;
let _events = session
.feed_audio(&[])
.expect("feed_audio after staged-tail stop Err MUST succeed");
assert!(!session.retry_state().has_pending_stop_encoder_feed());
assert_eq!(
session.encoder.encoded_window_count(),
encoder_windows_before_drain + 1,
"bridge drain MUST have committed exactly one full window"
);
assert_eq!(
session.decoder.call_count(),
decoder_calls_after_initial + 1,
"bridge-drained window MUST drive run_decode_pass in this call"
);
}
#[test]
fn session_retry_state_bridge_drain_no_windows_clears_bridge_no_obligation() {
let encoder = ScriptedEncoder::new(8, vec![]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![5])]);
let mut session = nonfinalize_session(encoder, decoder);
let staged_mel = Array::from_slice::<f32>(&[0.0_f32; 2 * 8], &[2_i32, 8_i32]).unwrap();
session
.retry_state_mut()
.stage_stop_encoder_feed(staged_mel);
let _events = session
.feed_audio(&[])
.expect("empty feed_audio with non-completing drain MUST succeed");
assert!(!session.retry_state().has_pending_stop_encoder_feed());
assert!(
!session.retry_state().has_decode_owed(),
"drain with 0 windows MUST NOT arm DecodeOwed"
);
assert_eq!(session.encoder.encoded_window_count(), 0);
}
#[test]
fn session_retry_state_drained_count_survives_post_drain_err() {
let encoder = ScriptedEncoder::new(8, vec![false, true, false, true, false]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![5]), Ok(vec![42])]);
let mut session = finalize_session(encoder, decoder);
let samples_a: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples_a).unwrap();
let decoder_calls_after_initial = session.decoder.call_count();
let _ = session.stop();
assert!(session.retry_state().has_pending_stop_encoder_feed());
let samples_b: Vec<f32> = (0..1500)
.map(|i| ((i as f32 + 11.0) * 0.013).cos())
.collect();
let feed_err = session.feed_audio(&samples_b);
assert!(
feed_err.is_err(),
"feed_audio with new-audio encode_window Err MUST propagate Err"
);
assert!(!session.retry_state().has_pending_stop_encoder_feed());
session.last_decode_time = None;
let retry_events = session
.feed_audio(&[])
.expect("retry feed_audio MUST succeed");
assert!(
!session.retry_state().has_decode_owed(),
"successful retry decode MUST clear DecodeOwed"
);
assert!(
session.decoder.call_count() > decoder_calls_after_initial,
"retry MUST drive at least one decoder call"
);
let _ = retry_events;
}
#[test]
fn session_retry_state_throttled_drain_does_not_force_decode_on_next_call() {
let encoder = ScriptedEncoder::new(8, vec![false, true, false, false]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![5]), Ok(vec![6, 7, 8])]);
let mut session = nonfinalize_session(encoder, decoder);
let samples_a: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples_a).unwrap();
let _ = session.stop();
assert!(session.retry_state().has_pending_stop_encoder_feed());
session.last_decode_time = None;
let _events = session
.feed_audio(&[])
.expect("happy-path discharge MUST succeed");
assert!(
!session.retry_state().has_obligation(),
"end-of-call MUST clear all obligations on happy path"
);
let decoder_calls_after_discharge = session.decoder.call_count();
let events_2 = session
.feed_audio(&[])
.expect("second empty feed_audio MUST succeed without forced decode");
assert_eq!(
session.decoder.call_count(),
decoder_calls_after_discharge,
"second empty feed_audio MUST NOT trigger a phantom decode"
);
assert!(events_2.is_empty(), "no work ⇒ no events");
}
#[test]
fn session_retry_state_cancel_clears_all_obligations() {
let encoder = ScriptedEncoder::new(8, vec![false, true, false, true, false]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![5])]);
let mut session = finalize_session(encoder, decoder);
let samples_a: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples_a).unwrap();
let _ = session.stop(); assert!(session.retry_state().has_obligation());
session.cancel();
assert!(
!session.retry_state().has_obligation(),
"cancel MUST clear all retry obligations atomically"
);
}
#[test]
fn session_retry_state_reset_clears_all_obligations() {
let encoder = ScriptedEncoder::new(8, vec![false, true, false, true, false]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![5])]);
let mut session = finalize_session(encoder, decoder);
let samples_a: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples_a).unwrap();
let _ = session.stop();
assert!(session.retry_state().has_obligation());
session.reset();
assert!(
!session.retry_state().has_obligation(),
"reset MUST clear all retry obligations atomically"
);
}
#[test]
fn session_retry_state_stop_after_flush_err_keeps_session_active_with_obligation() {
let encoder = ScriptedEncoder::new(8, vec![false, false, false, false]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![1]), Ok(vec![2, 3])]);
let mut session = nonfinalize_session(encoder, decoder);
let samples: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples).unwrap();
assert!(session.mel_processor.overlap_buffer_len() > 0);
session.mel_processor.flush_err_inject_count = 1;
let stop_first = session.stop();
assert!(stop_first.is_err(), "stop() must propagate mel.flush Err");
assert!(
session.is_active(),
"errored stop() MUST leave session active so retry is possible"
);
assert!(
session.retry_state().has_pending_stop_mel_flush(),
"stop()'s mel.flush Err MUST stage StopMelFlush"
);
assert!(
session.retry_state().has_obligation(),
"retry obligation MUST be visible to the next call"
);
assert!(
session.mel_processor.overlap_buffer_len() > 0,
"transactional flush MUST preserve overlap on Err"
);
}
#[test]
fn session_retry_state_stop_with_mel_flush_err_can_be_retried() {
let encoder = ScriptedEncoder::new(8, vec![false, false, false, false]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![1]), Ok(vec![2, 3])]);
let mut session = nonfinalize_session(encoder, decoder);
let samples: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples).unwrap();
let overlap_before = session.mel_processor.overlap_buffer_len();
assert!(overlap_before > 0);
session.mel_processor.flush_err_inject_count = 1;
let stop_first = session.stop();
assert!(stop_first.is_err());
assert!(session.retry_state().has_pending_stop_mel_flush());
assert_eq!(session.mel_processor.overlap_buffer_len(), overlap_before);
let stop_second = session
.stop()
.expect("retry stop() MUST succeed via discharge_stop_mel_flush");
assert!(
matches!(stop_second.last(), Some(TranscriptionEvent::Ended(_))),
"second stop() MUST emit Ended after the discharge succeeds"
);
assert!(!session.is_active());
assert!(!session.retry_state().has_obligation());
assert_eq!(session.mel_processor.overlap_buffer_len(), 0);
}
#[test]
fn session_retry_state_second_stop_after_flush_err_retries_and_emits_ended_on_success() {
let encoder = ScriptedEncoder::new(8, vec![false, false, false, false]);
let decoder = ScriptedDecoder::with_results(vec![Ok(vec![1]), Ok(vec![2, 3])]);
let mut session = nonfinalize_session(encoder, decoder);
let samples: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples).unwrap();
session.mel_processor.flush_err_inject_count = 2;
assert!(session.stop().is_err());
assert!(session.retry_state().has_pending_stop_mel_flush());
assert!(
session.stop().is_err(),
"second stop() with continued flush Err MUST propagate Err"
);
assert!(
session.retry_state().has_pending_stop_mel_flush(),
"re-armed StopMelFlush MUST persist across the second Err"
);
assert!(session.is_active());
let stop_third = session
.stop()
.expect("third stop() MUST succeed once flush stops erring");
assert!(matches!(
stop_third.last(),
Some(TranscriptionEvent::Ended(_))
));
assert!(!session.is_active());
assert!(!session.retry_state().has_obligation());
}
#[test]
fn session_retry_state_stop_partial_decode_clone_helper_propagates_errors_not_silently_drops_audio()
{
let none_out = clone_partial_decode_payload(None).expect("None must succeed");
assert!(
none_out.is_none(),
"None payload MUST round-trip as Ok(None) (no fabricated payload)"
);
let arr = Array::from_slice::<f32>(&[1.0_f32, 2.0, 3.0], &[3i32]).unwrap();
let some_out = clone_partial_decode_payload(Some(&arr)).expect("happy-path clone must succeed");
assert!(
some_out.is_some(),
"Some payload with successful clone MUST yield Ok(Some(_))"
);
let cloned = some_out.unwrap();
assert_eq!(
cloned.shape(),
arr.shape(),
"refcount clone preserves shape"
);
}
#[test]
fn session_retry_state_stop_partial_decode_fast_path_preserves_payload_on_success() {
let encoder = ScriptedEncoder::new(8, vec![false, false, false, false]);
let decoder = ScriptedDecoder::with_results(vec![
Ok(vec![1]),
Err(crate::error::Error::InvariantViolation(
crate::error::InvariantViolationPayload::new(
"ScriptedDecoder",
"scripted stop-partial-decode Err",
),
)),
Ok(vec![1, 2]),
]);
let mut session = nonfinalize_session(encoder, decoder);
let samples: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples).unwrap();
let stop_first = session.stop();
assert!(
stop_first.is_err(),
"first stop() MUST propagate decoder Err"
);
assert!(
session.retry_state().has_pending_stop_partial_decode(),
"stop()'s partial-decode Err MUST arm StopPartialDecode \
(with the cloned audio_features payload, never silently None)"
);
let stop_second = session.stop().expect("retry stop() fast path MUST succeed");
assert!(matches!(
stop_second.last(),
Some(TranscriptionEvent::Ended(_))
));
assert!(!session.retry_state().has_obligation());
assert!(!session.is_active());
}
#[test]
fn session_retry_state_stop_partial_decode_mainline_body_arms_with_real_clone() {
let encoder = ScriptedEncoder::new(8, vec![false, false, false, false]);
let decoder = ScriptedDecoder::with_results(vec![
Ok(vec![1]),
Err(crate::error::Error::InvariantViolation(
crate::error::InvariantViolationPayload::new(
"ScriptedDecoder",
"scripted stop-partial-decode Err",
),
)),
]);
let mut session = nonfinalize_session(encoder, decoder);
let samples: Vec<f32> = (0..1200).map(|i| (i as f32 * 0.001).sin()).collect();
let _ = session.feed_audio(&samples).unwrap();
let stop_first = session.stop();
assert!(stop_first.is_err());
assert!(
session.retry_state().has_pending_stop_partial_decode(),
"mainline-body Err must arm StopPartialDecode"
);
let taken = session
.retry_state_mut()
.take_stop_partial_decode_features()
.expect("guard above asserts arm");
assert!(
taken.is_some(),
"mainline arm MUST carry the real cloned payload, never \
silently None — the encoder had a partial window (1-row carry \
after the 7-mel feed + stop-flush bridge), so encode_pending \
returned Some, so the helper's Ok arm is Some"
);
session.retry_state_mut().arm_stop_partial_decode(taken);
}