// smith_bus/sharding.rs
//! Episode-based subject sharding for ordering guarantees
//!
//! This module implements subject-hash sharding to ensure ordering guarantees
//! within episode boundaries while allowing parallel processing across episodes.

use serde::{Deserialize, Serialize};
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};
/// Episode identifier for sharding and ordering.
///
/// Identity (and the derived `Hash`) covers both `id` and `project`, so the
/// same episode id under different projects is treated as a distinct episode
/// and may land in a different shard.
#[derive(Debug, Clone, Serialize, Deserialize, Hash, PartialEq, Eq)]
pub struct EpisodeId {
    /// Unique episode identifier
    pub id: String,
    /// Optional project context for isolation
    pub project: Option<String>,
}
19impl EpisodeId {
20    pub fn new(id: String) -> Self {
21        Self { id, project: None }
22    }
23
24    pub fn with_project(id: String, project: String) -> Self {
25        Self {
26            id,
27            project: Some(project),
28        }
29    }
30
31    /// Generate a subject suffix based on episode hash for sharding
32    pub fn to_subject_suffix(&self) -> String {
33        let mut hasher = DefaultHasher::new();
34        self.hash(&mut hasher);
35        let hash = hasher.finish();
36
37        // Use modulo to create consistent sharding
38        let shard = hash % 16; // 16 shards for good distribution
39        format!("shard.{:02x}", shard)
40    }
41}
42
/// Subject builder with episode-based sharding support
pub struct ShardedSubjectBuilder {
    /// Base subject the shard suffix is appended to
    /// (e.g. "smith.intents.vetted.fs.read.v1").
    base_subject: String,
    /// When set, `build()` appends the episode's shard suffix to the subject.
    episode_id: Option<EpisodeId>,
}
49impl ShardedSubjectBuilder {
50    pub fn new(base_subject: String) -> Self {
51        Self {
52            base_subject,
53            episode_id: None,
54        }
55    }
56
57    pub fn with_episode(mut self, episode_id: EpisodeId) -> Self {
58        self.episode_id = Some(episode_id);
59        self
60    }
61
62    /// Build the sharded subject
63    pub fn build(self) -> String {
64        match self.episode_id {
65            Some(episode) => {
66                format!("{}.{}", self.base_subject, episode.to_subject_suffix())
67            }
68            None => self.base_subject,
69        }
70    }
71}
72
/// Consumer configuration optimized for Phase 2 performance requirements.
///
/// Capability-specific presets live in the `impl` block
/// (`for_fs_read`, `for_http_fetch`, `for_admission`).
#[derive(Debug, Clone)]
pub struct OptimizedConsumerConfig {
    /// Consumer name
    pub name: String,

    /// Maximum messages to deliver without acknowledgment
    /// (the in-flight window; larger means more throughput).
    pub max_ack_pending: i64,

    /// Maximum delivery attempts before dead letter
    pub max_deliver: i64,

    /// Acknowledgment wait time before a message is redelivered
    pub ack_wait: std::time::Duration,

    /// Pull-based consumer batch size
    pub batch_size: usize,

    /// Consumer filter subject (supports sharding wildcards, e.g. `…fs.read.*`)
    pub filter_subject: Option<String>,

    /// Flow control settings
    pub flow_control: FlowControlConfig,
}
/// Flow-control settings for a consumer.
#[derive(Debug, Clone)]
pub struct FlowControlConfig {
    /// Idle heartbeat interval
    pub idle_heartbeat: std::time::Duration,

    /// Maximum waiting time for messages
    /// (NOTE(review): used as a max-waiting *count* by the presets — confirm units)
    pub max_waiting: i64,

    /// Enable flow control
    pub enabled: bool,
}
110impl Default for OptimizedConsumerConfig {
111    fn default() -> Self {
112        Self {
113            name: format!("consumer-{}", uuid::Uuid::new_v4()),
114            max_ack_pending: 1000, // Higher for throughput
115            max_deliver: 3,
116            ack_wait: std::time::Duration::from_secs(60), // Longer for complex processing
117            batch_size: 50,                               // Optimized batch size
118            filter_subject: None,
119            flow_control: FlowControlConfig {
120                idle_heartbeat: std::time::Duration::from_secs(5),
121                max_waiting: 512, // Higher concurrency
122                enabled: true,
123            },
124        }
125    }
126}
127
128impl OptimizedConsumerConfig {
129    /// Create consumer config optimized for fs.read capability
130    pub fn for_fs_read() -> Self {
131        Self {
132            name: "fs-read-consumer".to_string(),
133            max_ack_pending: 2000, // High throughput
134            max_deliver: 3,
135            ack_wait: std::time::Duration::from_secs(30), // Fast I/O operations
136            batch_size: 100,                              // Large batches for file operations
137            filter_subject: Some("smith.intents.vetted.fs.read.*".to_string()),
138            flow_control: FlowControlConfig {
139                idle_heartbeat: std::time::Duration::from_secs(2),
140                max_waiting: 1024,
141                enabled: true,
142            },
143        }
144    }
145
146    /// Create consumer config optimized for http.fetch capability  
147    pub fn for_http_fetch() -> Self {
148        Self {
149            name: "http-fetch-consumer".to_string(),
150            max_ack_pending: 500, // Moderate throughput for network I/O
151            max_deliver: 5,       // More retries for network failures
152            ack_wait: std::time::Duration::from_secs(120), // Longer for HTTP timeouts
153            batch_size: 25,       // Smaller batches for network operations
154            filter_subject: Some("smith.intents.vetted.http.fetch.*".to_string()),
155            flow_control: FlowControlConfig {
156                idle_heartbeat: std::time::Duration::from_secs(10),
157                max_waiting: 256,
158                enabled: true,
159            },
160        }
161    }
162
163    /// Create consumer config optimized for admission control
164    pub fn for_admission() -> Self {
165        Self {
166            name: "admission-consumer".to_string(),
167            max_ack_pending: 5000, // Very high throughput for policy checks
168            max_deliver: 2,        // Fast failures for admission
169            ack_wait: std::time::Duration::from_secs(5), // Very fast processing
170            batch_size: 200,       // Large batches for policy validation
171            filter_subject: Some("smith.intents.raw.*".to_string()),
172            flow_control: FlowControlConfig {
173                idle_heartbeat: std::time::Duration::from_secs(1),
174                max_waiting: 2048, // Very high concurrency
175                enabled: true,
176            },
177        }
178    }
179}
180
/// Backpressure detection and management.
///
/// Holds the thresholds that trigger backpressure and the ordered list of
/// actions returned when either threshold is exceeded.
#[derive(Debug, Clone)]
pub struct BackpressureManager {
    /// Threshold for consumer lag (messages behind) above which
    /// backpressure is applied.
    pub lag_threshold: u64,

    /// Threshold for pending (unacknowledged) messages above which
    /// backpressure is applied.
    pub pending_ack_threshold: i64,

    /// Backpressure response actions, returned in order when triggered.
    pub response_actions: Vec<BackpressureAction>,
}
/// An action taken in response to detected backpressure.
#[derive(Debug, Clone)]
pub enum BackpressureAction {
    /// Route overload to quarantine stream
    RouteToQuarantine,

    /// Reduce batch sizes to the given size
    ReduceBatchSize(usize),

    /// Increase ack wait times by/to the given duration
    /// (NOTE(review): extend-by vs set-to is not decided here — the consumer
    /// of this action defines the semantics)
    ExtendAckWait(std::time::Duration),

    /// Alert operations team with the given message
    AlertOps(String),
}
209impl Default for BackpressureManager {
210    fn default() -> Self {
211        Self {
212            lag_threshold: 1000,        // 1000 messages behind
213            pending_ack_threshold: 500, // 500 unacked messages
214            response_actions: vec![
215                BackpressureAction::RouteToQuarantine,
216                BackpressureAction::ReduceBatchSize(10),
217                BackpressureAction::AlertOps("High consumer lag detected".to_string()),
218            ],
219        }
220    }
221}
222
223impl BackpressureManager {
224    /// Check if backpressure should be applied
225    pub fn should_apply_backpressure(&self, consumer_lag: u64, pending_acks: i64) -> bool {
226        consumer_lag > self.lag_threshold || pending_acks > self.pending_ack_threshold
227    }
228
229    /// Generate backpressure response
230    pub fn generate_backpressure_response(
231        &self,
232        consumer_lag: u64,
233        pending_acks: i64,
234    ) -> Vec<BackpressureAction> {
235        if self.should_apply_backpressure(consumer_lag, pending_acks) {
236            self.response_actions.clone()
237        } else {
238            vec![]
239        }
240    }
241}
242
/// Performance optimization helpers for JetStream consumers.
///
/// Stateless namespace: all helpers are associated functions.
pub struct ConsumerOptimizer;
246impl ConsumerOptimizer {
247    /// Calculate optimal MaxAckPending based on executor concurrency
248    pub fn calculate_max_ack_pending(executor_concurrency: usize, capability: &str) -> i64 {
249        let base_multiplier = match capability {
250            "fs.read" => 10,   // Fast I/O operations
251            "http.fetch" => 5, // Slower network operations
252            "admission" => 20, // Very fast policy checks
253            _ => 8,            // Default multiplier
254        };
255
256        (executor_concurrency * base_multiplier) as i64
257    }
258
259    /// Calculate optimal batch size based on message processing time
260    pub fn calculate_batch_size(avg_processing_time_ms: u64, capability: &str) -> usize {
261        let base_size = match capability {
262            "fs.read" => 100,   // Large batches for file I/O
263            "http.fetch" => 25, // Smaller batches for network
264            "admission" => 200, // Very large batches for fast processing
265            _ => 50,            // Default batch size
266        };
267
268        // Adjust based on processing time
269        if avg_processing_time_ms < 10 {
270            base_size * 2 // Very fast processing, larger batches
271        } else if avg_processing_time_ms > 1000 {
272            base_size / 2 // Slow processing, smaller batches
273        } else {
274            base_size
275        }
276    }
277
278    /// Generate optimized consumer configuration
279    pub fn optimize_consumer_config(
280        capability: &str,
281        executor_concurrency: usize,
282        avg_processing_time_ms: u64,
283    ) -> OptimizedConsumerConfig {
284        let base_config = match capability {
285            "fs.read" => OptimizedConsumerConfig::for_fs_read(),
286            "http.fetch" => OptimizedConsumerConfig::for_http_fetch(),
287            "admission" => OptimizedConsumerConfig::for_admission(),
288            _ => OptimizedConsumerConfig::default(),
289        };
290
291        OptimizedConsumerConfig {
292            max_ack_pending: Self::calculate_max_ack_pending(executor_concurrency, capability),
293            batch_size: Self::calculate_batch_size(avg_processing_time_ms, capability),
294            ..base_config
295        }
296    }
297}
298
#[cfg(test)]
mod tests {
    use super::*;

    /// Sharding must be deterministic per episode; distinct episodes should
    /// (for these inputs) land in distinct shards.
    #[test]
    fn test_episode_id_sharding() {
        let episode1 = EpisodeId::new("episode-123".to_string());
        let episode2 = EpisodeId::new("episode-456".to_string());

        let suffix1 = episode1.to_subject_suffix();
        let suffix2 = episode2.to_subject_suffix();

        // Different episodes should get different suffixes.
        // (Only 16 shards exist, so this relies on these two ids hashing
        // apart — not guaranteed for arbitrary id pairs.)
        assert_ne!(suffix1, suffix2);

        // Same episode should get same suffix
        let episode1_duplicate = EpisodeId::new("episode-123".to_string());
        assert_eq!(suffix1, episode1_duplicate.to_subject_suffix());
    }

    /// The builder appends ".shard.XX" derived from the episode hash.
    #[test]
    fn test_sharded_subject_builder() {
        let episode = EpisodeId::new("test-episode".to_string());
        let subject = ShardedSubjectBuilder::new("smith.intents.vetted.fs.read.v1".to_string())
            .with_episode(episode.clone())
            .build();

        assert!(subject.starts_with("smith.intents.vetted.fs.read.v1.shard."));
        assert!(subject.contains(&episode.to_subject_suffix()));
    }

    /// Pin the capability-specific preset constants.
    #[test]
    fn test_consumer_config_optimization() {
        let fs_read_config = OptimizedConsumerConfig::for_fs_read();
        assert_eq!(fs_read_config.batch_size, 100);
        assert_eq!(fs_read_config.max_ack_pending, 2000);

        let http_fetch_config = OptimizedConsumerConfig::for_http_fetch();
        assert_eq!(http_fetch_config.batch_size, 25);
        assert_eq!(http_fetch_config.max_deliver, 5); // More retries for network

        let admission_config = OptimizedConsumerConfig::for_admission();
        assert_eq!(admission_config.batch_size, 200);
        assert_eq!(admission_config.max_ack_pending, 5000);
    }

    /// Thresholds are OR-ed: either lag or pending acks alone triggers.
    #[test]
    fn test_backpressure_manager() {
        let manager = BackpressureManager::default();

        // Normal conditions - no backpressure
        assert!(!manager.should_apply_backpressure(100, 50));

        // High lag - should apply backpressure
        assert!(manager.should_apply_backpressure(1500, 50));

        // High pending acks - should apply backpressure
        assert!(manager.should_apply_backpressure(100, 600));

        // Both high - should apply backpressure
        assert!(manager.should_apply_backpressure(1500, 600));
    }

    /// Pin the multiplier / batch-size arithmetic (integer division: 25/2 == 12).
    #[test]
    fn test_consumer_optimizer() {
        // Test MaxAckPending calculation
        let max_ack_pending = ConsumerOptimizer::calculate_max_ack_pending(10, "fs.read");
        assert_eq!(max_ack_pending, 100); // 10 * 10

        let max_ack_pending = ConsumerOptimizer::calculate_max_ack_pending(8, "http.fetch");
        assert_eq!(max_ack_pending, 40); // 8 * 5

        // Test batch size calculation
        let batch_size = ConsumerOptimizer::calculate_batch_size(5, "fs.read"); // Very fast
        assert_eq!(batch_size, 200); // 100 * 2

        let batch_size = ConsumerOptimizer::calculate_batch_size(2000, "http.fetch"); // Slow
        assert_eq!(batch_size, 12); // 25 / 2

        // Test full optimization
        let config = ConsumerOptimizer::optimize_consumer_config("fs.read", 16, 8);
        assert_eq!(config.max_ack_pending, 160); // 16 * 10
        assert_eq!(config.batch_size, 200); // Fast processing, larger batches
    }
}