use crate::episode::ExecutionStep;
use anyhow::Result;
use std::time::{Duration, Instant};
use tracing::{debug, trace};
use uuid::Uuid;
use super::config::BatchConfig;
/// In-memory buffer of [`ExecutionStep`]s for one episode.
///
/// Steps are appended with `add_step`; `should_flush` reports whether the
/// configured size or time threshold has been reached. This type only
/// accumulates steps and answers that question — the `pub(super)` fields and
/// accessors are what give the parent module access to the buffered steps and
/// the flush timestamp.
#[derive(Debug)]
pub struct StepBuffer {
/// Episode identifier supplied at construction; attached to every log event.
episode_id: Uuid,
/// Steps pushed by `add_step` that are still held in the buffer.
pub(super) steps: Vec<ExecutionStep>,
/// Thresholds (`max_batch_size`, `flush_interval_ms`) and the `auto_flush`
/// switch consulted by `should_flush`.
config: BatchConfig,
/// Reference instant for the time-based threshold; initialised to the
/// construction time. NOTE(review): presumably reset by the parent module
/// after each flush — confirm against the caller.
pub(super) last_flush: Instant,
/// Number of steps ever passed to `add_step`; nothing in this file resets it.
total_steps_processed: usize,
}
impl StepBuffer {
    /// Creates an empty buffer for `episode_id` governed by `config`.
    ///
    /// The chosen configuration is reported in a `debug` event, and the flush
    /// timer starts at the moment of construction.
    pub fn new(episode_id: Uuid, config: BatchConfig) -> Self {
        debug!(
            episode_id = %episode_id,
            max_batch_size = config.max_batch_size,
            flush_interval_ms = config.flush_interval_ms,
            auto_flush = config.auto_flush,
            "Created new step buffer"
        );

        let steps = Vec::new();
        let last_flush = Instant::now();
        Self {
            episode_id,
            steps,
            config,
            last_flush,
            total_steps_processed: 0,
        }
    }

    /// Appends `step` to the buffer and bumps the lifetime step counter.
    ///
    /// # Errors
    ///
    /// This implementation always returns `Ok(())`.
    pub fn add_step(&mut self, step: ExecutionStep) -> Result<()> {
        // Logged size is the number of steps held *before* this one is added.
        let buffered_before = self.steps.len();
        trace!(
            episode_id = %self.episode_id,
            step_number = step.step_number,
            buffer_size = buffered_before,
            "Adding step to buffer"
        );

        self.steps.push(step);
        self.total_steps_processed += 1;
        Ok(())
    }

    /// Reports whether the buffered steps are due to be flushed.
    ///
    /// Returns `false` when the buffer is empty or `auto_flush` is disabled.
    /// Otherwise returns `true` as soon as either the number of buffered steps
    /// has reached `max_batch_size`, or at least `flush_interval_ms`
    /// milliseconds have elapsed since `last_flush`. A `debug` event names the
    /// threshold that triggered.
    pub fn should_flush(&self) -> bool {
        let buffered = self.steps.len();
        if buffered == 0 || !self.config.auto_flush {
            return false;
        }

        // Size threshold takes precedence over the time threshold.
        let batch_limit = self.config.max_batch_size;
        if buffered >= batch_limit {
            debug!(
                episode_id = %self.episode_id,
                buffer_size = buffered,
                max_batch_size = batch_limit,
                "Buffer size threshold reached"
            );
            return true;
        }

        let interval_ms = self.config.flush_interval_ms;
        let elapsed = self.last_flush.elapsed();
        let interval_elapsed = elapsed >= Duration::from_millis(interval_ms);
        if interval_elapsed {
            debug!(
                episode_id = %self.episode_id,
                elapsed_ms = elapsed.as_millis(),
                flush_interval_ms = interval_ms,
                "Buffer time threshold reached"
            );
        }
        interval_elapsed
    }

    /// Returns the episode identifier this buffer was created with.
    #[must_use]
    pub fn episode_id(&self) -> Uuid {
        self.episode_id
    }

    /// Returns the number of steps currently held in the buffer.
    #[must_use]
    pub fn len(&self) -> usize {
        self.steps.len()
    }

    /// Returns `true` when no steps are currently held in the buffer.
    #[must_use]
    pub fn is_empty(&self) -> bool {
        self.steps.is_empty()
    }

    /// Returns how many steps have been passed to `add_step` over the
    /// buffer's lifetime.
    #[must_use]
    pub fn total_steps_processed(&self) -> usize {
        self.total_steps_processed
    }

    /// Returns the time elapsed since the recorded `last_flush` instant.
    #[must_use]
    pub fn time_since_last_flush(&self) -> Duration {
        self.last_flush.elapsed()
    }

    /// Gives the parent module mutable access to the buffered steps.
    pub(super) fn steps_mut(&mut self) -> &mut Vec<ExecutionStep> {
        &mut self.steps
    }

    /// Gives the parent module mutable access to the `last_flush` timestamp.
    pub(super) fn last_flush_mut(&mut self) -> &mut Instant {
        &mut self.last_flush
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// A fresh buffer reports its episode id and holds no steps.
    #[test]
    fn test_step_buffer_creation() {
        let id = Uuid::new_v4();
        let buffer = StepBuffer::new(id, BatchConfig::default());

        assert_eq!(buffer.episode_id(), id);
        assert!(buffer.is_empty());
        assert_eq!(buffer.len(), 0);
        assert_eq!(buffer.total_steps_processed(), 0);
    }

    /// Each added step grows both the buffer length and the lifetime counter.
    #[test]
    fn test_add_steps() {
        let mut buffer = StepBuffer::new(Uuid::new_v4(), BatchConfig::default());
        let first = ExecutionStep::new(1, "tool_a".to_string(), "Action 1".to_string());
        let second = ExecutionStep::new(2, "tool_b".to_string(), "Action 2".to_string());

        for (step, expected) in vec![first, second].into_iter().zip(1usize..) {
            buffer.add_step(step).unwrap();
            assert_eq!(buffer.len(), expected);
            assert_eq!(buffer.total_steps_processed(), expected);
        }
    }

    /// The size threshold triggers exactly when `max_batch_size` steps are held.
    #[test]
    fn test_should_flush_size_threshold() {
        let config = BatchConfig {
            max_batch_size: 3,
            flush_interval_ms: 10_000,
            auto_flush: true,
        };
        let mut buffer = StepBuffer::new(Uuid::new_v4(), config);

        // Empty buffer never asks for a flush.
        assert!(!buffer.should_flush());

        // Two steps: still below the limit of three.
        for n in 1..=2 {
            let step = ExecutionStep::new(n, "tool".to_string(), "action".to_string());
            buffer.add_step(step).unwrap();
        }
        assert!(!buffer.should_flush());

        // The third step reaches the limit.
        let third = ExecutionStep::new(3, "tool".to_string(), "action".to_string());
        buffer.add_step(third).unwrap();
        assert!(buffer.should_flush());
    }

    /// The time threshold triggers once `flush_interval_ms` has elapsed.
    #[test]
    fn test_should_flush_time_threshold() {
        let config = BatchConfig {
            max_batch_size: 100,
            flush_interval_ms: 50,
            auto_flush: true,
        };
        let mut buffer = StepBuffer::new(Uuid::new_v4(), config);

        let step = ExecutionStep::new(1, "tool".to_string(), "action".to_string());
        buffer.add_step(step).unwrap();
        assert!(!buffer.should_flush());

        // Wait past the 50 ms interval configured above.
        std::thread::sleep(Duration::from_millis(60));
        assert!(buffer.should_flush());
    }

    /// With the manual-only configuration, adding steps never requests a flush.
    #[test]
    fn test_should_flush_manual_only() {
        let mut buffer = StepBuffer::new(Uuid::new_v4(), BatchConfig::manual_only());

        for n in 1..=10 {
            let step = ExecutionStep::new(n, "tool".to_string(), "action".to_string());
            buffer.add_step(step).unwrap();
        }

        assert!(!buffer.should_flush());
    }
}