use crate::config::{FlushConfig, FlushStrategy};
use std::collections::VecDeque;
use std::time::{Duration, Instant};
const HISTORY_SIZE: usize = 20;
const FREQUENT_THRESHOLD: Duration = Duration::from_secs(120);
const DEADLINE_BUFFER: Duration = Duration::from_millis(500);
#[non_exhaustive]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum FlushReason {
Shutdown,
Deadline,
BufferFull,
Periodic,
InvocationEnd,
Continuous,
}
pub struct FlushManager {
config: FlushConfig,
invocation_timestamps: VecDeque<Instant>,
last_flush: Option<Instant>,
consecutive_timeout_count: usize,
}
impl FlushManager {
pub fn new(config: FlushConfig) -> Self {
Self {
config,
invocation_timestamps: VecDeque::with_capacity(HISTORY_SIZE),
last_flush: None,
consecutive_timeout_count: 0,
}
}
pub fn with_defaults() -> Self {
Self::new(FlushConfig::default())
}
pub fn record_invocation(&mut self) {
let now = Instant::now();
if self.invocation_timestamps.len() >= HISTORY_SIZE {
self.invocation_timestamps.pop_front();
}
self.invocation_timestamps.push_back(now);
}
pub fn record_flush(&mut self) {
self.last_flush = Some(Instant::now());
self.consecutive_timeout_count = 0;
}
pub fn record_flush_timeout(&mut self) {
self.consecutive_timeout_count += 1;
}
pub fn should_flush(
&self,
deadline_ms: Option<i64>,
pending_count: usize,
is_shutdown: bool,
) -> Option<FlushReason> {
if is_shutdown {
return Some(FlushReason::Shutdown);
}
if let Some(deadline) = deadline_ms
&& self.is_deadline_approaching(deadline)
{
return Some(FlushReason::Deadline);
}
if pending_count > 0 && self.is_buffer_full(pending_count) {
return Some(FlushReason::BufferFull);
}
if pending_count == 0 {
return None;
}
match self.effective_strategy() {
FlushStrategy::Continuous => {
if self.should_flush_continuous() {
return Some(FlushReason::Continuous);
}
}
FlushStrategy::Periodic => {
if self.should_flush_periodic() {
return Some(FlushReason::Periodic);
}
}
FlushStrategy::End => {
return None;
}
FlushStrategy::Default => {
if self.is_infrequent_pattern() {
return None;
} else if self.should_flush_continuous() {
return Some(FlushReason::Continuous);
}
}
}
None
}
pub fn should_flush_on_invocation_end(&self, pending_count: usize) -> Option<FlushReason> {
if pending_count == 0 {
return None;
}
match self.effective_strategy() {
FlushStrategy::End => Some(FlushReason::InvocationEnd),
FlushStrategy::Default if self.is_infrequent_pattern() => {
Some(FlushReason::InvocationEnd)
}
_ => None,
}
}
fn effective_strategy(&self) -> FlushStrategy {
if self.consecutive_timeout_count >= HISTORY_SIZE {
return FlushStrategy::Continuous;
}
self.config.strategy
}
pub fn average_invocation_interval(&self) -> Option<Duration> {
if self.invocation_timestamps.len() < 2 {
return None;
}
let first = self.invocation_timestamps.front()?;
let last = self.invocation_timestamps.back()?;
let total_duration = last.duration_since(*first);
let intervals = self.invocation_timestamps.len() - 1;
Some(total_duration / intervals as u32)
}
pub fn is_infrequent_pattern(&self) -> bool {
match self.average_invocation_interval() {
Some(avg) => avg > FREQUENT_THRESHOLD,
None => true,
}
}
fn is_deadline_approaching(&self, deadline_ms: i64) -> bool {
let now_ms = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap_or_default()
.as_millis() as i64;
let remaining = deadline_ms - now_ms;
remaining < DEADLINE_BUFFER.as_millis() as i64
}
fn is_buffer_full(&self, pending_count: usize) -> bool {
pending_count >= self.config.max_batch_entries
}
fn should_flush_continuous(&self) -> bool {
match self.last_flush {
Some(last) => last.elapsed() >= self.config.interval,
None => true,
}
}
fn should_flush_periodic(&self) -> bool {
self.should_flush_continuous()
}
pub fn interval(&self) -> Duration {
self.config.interval
}
pub fn time_until_next_flush(&self) -> Duration {
match self.last_flush {
Some(last) => {
let elapsed = last.elapsed();
if elapsed >= self.config.interval {
Duration::ZERO
} else {
self.config.interval - elapsed
}
}
None => Duration::ZERO,
}
}
pub fn consecutive_timeout_count(&self) -> usize {
self.consecutive_timeout_count
}
pub fn invocation_history_len(&self) -> usize {
self.invocation_timestamps.len()
}
}
#[cfg(test)]
mod tests {
use super::*;
fn test_config() -> FlushConfig {
FlushConfig {
strategy: FlushStrategy::Default,
interval: Duration::from_millis(100),
max_batch_bytes: 1024,
max_batch_entries: 10,
}
}
#[test]
fn test_shutdown_always_flushes() {
let manager = FlushManager::new(test_config());
let reason = manager.should_flush(None, 1, true);
assert_eq!(reason, Some(FlushReason::Shutdown));
let reason = manager.should_flush(None, 0, true);
assert_eq!(reason, Some(FlushReason::Shutdown));
}
#[test]
fn test_buffer_full_triggers_flush() {
let mut config = test_config();
config.max_batch_entries = 5;
let manager = FlushManager::new(config);
let reason = manager.should_flush(None, 5, false);
assert_eq!(reason, Some(FlushReason::BufferFull));
let reason = manager.should_flush(None, 3, false);
assert_ne!(reason, Some(FlushReason::BufferFull));
}
#[test]
fn test_empty_buffer_no_flush() {
let manager = FlushManager::new(test_config());
let reason = manager.should_flush(None, 0, false);
assert!(reason.is_none());
}
#[test]
fn test_continuous_strategy() {
let mut config = test_config();
config.strategy = FlushStrategy::Continuous;
config.interval = Duration::from_millis(10);
let mut manager = FlushManager::new(config);
let reason = manager.should_flush(None, 1, false);
assert_eq!(reason, Some(FlushReason::Continuous));
manager.record_flush();
let reason = manager.should_flush(None, 1, false);
assert!(reason.is_none());
std::thread::sleep(Duration::from_millis(15));
let reason = manager.should_flush(None, 1, false);
assert_eq!(reason, Some(FlushReason::Continuous));
}
#[test]
fn test_end_strategy() {
let mut config = test_config();
config.strategy = FlushStrategy::End;
let manager = FlushManager::new(config);
let reason = manager.should_flush(None, 1, false);
assert!(reason.is_none());
let reason = manager.should_flush_on_invocation_end(1);
assert_eq!(reason, Some(FlushReason::InvocationEnd));
}
#[test]
fn test_infrequent_pattern_detection() {
let mut manager = FlushManager::new(test_config());
assert!(manager.is_infrequent_pattern());
manager.record_invocation();
assert!(manager.is_infrequent_pattern());
}
#[test]
fn test_frequent_pattern_detection() {
let mut manager = FlushManager::new(test_config());
for _ in 0..5 {
manager.record_invocation();
}
let avg = manager.average_invocation_interval();
assert!(avg.is_some());
assert!(!manager.is_infrequent_pattern());
}
#[test]
fn test_timeout_escalation() {
let mut config = test_config();
config.strategy = FlushStrategy::End;
let mut manager = FlushManager::new(config);
for _ in 0..HISTORY_SIZE {
manager.record_flush_timeout();
}
let reason = manager.should_flush(None, 1, false);
assert_eq!(reason, Some(FlushReason::Continuous));
}
#[test]
fn test_record_flush_resets_timeout_count() {
let mut manager = FlushManager::new(test_config());
manager.record_flush_timeout();
manager.record_flush_timeout();
assert_eq!(manager.consecutive_timeout_count(), 2);
manager.record_flush();
assert_eq!(manager.consecutive_timeout_count(), 0);
}
#[test]
fn test_time_until_next_flush() {
let mut config = test_config();
config.interval = Duration::from_millis(100);
let mut manager = FlushManager::new(config);
assert_eq!(manager.time_until_next_flush(), Duration::ZERO);
manager.record_flush();
let remaining = manager.time_until_next_flush();
assert!(remaining <= Duration::from_millis(100));
assert!(remaining > Duration::ZERO);
}
#[test]
fn test_invocation_history_limit() {
let mut manager = FlushManager::new(test_config());
for _ in 0..30 {
manager.record_invocation();
}
assert_eq!(manager.invocation_history_len(), HISTORY_SIZE);
}
}