use crate::ir::Envelope;
use std::collections::VecDeque;
/// Errors reported by the stream manager and its internal buffers.
///
/// Every variant carries a human-readable detail message.
#[derive(Debug, Clone)]
pub enum StreamManagerError {
/// A push was rejected because the target buffer already holds its maximum number of chunks.
BufferOverflow(String),
/// NOTE(review): never constructed in the code visible here; presumably reserved for chunk-size validation — confirm.
InvalidChunkSize(String),
/// A push was attempted on a buffer that has already been closed.
StreamClosed(String),
/// NOTE(review): never constructed in the code visible here; presumably reserved for state-machine violations — confirm.
InvalidState(String),
}
impl std::fmt::Display for StreamManagerError {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
StreamManagerError::BufferOverflow(msg) => {
write!(f, "Buffer overflow: {}", msg)
}
StreamManagerError::InvalidChunkSize(msg) => {
write!(f, "Invalid chunk size: {}", msg)
}
StreamManagerError::StreamClosed(msg) => {
write!(f, "Stream closed: {}", msg)
}
StreamManagerError::InvalidState(msg) => {
write!(f, "Invalid state: {}", msg)
}
}
}
}
// Empty impl: every `std::error::Error` method keeps its default, so no
// underlying source error is reported.
impl std::error::Error for StreamManagerError {}
/// Shorthand for a `Result` whose error type is `StreamManagerError`.
pub type StreamResult<T> = Result<T, StreamManagerError>;
/// Tunable settings for a `StreamManager`.
#[derive(Debug, Clone)]
pub struct StreamManagerConfig {
/// Maximum number of chunks each buffer (input and output) may hold.
pub max_buffer_size: usize,
/// NOTE(review): not read by any code visible in this file; presumably the target size of one chunk — units unconfirmed.
pub chunk_size: usize,
/// NOTE(review): not read by any code visible in this file; presumably toggles backpressure handling — confirm against callers.
pub enable_backpressure: bool,
}
impl Default for StreamManagerConfig {
fn default() -> Self {
Self {
max_buffer_size: 100,
chunk_size: 1024,
enable_backpressure: true,
}
}
}
/// One queued unit of a stream: an envelope plus ordering and timing metadata.
#[derive(Debug, Clone)]
pub struct StreamChunk {
/// Position of the chunk within its buffer's stream; `StreamManager` assigns these from a per-buffer counter that starts at 0.
pub sequence: u64,
/// The payload carried by this chunk.
pub data: Envelope,
/// Caller-supplied flag; presumably marks the final chunk of a stream (nothing in this file acts on it).
pub is_last: bool,
/// Milliseconds since the Unix epoch at which `StreamManager` built the chunk.
pub timestamp_ms: u64,
}
/// Bounded first-in, first-out queue of chunks with a closed flag and a sequence counter.
#[derive(Debug)]
struct StreamBuffer {
/// Queued chunks, oldest at the front.
chunks: VecDeque<StreamChunk>,
/// Maximum number of chunks `push` accepts before reporting `BufferOverflow`.
max_size: usize,
/// Sequence number for the next chunk; starts at 0 and is read and advanced by `StreamManager`.
next_sequence: u64,
/// Once set, `push` fails with `StreamClosed`; chunks already queued can still be popped.
closed: bool,
}
impl StreamBuffer {
fn new(max_size: usize) -> Self {
Self {
chunks: VecDeque::new(),
max_size,
next_sequence: 0,
closed: false,
}
}
fn push(&mut self, chunk: StreamChunk) -> StreamResult<()> {
if self.closed {
return Err(StreamManagerError::StreamClosed(
"Cannot push to closed stream".to_string(),
));
}
if self.chunks.len() >= self.max_size {
return Err(StreamManagerError::BufferOverflow(format!(
"Buffer full: {} chunks",
self.max_size
)));
}
self.chunks.push_back(chunk);
Ok(())
}
fn pop(&mut self) -> Option<StreamChunk> {
self.chunks.pop_front()
}
fn peek(&self) -> Option<&StreamChunk> {
self.chunks.front()
}
fn len(&self) -> usize {
self.chunks.len()
}
fn is_empty(&self) -> bool {
self.chunks.is_empty()
}
fn is_full(&self) -> bool {
self.chunks.len() >= self.max_size
}
fn close(&mut self) {
self.closed = true;
}
fn is_closed(&self) -> bool {
self.closed
}
}
/// Owns a pair of bounded chunk buffers, one for input and one for output, plus the configuration they were sized from.
pub struct StreamManager {
/// Settings supplied at construction; `max_buffer_size` is reused when `clear_buffers` rebuilds the buffers.
config: StreamManagerConfig,
/// Chunks pushed via `push_input_chunk`, awaiting `pop_input_chunk`.
input_buffer: StreamBuffer,
/// Chunks pushed via `push_output_chunk`, awaiting `pop_output_chunk`.
output_buffer: StreamBuffer,
}
impl StreamManager {
pub fn new() -> Self {
Self::with_config(StreamManagerConfig::default())
}
pub fn with_config(config: StreamManagerConfig) -> Self {
Self {
input_buffer: StreamBuffer::new(config.max_buffer_size),
output_buffer: StreamBuffer::new(config.max_buffer_size),
config,
}
}
pub fn push_input_chunk(&mut self, envelope: Envelope, is_last: bool) -> StreamResult<()> {
let timestamp_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let sequence = self.input_buffer.next_sequence;
self.input_buffer.next_sequence += 1;
let chunk = StreamChunk {
sequence,
data: envelope,
is_last,
timestamp_ms,
};
self.input_buffer.push(chunk)
}
pub fn pop_input_chunk(&mut self) -> Option<StreamChunk> {
self.input_buffer.pop()
}
pub fn peek_input_chunk(&self) -> Option<&StreamChunk> {
self.input_buffer.peek()
}
pub fn push_output_chunk(&mut self, envelope: Envelope, is_last: bool) -> StreamResult<()> {
let timestamp_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as u64;
let sequence = self.output_buffer.next_sequence;
self.output_buffer.next_sequence += 1;
let chunk = StreamChunk {
sequence,
data: envelope,
is_last,
timestamp_ms,
};
self.output_buffer.push(chunk)
}
pub fn pop_output_chunk(&mut self) -> Option<StreamChunk> {
self.output_buffer.pop()
}
pub fn is_input_buffer_full(&self) -> bool {
self.input_buffer.is_full()
}
pub fn is_output_buffer_full(&self) -> bool {
self.output_buffer.is_full()
}
pub fn input_buffer_size(&self) -> usize {
self.input_buffer.len()
}
pub fn output_buffer_size(&self) -> usize {
self.output_buffer.len()
}
pub fn is_input_empty(&self) -> bool {
self.input_buffer.is_empty()
}
pub fn is_output_empty(&self) -> bool {
self.output_buffer.is_empty()
}
pub fn close_input(&mut self) {
self.input_buffer.close();
}
pub fn close_output(&mut self) {
self.output_buffer.close();
}
pub fn is_input_closed(&self) -> bool {
self.input_buffer.is_closed()
}
pub fn is_output_closed(&self) -> bool {
self.output_buffer.is_closed()
}
pub fn chunk_envelope(&self, envelope: &Envelope) -> Vec<Envelope> {
vec![envelope.clone()]
}
pub fn aggregate_chunks(&self, chunks: &[StreamChunk]) -> Option<Envelope> {
if chunks.is_empty() {
return None;
}
Some(chunks.last().unwrap().data.clone())
}
pub fn clear_buffers(&mut self) {
self.input_buffer = StreamBuffer::new(self.config.max_buffer_size);
self.output_buffer = StreamBuffer::new(self.config.max_buffer_size);
}
pub fn config(&self) -> &StreamManagerConfig {
&self.config
}
}
impl Default for StreamManager {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::ir::EnvelopeKind;

    /// Builds an audio envelope wrapping a copy of `bytes`.
    fn audio_envelope(bytes: &[u8]) -> Envelope {
        let payload = bytes.to_vec();
        Envelope::new(EnvelopeKind::Audio(payload))
    }

    /// Builds a text envelope wrapping an owned copy of `value`.
    fn text_envelope(value: &str) -> Envelope {
        let payload = value.to_string();
        Envelope::new(EnvelopeKind::Text(payload))
    }

    // A fresh manager starts with both buffers empty.
    #[test]
    fn test_stream_manager_creation() {
        let streams = StreamManager::new();

        assert_eq!(streams.input_buffer_size(), 0);
        assert_eq!(streams.output_buffer_size(), 0);
    }

    // A pushed input chunk comes back out intact and leaves the buffer empty.
    #[test]
    fn test_push_and_pop_input() {
        let mut streams = StreamManager::new();

        streams
            .push_input_chunk(audio_envelope(&[0, 1, 2]), false)
            .unwrap();
        assert_eq!(streams.input_buffer_size(), 1);

        let popped = streams.pop_input_chunk().unwrap();
        assert!(matches!(popped.data.kind, EnvelopeKind::Audio(_)));
        assert!(!popped.is_last);
        assert_eq!(streams.input_buffer_size(), 0);
    }

    // The output side round-trips payload and `is_last` the same way.
    #[test]
    fn test_push_and_pop_output() {
        let mut streams = StreamManager::new();

        streams
            .push_output_chunk(text_envelope("Text"), true)
            .unwrap();
        assert_eq!(streams.output_buffer_size(), 1);

        let popped = streams.pop_output_chunk().unwrap();
        assert_eq!(popped.data.kind, EnvelopeKind::Text("Text".to_string()));
        assert!(popped.is_last);
    }

    // Pushing past `max_buffer_size` is rejected.
    #[test]
    fn test_buffer_overflow() {
        let mut streams = StreamManager::with_config(StreamManagerConfig {
            max_buffer_size: 2,
            ..Default::default()
        });
        let payload = text_envelope("test");

        streams.push_input_chunk(payload.clone(), false).unwrap();
        streams.push_input_chunk(payload.clone(), false).unwrap();

        let third = streams.push_input_chunk(payload, false);
        assert!(third.is_err());
    }

    // Peeking exposes the front chunk without consuming it.
    #[test]
    fn test_peek_input() {
        let mut streams = StreamManager::new();
        streams
            .push_input_chunk(text_envelope("test"), false)
            .unwrap();

        let front = streams.peek_input_chunk().unwrap();
        assert_eq!(front.data.kind, EnvelopeKind::Text("test".to_string()));
        assert_eq!(streams.input_buffer_size(), 1);
    }

    // Once the input side is closed, further pushes fail.
    #[test]
    fn test_close_stream() {
        let mut streams = StreamManager::new();

        streams.close_input();
        assert!(streams.is_input_closed());

        let rejected = streams.push_input_chunk(text_envelope("test"), false);
        assert!(rejected.is_err());
    }

    // `chunk_envelope` yields exactly one envelope of the same kind.
    #[test]
    fn test_chunk_envelope() {
        let streams = StreamManager::new();
        let source = audio_envelope(&[0, 1, 2]);

        let pieces = streams.chunk_envelope(&source);

        assert_eq!(pieces.len(), 1);
        assert!(matches!(pieces[0].kind, EnvelopeKind::Audio(_)));
    }

    // Aggregation returns the payload of the final chunk.
    #[test]
    fn test_aggregate_chunks() {
        let streams = StreamManager::new();
        let chunks: Vec<StreamChunk> = (0..3)
            .map(|i| StreamChunk {
                sequence: i,
                data: text_envelope(&format!("chunk_{}", i)),
                is_last: i == 2,
                timestamp_ms: 0,
            })
            .collect();

        let aggregated = streams.aggregate_chunks(&chunks).unwrap();

        assert_eq!(aggregated.kind, EnvelopeKind::Text("chunk_2".to_string()));
    }

    // Clearing empties both buffers.
    #[test]
    fn test_clear_buffers() {
        let mut streams = StreamManager::new();
        let payload = text_envelope("test");
        streams.push_input_chunk(payload.clone(), false).unwrap();
        streams.push_output_chunk(payload, false).unwrap();

        streams.clear_buffers();

        assert_eq!(streams.input_buffer_size(), 0);
        assert_eq!(streams.output_buffer_size(), 0);
    }

    // Sequence numbers count up from zero across successive pushes.
    #[test]
    fn test_sequence_numbers() {
        let mut streams = StreamManager::new();
        let payload = text_envelope("test");

        streams.push_input_chunk(payload.clone(), false).unwrap();
        let first = streams.pop_input_chunk().unwrap();
        assert_eq!(first.sequence, 0);

        streams.push_input_chunk(payload, false).unwrap();
        let second = streams.pop_input_chunk().unwrap();
        assert_eq!(second.sequence, 1);
    }

    // The `is_last` flag is stored exactly as supplied.
    #[test]
    fn test_is_last_flag() {
        let mut streams = StreamManager::new();
        let payload = text_envelope("test");

        streams.push_input_chunk(payload.clone(), false).unwrap();
        let first = streams.pop_input_chunk().unwrap();
        assert!(!first.is_last);

        streams.push_input_chunk(payload, true).unwrap();
        let second = streams.pop_input_chunk().unwrap();
        assert!(second.is_last);
    }

    // Emptiness and size queries track pushes on each side independently.
    #[test]
    fn test_buffer_size_tracking() {
        let mut streams = StreamManager::new();
        assert!(streams.is_input_empty());
        assert!(streams.is_output_empty());

        let payload = text_envelope("test");

        streams.push_input_chunk(payload.clone(), false).unwrap();
        assert!(!streams.is_input_empty());
        assert_eq!(streams.input_buffer_size(), 1);

        streams.push_output_chunk(payload, false).unwrap();
        assert!(!streams.is_output_empty());
        assert_eq!(streams.output_buffer_size(), 1);
    }
}