Skip to main content

ai_agents_runtime/optimization/
streaming.rs

1use ai_agents_core::{AgentError, Result};
2
3use super::response::MainResponseDraft;
4use crate::StreamChunk;
5
6/// Fully buffered stream branch output before the branch commits.
7#[derive(Debug, Clone)]
8pub struct StreamingDraftResult {
9    pub draft: MainResponseDraft,
10    pub chunks: Vec<StreamChunk>,
11}
12
13impl StreamingDraftResult {
14    pub fn new(draft: MainResponseDraft, chunks: Vec<StreamChunk>) -> Self {
15        Self { draft, chunks }
16    }
17}
18
19/// Bounded buffer used while streaming output waits for routing to resolve.
20#[derive(Debug, Clone)]
21pub struct StreamBranchBuffer {
22    max_chunks: usize,
23    chunks: Vec<StreamChunk>,
24}
25
26impl StreamBranchBuffer {
27    pub fn new(max_chunks: usize) -> Result<Self> {
28        if max_chunks == 0 {
29            return Err(AgentError::InvalidSpec(
30                "streaming.buffer_size must be greater than 0 for buffered routing".into(),
31            ));
32        }
33        Ok(Self {
34            max_chunks,
35            chunks: Vec::new(),
36        })
37    }
38
39    pub fn push(&mut self, chunk: StreamChunk) -> Result<()> {
40        if self.chunks.len() >= self.max_chunks {
41            return Err(AgentError::Other(format!(
42                "stream buffer filled before routing resolved (limit {})",
43                self.max_chunks
44            )));
45        }
46        self.chunks.push(chunk);
47        Ok(())
48    }
49
50    pub fn drain(self) -> Vec<StreamChunk> {
51        self.chunks
52    }
53
54    pub fn is_full(&self) -> bool {
55        self.chunks.len() >= self.max_chunks
56    }
57
58    pub fn discard(self) {}
59}
60
61#[cfg(test)]
62mod tests {
63    use super::*;
64
65    #[test]
66    fn buffer_rejects_zero_capacity() {
67        assert!(StreamBranchBuffer::new(0).is_err());
68    }
69
70    #[test]
71    fn buffer_reports_full_capacity() {
72        let mut buffer = StreamBranchBuffer::new(1).unwrap();
73        buffer.push(StreamChunk::content("a")).unwrap();
74        assert!(buffer.push(StreamChunk::content("b")).is_err());
75    }
76}