use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
/// Identifies a processing episode; used to derive a stable shard suffix
/// for subject routing (see [`EpisodeId::to_subject_suffix`]).
#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct EpisodeId {
    /// Unique identifier of the episode (free-form string).
    pub id: String,
    /// Optional project scope; `Hash` covers both fields, so the same `id`
    /// under different projects lands on independent shards.
    pub project: Option<String>,
}
impl EpisodeId {
pub fn new(id: String) -> Self {
Self { id, project: None }
}
pub fn with_project(id: String, project: String) -> Self {
Self {
id,
project: Some(project),
}
}
pub fn to_subject_suffix(&self) -> String {
let mut hasher = DefaultHasher::new();
self.hash(&mut hasher);
let hash = hasher.finish();
let shard = hash % 16; format!("shard.{:02x}", shard)
}
}
/// Builder that optionally appends an episode-derived shard suffix to a
/// base subject.
pub struct ShardedSubjectBuilder {
    /// Subject prefix, e.g. `"smith.intents.vetted.fs.read.v1"`.
    base_subject: String,
    /// When set, `build()` appends `.<shard suffix>` derived from this episode.
    episode_id: Option<EpisodeId>,
}
impl ShardedSubjectBuilder {
pub fn new(base_subject: String) -> Self {
Self {
base_subject,
episode_id: None,
}
}
pub fn with_episode(mut self, episode_id: EpisodeId) -> Self {
self.episode_id = Some(episode_id);
self
}
pub fn build(self) -> String {
match self.episode_id {
Some(episode) => {
format!("{}.{}", self.base_subject, episode.to_subject_suffix())
}
None => self.base_subject,
}
}
}
/// Tuning parameters for a message-queue pull consumer.
// NOTE(review): field names mirror NATS JetStream consumer config
// (max_ack_pending, max_deliver, ack_wait, filter_subject) — confirm the
// target broker before documenting exact server-side semantics.
#[derive(Debug, Clone)]
pub struct OptimizedConsumerConfig {
    /// Consumer name (unique per consumer; see `Default` for the generated form).
    pub name: String,
    /// Maximum messages delivered but not yet acknowledged.
    pub max_ack_pending: i64,
    /// Maximum delivery attempts per message.
    pub max_deliver: i64,
    /// Time allowed to acknowledge a message.
    pub ack_wait: std::time::Duration,
    /// Number of messages fetched per pull batch.
    pub batch_size: usize,
    /// Optional subject filter restricting which messages this consumer sees.
    pub filter_subject: Option<String>,
    /// Heartbeat / flow-control settings.
    pub flow_control: FlowControlConfig,
}
/// Flow-control / liveness settings for a consumer.
#[derive(Debug, Clone)]
pub struct FlowControlConfig {
    /// Interval between idle heartbeats (presumably emitted when no messages
    /// flow — confirm broker semantics).
    pub idle_heartbeat: std::time::Duration,
    /// Maximum number of outstanding pull requests allowed to wait.
    pub max_waiting: i64,
    /// Master switch for flow control.
    pub enabled: bool,
}
impl Default for OptimizedConsumerConfig {
    /// General-purpose defaults: moderate batch and ack limits with flow
    /// control enabled.
    fn default() -> Self {
        let flow_control = FlowControlConfig {
            idle_heartbeat: std::time::Duration::from_secs(5),
            max_waiting: 512,
            enabled: true,
        };
        Self {
            // Random suffix keeps ad-hoc consumers from colliding on name.
            name: format!("consumer-{}", uuid::Uuid::new_v4()),
            max_ack_pending: 1000,
            max_deliver: 3,
            ack_wait: std::time::Duration::from_secs(60),
            batch_size: 50,
            filter_subject: None,
            flow_control,
        }
    }
}
impl OptimizedConsumerConfig {
pub fn for_fs_read() -> Self {
Self {
name: "fs-read-consumer".to_string(),
max_ack_pending: 2000, max_deliver: 3,
ack_wait: std::time::Duration::from_secs(30), batch_size: 100, filter_subject: Some("smith.intents.vetted.fs.read.*".to_string()),
flow_control: FlowControlConfig {
idle_heartbeat: std::time::Duration::from_secs(2),
max_waiting: 1024,
enabled: true,
},
}
}
pub fn for_http_fetch() -> Self {
Self {
name: "http-fetch-consumer".to_string(),
max_ack_pending: 500, max_deliver: 5, ack_wait: std::time::Duration::from_secs(120), batch_size: 25, filter_subject: Some("smith.intents.vetted.http.fetch.*".to_string()),
flow_control: FlowControlConfig {
idle_heartbeat: std::time::Duration::from_secs(10),
max_waiting: 256,
enabled: true,
},
}
}
pub fn for_admission() -> Self {
Self {
name: "admission-consumer".to_string(),
max_ack_pending: 5000, max_deliver: 2, ack_wait: std::time::Duration::from_secs(5), batch_size: 200, filter_subject: Some("smith.intents.raw.*".to_string()),
flow_control: FlowControlConfig {
idle_heartbeat: std::time::Duration::from_secs(1),
max_waiting: 2048, enabled: true,
},
}
}
}
/// Thresholds and canned responses for reacting to consumer overload.
#[derive(Debug, Clone)]
pub struct BackpressureManager {
    /// Consumer lag above which backpressure triggers.
    pub lag_threshold: u64,
    /// Pending (unacknowledged) deliveries above which backpressure triggers.
    pub pending_ack_threshold: i64,
    /// Actions returned when either threshold is exceeded.
    pub response_actions: Vec<BackpressureAction>,
}
/// A single mitigation step taken when backpressure is detected.
/// Variants describe intent only; execution happens elsewhere.
#[derive(Debug, Clone)]
pub enum BackpressureAction {
    /// Route incoming work to quarantine instead of normal processing.
    RouteToQuarantine,
    /// Shrink the pull batch size to the given value.
    ReduceBatchSize(usize),
    /// Adjust the ack-wait deadline with the given duration
    /// (by or to — confirm at the executor).
    ExtendAckWait(std::time::Duration),
    /// Notify operators with the given message.
    AlertOps(String),
}
impl Default for BackpressureManager {
    /// Defaults: trip at >1000 messages of lag or >500 unacked deliveries,
    /// then quarantine-route, shrink batches, and alert the operators.
    fn default() -> Self {
        let response_actions = vec![
            BackpressureAction::RouteToQuarantine,
            BackpressureAction::ReduceBatchSize(10),
            BackpressureAction::AlertOps("High consumer lag detected".to_string()),
        ];
        Self {
            lag_threshold: 1000,
            pending_ack_threshold: 500,
            response_actions,
        }
    }
}
impl BackpressureManager {
    /// True when either the consumer lag or the pending-ack count exceeds
    /// its configured threshold.
    pub fn should_apply_backpressure(&self, consumer_lag: u64, pending_acks: i64) -> bool {
        let lag_exceeded = consumer_lag > self.lag_threshold;
        let acks_exceeded = pending_acks > self.pending_ack_threshold;
        lag_exceeded || acks_exceeded
    }

    /// Returns a copy of the configured response actions when backpressure
    /// is warranted for the given metrics, otherwise an empty list.
    pub fn generate_backpressure_response(
        &self,
        consumer_lag: u64,
        pending_acks: i64,
    ) -> Vec<BackpressureAction> {
        if !self.should_apply_backpressure(consumer_lag, pending_acks) {
            return Vec::new();
        }
        self.response_actions.clone()
    }
}
/// Stateless helper that derives consumer-tuning numbers from runtime metrics.
pub struct ConsumerOptimizer;

impl ConsumerOptimizer {
    /// Sizes the in-flight ack window from executor parallelism.
    ///
    /// Per-capability multipliers give cheap, high-throughput capabilities a
    /// larger window. Uses saturating arithmetic and a checked cast: the
    /// original unchecked `usize` multiply would panic in debug builds (and
    /// wrap in release) for extreme concurrency values.
    pub fn calculate_max_ack_pending(executor_concurrency: usize, capability: &str) -> i64 {
        let base_multiplier: usize = match capability {
            "fs.read" => 10,
            "http.fetch" => 5,
            "admission" => 20,
            _ => 8,
        };
        let pending = executor_concurrency.saturating_mul(base_multiplier);
        i64::try_from(pending).unwrap_or(i64::MAX)
    }

    /// Picks a pull batch size from the observed average processing latency.
    ///
    /// Fast handlers (<10ms) get double batches, slow ones (>1s) get half.
    /// The result is clamped to at least 1 so halving can never yield an
    /// unusable batch size of 0 (unchanged for all current base sizes).
    pub fn calculate_batch_size(avg_processing_time_ms: u64, capability: &str) -> usize {
        let base_size: usize = match capability {
            "fs.read" => 100,
            "http.fetch" => 25,
            "admission" => 200,
            _ => 50,
        };
        let sized = if avg_processing_time_ms < 10 {
            base_size.saturating_mul(2)
        } else if avg_processing_time_ms > 1000 {
            base_size / 2
        } else {
            base_size
        };
        sized.max(1)
    }

    /// Builds the per-capability base profile, then overrides the ack window
    /// and batch size with values derived from live metrics.
    pub fn optimize_consumer_config(
        capability: &str,
        executor_concurrency: usize,
        avg_processing_time_ms: u64,
    ) -> OptimizedConsumerConfig {
        let base_config = match capability {
            "fs.read" => OptimizedConsumerConfig::for_fs_read(),
            "http.fetch" => OptimizedConsumerConfig::for_http_fetch(),
            "admission" => OptimizedConsumerConfig::for_admission(),
            _ => OptimizedConsumerConfig::default(),
        };
        OptimizedConsumerConfig {
            max_ack_pending: Self::calculate_max_ack_pending(executor_concurrency, capability),
            batch_size: Self::calculate_batch_size(avg_processing_time_ms, capability),
            ..base_config
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_episode_id_sharding() {
        // Distinct ids should (for these inputs) land on distinct shards,
        // and the same id must always map to the same shard.
        let first = EpisodeId::new("episode-123".to_string());
        let second = EpisodeId::new("episode-456".to_string());
        assert_ne!(first.to_subject_suffix(), second.to_subject_suffix());

        let first_again = EpisodeId::new("episode-123".to_string());
        assert_eq!(first.to_subject_suffix(), first_again.to_subject_suffix());
    }

    #[test]
    fn test_sharded_subject_builder() {
        let episode = EpisodeId::new("test-episode".to_string());
        let built = ShardedSubjectBuilder::new("smith.intents.vetted.fs.read.v1".to_string())
            .with_episode(episode.clone())
            .build();
        assert!(built.starts_with("smith.intents.vetted.fs.read.v1.shard."));
        assert!(built.contains(&episode.to_subject_suffix()));
    }

    #[test]
    fn test_consumer_config_optimization() {
        let fs_read = OptimizedConsumerConfig::for_fs_read();
        assert_eq!(fs_read.batch_size, 100);
        assert_eq!(fs_read.max_ack_pending, 2000);

        let http_fetch = OptimizedConsumerConfig::for_http_fetch();
        assert_eq!(http_fetch.batch_size, 25);
        assert_eq!(http_fetch.max_deliver, 5);

        let admission = OptimizedConsumerConfig::for_admission();
        assert_eq!(admission.batch_size, 200);
        assert_eq!(admission.max_ack_pending, 5000);
    }

    #[test]
    fn test_backpressure_manager() {
        let manager = BackpressureManager::default();
        // (lag, pending_acks, expected trip) against thresholds 1000 / 500.
        let cases = [
            (100u64, 50i64, false),
            (1500, 50, true),
            (100, 600, true),
            (1500, 600, true),
        ];
        for (lag, acks, expected) in cases {
            assert_eq!(manager.should_apply_backpressure(lag, acks), expected);
        }
    }

    #[test]
    fn test_consumer_optimizer() {
        assert_eq!(ConsumerOptimizer::calculate_max_ack_pending(10, "fs.read"), 100);
        assert_eq!(ConsumerOptimizer::calculate_max_ack_pending(8, "http.fetch"), 40);

        // <10ms doubles the base; >1000ms halves it (integer division).
        assert_eq!(ConsumerOptimizer::calculate_batch_size(5, "fs.read"), 200);
        assert_eq!(ConsumerOptimizer::calculate_batch_size(2000, "http.fetch"), 12);

        let optimized = ConsumerOptimizer::optimize_consumer_config("fs.read", 16, 8);
        assert_eq!(optimized.max_ack_pending, 160);
        assert_eq!(optimized.batch_size, 200);
    }
}