1use serde::{Deserialize, Serialize};
7use std::collections::hash_map::DefaultHasher;
8use std::hash::{Hash, Hasher};
9
10#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
12pub struct EpisodeId {
13 pub id: String,
15 pub project: Option<String>,
17}
18
19impl EpisodeId {
20 pub fn new(id: String) -> Self {
21 Self { id, project: None }
22 }
23
24 pub fn with_project(id: String, project: String) -> Self {
25 Self {
26 id,
27 project: Some(project),
28 }
29 }
30
31 pub fn to_subject_suffix(&self) -> String {
33 let mut hasher = DefaultHasher::new();
34 self.hash(&mut hasher);
35 let hash = hasher.finish();
36
37 let shard = hash % 16; format!("shard.{:02x}", shard)
40 }
41}
42
43pub struct ShardedSubjectBuilder {
45 base_subject: String,
46 episode_id: Option<EpisodeId>,
47}
48
49impl ShardedSubjectBuilder {
50 pub fn new(base_subject: String) -> Self {
51 Self {
52 base_subject,
53 episode_id: None,
54 }
55 }
56
57 pub fn with_episode(mut self, episode_id: EpisodeId) -> Self {
58 self.episode_id = Some(episode_id);
59 self
60 }
61
62 pub fn build(self) -> String {
64 match self.episode_id {
65 Some(episode) => {
66 format!("{}.{}", self.base_subject, episode.to_subject_suffix())
67 }
68 None => self.base_subject,
69 }
70 }
71}
72
73#[derive(Debug, Clone)]
75pub struct OptimizedConsumerConfig {
76 pub name: String,
78
79 pub max_ack_pending: i64,
81
82 pub max_deliver: i64,
84
85 pub ack_wait: std::time::Duration,
87
88 pub batch_size: usize,
90
91 pub filter_subject: Option<String>,
93
94 pub flow_control: FlowControlConfig,
96}
97
98#[derive(Debug, Clone)]
99pub struct FlowControlConfig {
100 pub idle_heartbeat: std::time::Duration,
102
103 pub max_waiting: i64,
105
106 pub enabled: bool,
108}
109
110impl Default for OptimizedConsumerConfig {
111 fn default() -> Self {
112 Self {
113 name: format!("consumer-{}", uuid::Uuid::new_v4()),
114 max_ack_pending: 1000, max_deliver: 3,
116 ack_wait: std::time::Duration::from_secs(60), batch_size: 50, filter_subject: None,
119 flow_control: FlowControlConfig {
120 idle_heartbeat: std::time::Duration::from_secs(5),
121 max_waiting: 512, enabled: true,
123 },
124 }
125 }
126}
127
128impl OptimizedConsumerConfig {
129 pub fn for_fs_read() -> Self {
131 Self {
132 name: "fs-read-consumer".to_string(),
133 max_ack_pending: 2000, max_deliver: 3,
135 ack_wait: std::time::Duration::from_secs(30), batch_size: 100, filter_subject: Some("smith.intents.vetted.fs.read.*".to_string()),
138 flow_control: FlowControlConfig {
139 idle_heartbeat: std::time::Duration::from_secs(2),
140 max_waiting: 1024,
141 enabled: true,
142 },
143 }
144 }
145
146 pub fn for_http_fetch() -> Self {
148 Self {
149 name: "http-fetch-consumer".to_string(),
150 max_ack_pending: 500, max_deliver: 5, ack_wait: std::time::Duration::from_secs(120), batch_size: 25, filter_subject: Some("smith.intents.vetted.http.fetch.*".to_string()),
155 flow_control: FlowControlConfig {
156 idle_heartbeat: std::time::Duration::from_secs(10),
157 max_waiting: 256,
158 enabled: true,
159 },
160 }
161 }
162
163 pub fn for_admission() -> Self {
165 Self {
166 name: "admission-consumer".to_string(),
167 max_ack_pending: 5000, max_deliver: 2, ack_wait: std::time::Duration::from_secs(5), batch_size: 200, filter_subject: Some("smith.intents.raw.*".to_string()),
172 flow_control: FlowControlConfig {
173 idle_heartbeat: std::time::Duration::from_secs(1),
174 max_waiting: 2048, enabled: true,
176 },
177 }
178 }
179}
180
181#[derive(Debug, Clone)]
183pub struct BackpressureManager {
184 pub lag_threshold: u64,
186
187 pub pending_ack_threshold: i64,
189
190 pub response_actions: Vec<BackpressureAction>,
192}
193
194#[derive(Debug, Clone)]
195pub enum BackpressureAction {
196 RouteToQuarantine,
198
199 ReduceBatchSize(usize),
201
202 ExtendAckWait(std::time::Duration),
204
205 AlertOps(String),
207}
208
209impl Default for BackpressureManager {
210 fn default() -> Self {
211 Self {
212 lag_threshold: 1000, pending_ack_threshold: 500, response_actions: vec![
215 BackpressureAction::RouteToQuarantine,
216 BackpressureAction::ReduceBatchSize(10),
217 BackpressureAction::AlertOps("High consumer lag detected".to_string()),
218 ],
219 }
220 }
221}
222
223impl BackpressureManager {
224 pub fn should_apply_backpressure(&self, consumer_lag: u64, pending_acks: i64) -> bool {
226 consumer_lag > self.lag_threshold || pending_acks > self.pending_ack_threshold
227 }
228
229 pub fn generate_backpressure_response(
231 &self,
232 consumer_lag: u64,
233 pending_acks: i64,
234 ) -> Vec<BackpressureAction> {
235 if self.should_apply_backpressure(consumer_lag, pending_acks) {
236 self.response_actions.clone()
237 } else {
238 vec![]
239 }
240 }
241}
242
243pub struct ConsumerOptimizer;
245
246impl ConsumerOptimizer {
247 pub fn calculate_max_ack_pending(executor_concurrency: usize, capability: &str) -> i64 {
249 let base_multiplier = match capability {
250 "fs.read" => 10, "http.fetch" => 5, "admission" => 20, _ => 8, };
255
256 (executor_concurrency * base_multiplier) as i64
257 }
258
259 pub fn calculate_batch_size(avg_processing_time_ms: u64, capability: &str) -> usize {
261 let base_size = match capability {
262 "fs.read" => 100, "http.fetch" => 25, "admission" => 200, _ => 50, };
267
268 if avg_processing_time_ms < 10 {
270 base_size * 2 } else if avg_processing_time_ms > 1000 {
272 base_size / 2 } else {
274 base_size
275 }
276 }
277
278 pub fn optimize_consumer_config(
280 capability: &str,
281 executor_concurrency: usize,
282 avg_processing_time_ms: u64,
283 ) -> OptimizedConsumerConfig {
284 let base_config = match capability {
285 "fs.read" => OptimizedConsumerConfig::for_fs_read(),
286 "http.fetch" => OptimizedConsumerConfig::for_http_fetch(),
287 "admission" => OptimizedConsumerConfig::for_admission(),
288 _ => OptimizedConsumerConfig::default(),
289 };
290
291 OptimizedConsumerConfig {
292 max_ack_pending: Self::calculate_max_ack_pending(executor_concurrency, capability),
293 batch_size: Self::calculate_batch_size(avg_processing_time_ms, capability),
294 ..base_config
295 }
296 }
297}
298
299#[cfg(test)]
300mod tests {
301 use super::*;
302
303 #[test]
304 fn test_episode_id_sharding() {
305 let episode1 = EpisodeId::new("episode-123".to_string());
306 let episode2 = EpisodeId::new("episode-456".to_string());
307
308 let suffix1 = episode1.to_subject_suffix();
309 let suffix2 = episode2.to_subject_suffix();
310
311 assert_ne!(suffix1, suffix2);
313
314 let episode1_duplicate = EpisodeId::new("episode-123".to_string());
316 assert_eq!(suffix1, episode1_duplicate.to_subject_suffix());
317 }
318
319 #[test]
320 fn test_sharded_subject_builder() {
321 let episode = EpisodeId::new("test-episode".to_string());
322 let subject = ShardedSubjectBuilder::new("smith.intents.vetted.fs.read.v1".to_string())
323 .with_episode(episode.clone())
324 .build();
325
326 assert!(subject.starts_with("smith.intents.vetted.fs.read.v1.shard."));
327 assert!(subject.contains(&episode.to_subject_suffix()));
328 }
329
330 #[test]
331 fn test_consumer_config_optimization() {
332 let fs_read_config = OptimizedConsumerConfig::for_fs_read();
333 assert_eq!(fs_read_config.batch_size, 100);
334 assert_eq!(fs_read_config.max_ack_pending, 2000);
335
336 let http_fetch_config = OptimizedConsumerConfig::for_http_fetch();
337 assert_eq!(http_fetch_config.batch_size, 25);
338 assert_eq!(http_fetch_config.max_deliver, 5); let admission_config = OptimizedConsumerConfig::for_admission();
341 assert_eq!(admission_config.batch_size, 200);
342 assert_eq!(admission_config.max_ack_pending, 5000);
343 }
344
345 #[test]
346 fn test_backpressure_manager() {
347 let manager = BackpressureManager::default();
348
349 assert!(!manager.should_apply_backpressure(100, 50));
351
352 assert!(manager.should_apply_backpressure(1500, 50));
354
355 assert!(manager.should_apply_backpressure(100, 600));
357
358 assert!(manager.should_apply_backpressure(1500, 600));
360 }
361
362 #[test]
363 fn test_consumer_optimizer() {
364 let max_ack_pending = ConsumerOptimizer::calculate_max_ack_pending(10, "fs.read");
366 assert_eq!(max_ack_pending, 100); let max_ack_pending = ConsumerOptimizer::calculate_max_ack_pending(8, "http.fetch");
369 assert_eq!(max_ack_pending, 40); let batch_size = ConsumerOptimizer::calculate_batch_size(5, "fs.read"); assert_eq!(batch_size, 200); let batch_size = ConsumerOptimizer::calculate_batch_size(2000, "http.fetch"); assert_eq!(batch_size, 12); let config = ConsumerOptimizer::optimize_consumer_config("fs.read", 16, 8);
380 assert_eq!(config.max_ack_pending, 160); assert_eq!(config.batch_size, 200); }
383}