Skip to main content

smith_bus/
streams.rs

1use anyhow::{Context, Result};
2use async_nats::jetstream::{self, stream::Config as StreamConfig};
3use std::time::Duration;
4use tracing::{debug, info, warn};
5
6/// Stream manager for creating and managing JetStream streams
7pub struct StreamManager {
8    jetstream: jetstream::Context,
9}
10
11impl StreamManager {
12    pub fn new(jetstream: jetstream::Context) -> Self {
13        Self { jetstream }
14    }
15
16    /// Ensure all required Smith streams exist with proper configuration
17    pub async fn bootstrap_streams(&self) -> Result<()> {
18        info!("Bootstrapping Smith JetStream streams with Phase 2 performance optimization");
19
20        // Create Phase 2 optimized stream architecture
21        self.ensure_sdlc_raw_stream()
22            .await
23            .context("Failed to ensure SDLC_RAW stream")?;
24
25        self.ensure_atoms_vetted_stream()
26            .await
27            .context("Failed to ensure ATOMS_VETTED stream")?;
28
29        self.ensure_atoms_results_stream()
30            .await
31            .context("Failed to ensure ATOMS_RESULTS stream")?;
32
33        self.ensure_audit_streams()
34            .await
35            .context("Failed to ensure AUDIT streams")?;
36
37        self.ensure_backpressure_streams()
38            .await
39            .context("Failed to ensure BACKPRESSURE streams")?;
40
41        info!("All Smith Phase 2 streams bootstrapped successfully");
42        Ok(())
43    }
44
45    /// Phase 2: Raw intent ingestion stream (sdlc.raw)
46    pub async fn ensure_sdlc_raw_stream(&self) -> Result<()> {
47        let stream_name = "SDLC_RAW";
48        let subjects = vec![
49            "smith.intents.raw.*".to_string(), // All raw intents
50        ];
51
52        let config = StreamConfig {
53            name: stream_name.to_string(),
54            description: Some(
55                "Phase 2: Raw intent ingestion with high-throughput optimization".to_string(),
56            ),
57            subjects,
58            retention: jetstream::stream::RetentionPolicy::WorkQueue, // Work queue semantics
59            max_age: Duration::from_secs(6 * 60 * 60), // 6 hours - shorter for raw processing
60            max_bytes: 500 * 1024 * 1024,              // 500MB for high throughput
61            max_messages: 50_000,                      // Higher message limit for load testing
62            max_message_size: 2 * 1024 * 1024,         // 2MB for complex intents
63            storage: jetstream::stream::StorageType::File,
64            num_replicas: 1,
65            discard: jetstream::stream::DiscardPolicy::Old,
66            duplicate_window: Duration::from_secs(60), // 1 minute deduplication
67            ..Default::default()
68        };
69
70        self.create_or_update_stream(config).await?;
71        info!("SDLC_RAW stream ensured with high-throughput configuration");
72        Ok(())
73    }
74
75    /// Phase 2: Policy-approved intents stream (atoms.vetted)
76    pub async fn ensure_atoms_vetted_stream(&self) -> Result<()> {
77        let stream_name = "ATOMS_VETTED";
78        let subjects = vec![
79            "smith.intents.vetted.*".to_string(), // Policy-approved intents
80        ];
81
82        let config = StreamConfig {
83            name: stream_name.to_string(),
84            description: Some(
85                "Phase 2: Policy-approved intents with ordering guarantees".to_string(),
86            ),
87            subjects,
88            retention: jetstream::stream::RetentionPolicy::Interest, // Keep until all consumers processed
89            max_age: Duration::from_secs(12 * 60 * 60),              // 12 hours retention
90            max_bytes: 1024 * 1024 * 1024,                           // 1GB for larger workloads
91            max_messages: 100_000, // Higher capacity for load testing
92            max_message_size: 2 * 1024 * 1024, // 2MB max
93            storage: jetstream::stream::StorageType::File,
94            num_replicas: 1,
95            discard: jetstream::stream::DiscardPolicy::Old,
96            duplicate_window: Duration::from_secs(2 * 60), // 2 minutes deduplication
97            ..Default::default()
98        };
99
100        self.create_or_update_stream(config).await?;
101        info!("ATOMS_VETTED stream ensured with ordering guarantees");
102        Ok(())
103    }
104
105    /// Phase 2: Execution results stream (atoms.results)
106    pub async fn ensure_atoms_results_stream(&self) -> Result<()> {
107        let stream_name = "ATOMS_RESULTS";
108        let subjects = vec![
109            "smith.results.*".to_string(), // All execution results
110        ];
111
112        let config = StreamConfig {
113            name: stream_name.to_string(),
114            description: Some("Phase 2: Execution results with performance tracking".to_string()),
115            subjects,
116            retention: jetstream::stream::RetentionPolicy::Limits, // Time/size limited retention
117            max_age: Duration::from_secs(48 * 60 * 60),            // 48 hours for analysis
118            max_bytes: 2048 * 1024 * 1024,                         // 2GB for comprehensive results
119            max_messages: 200_000, // High capacity for load testing results
120            max_message_size: 4 * 1024 * 1024, // 4MB for detailed execution results
121            storage: jetstream::stream::StorageType::File,
122            num_replicas: 1,
123            discard: jetstream::stream::DiscardPolicy::Old,
124            duplicate_window: Duration::from_secs(5 * 60), // 5 minutes for results deduplication
125            ..Default::default()
126        };
127
128        self.create_or_update_stream(config).await?;
129        info!("ATOMS_RESULTS stream ensured with performance tracking");
130        Ok(())
131    }
132
133    /// Phase 2: Audit and compliance streams
134    pub async fn ensure_audit_streams(&self) -> Result<()> {
135        // Security and compliance audit stream
136        let audit_config = StreamConfig {
137            name: "AUDIT_SECURITY".to_string(),
138            description: Some("Phase 2: Security and compliance audit events".to_string()),
139            subjects: vec!["smith.audit.*".to_string()],
140            retention: jetstream::stream::RetentionPolicy::Interest, // Permanent retention for compliance
141            max_age: Duration::from_secs(365 * 24 * 60 * 60),        // 1 year retention
142            max_bytes: 10 * 1024 * 1024 * 1024, // 10GB for comprehensive audit trail
143            max_messages: 1_000_000,            // High capacity for detailed auditing
144            max_message_size: 1024 * 1024,      // 1MB for audit events
145            storage: jetstream::stream::StorageType::File,
146            num_replicas: 1,
147            discard: jetstream::stream::DiscardPolicy::Old,
148            duplicate_window: Duration::from_secs(60), // 1 minute deduplication
149            ..Default::default()
150        };
151
152        self.create_or_update_stream(audit_config).await?;
153        info!("AUDIT_SECURITY stream ensured with compliance retention");
154        Ok(())
155    }
156
157    /// Phase 2: Backpressure and quarantine streams
158    pub async fn ensure_backpressure_streams(&self) -> Result<()> {
159        // Backpressure handling stream
160        let backpressure_config = StreamConfig {
161            name: "SDLC_QUARANTINE_BACKPRESSURE".to_string(),
162            description: Some("Phase 2: Backpressure and quarantine handling".to_string()),
163            subjects: vec!["smith.intents.quarantine.*".to_string()],
164            retention: jetstream::stream::RetentionPolicy::WorkQueue, // Process and remove
165            max_age: Duration::from_secs(2 * 60 * 60), // 2 hours for backpressure handling
166            max_bytes: 100 * 1024 * 1024,              // 100MB for quarantined items
167            max_messages: 10_000,                      // Reasonable limit for quarantine
168            max_message_size: 1024 * 1024,             // 1MB max
169            storage: jetstream::stream::StorageType::File,
170            num_replicas: 1,
171            discard: jetstream::stream::DiscardPolicy::Old,
172            duplicate_window: Duration::from_secs(30), // 30 seconds deduplication
173            ..Default::default()
174        };
175
176        self.create_or_update_stream(backpressure_config).await?;
177        info!("SDLC_QUARANTINE_BACKPRESSURE stream ensured");
178        Ok(())
179    }
180
181    /// Ensure the INTENT_RESULTS stream exists with proper configuration
182    pub async fn ensure_results_stream(&self) -> Result<()> {
183        let stream_name = "INTENT_RESULTS";
184        let subjects = vec!["smith.results.*".to_string()];
185
186        let config = StreamConfig {
187            name: stream_name.to_string(),
188            description: Some("Results from intent execution".to_string()),
189            subjects,
190            retention: jetstream::stream::RetentionPolicy::Limits, // Keep for specified time/count
191            max_age: Duration::from_secs(48 * 60 * 60),            // Keep results for 48 hours
192            max_bytes: 500 * 1024 * 1024,                          // 500MB max stream size
193            max_messages: 50_000,                                  // 50k messages max
194            max_message_size: 1024 * 1024,                         // 1MB max message size
195            storage: jetstream::stream::StorageType::File,
196            num_replicas: 1,
197            discard: jetstream::stream::DiscardPolicy::Old,
198            duplicate_window: Duration::from_secs(5 * 60),
199            ..Default::default()
200        };
201
202        self.create_or_update_stream(config).await?;
203        info!("INTENT_RESULTS stream ensured");
204        Ok(())
205    }
206
207    /// Ensure the AUDIT_LOGS stream exists with proper configuration
208    pub async fn ensure_audit_stream(&self) -> Result<()> {
209        let stream_name = "AUDIT_LOGS";
210        let subjects = vec!["smith.audit.*".to_string()];
211
212        let config = StreamConfig {
213            name: stream_name.to_string(),
214            description: Some("Audit logs for compliance and debugging".to_string()),
215            subjects,
216            retention: jetstream::stream::RetentionPolicy::Limits,
217            max_age: Duration::from_secs(30 * 24 * 60 * 60), // Keep audit logs for 30 days
218            max_bytes: 1024 * 1024 * 1024,                   // 1GB max stream size
219            max_messages: 100_000,                           // 100k messages max
220            max_message_size: 512 * 1024,                    // 512KB max message size
221            storage: jetstream::stream::StorageType::File,
222            num_replicas: 1,
223            discard: jetstream::stream::DiscardPolicy::Old,
224            duplicate_window: Duration::from_secs(60),
225            ..Default::default()
226        };
227
228        self.create_or_update_stream(config).await?;
229        info!("AUDIT_LOGS stream ensured");
230        Ok(())
231    }
232
233    /// Ensure the SYSTEM_EVENTS stream exists with proper configuration
234    pub async fn ensure_system_events_stream(&self) -> Result<()> {
235        let stream_name = "SYSTEM_EVENTS";
236        let subjects = vec!["smith.system.*".to_string()];
237
238        let config = StreamConfig {
239            name: stream_name.to_string(),
240            description: Some("System-level events and health monitoring".to_string()),
241            subjects,
242            retention: jetstream::stream::RetentionPolicy::Limits,
243            max_age: Duration::from_secs(12 * 60 * 60), // Keep system events for 12 hours
244            max_bytes: 50 * 1024 * 1024,                // 50MB max stream size
245            max_messages: 10_000,                       // 10k messages max
246            max_message_size: 64 * 1024,                // 64KB max message size
247            storage: jetstream::stream::StorageType::File,
248            num_replicas: 1,
249            discard: jetstream::stream::DiscardPolicy::Old,
250            duplicate_window: Duration::from_secs(30),
251            ..Default::default()
252        };
253
254        self.create_or_update_stream(config).await?;
255        info!("SYSTEM_EVENTS stream ensured");
256        Ok(())
257    }
258
259    /// Create or update a stream with the given configuration
260    async fn create_or_update_stream(&self, config: StreamConfig) -> Result<()> {
261        let stream_name = config.name.clone();
262
263        debug!("Checking if stream {} exists", stream_name);
264
265        match self.jetstream.get_stream(&stream_name).await {
266            Ok(mut existing_stream) => {
267                // Stream exists, check if update is needed
268                let existing_config = existing_stream.info().await?.config.clone();
269
270                if self.configs_differ(&existing_config, &config) {
271                    info!("Updating stream {} configuration", stream_name);
272                    self.jetstream
273                        .update_stream(&config)
274                        .await
275                        .with_context(|| format!("Failed to update stream: {}", stream_name))?;
276                    info!("Stream {} updated successfully", stream_name);
277                } else {
278                    debug!(
279                        "Stream {} already exists with correct configuration",
280                        stream_name
281                    );
282                }
283            }
284            Err(_) => {
285                // Stream doesn't exist, create it
286                info!("Creating stream: {}", stream_name);
287                match self.jetstream.create_stream(&config).await {
288                    Ok(_) => {
289                        info!("Stream {} created successfully", stream_name);
290                    }
291                    Err(err) => {
292                        warn!(
293                            "Stream {} creation returned error ({}); assuming it already exists",
294                            stream_name, err
295                        );
296                    }
297                }
298            }
299        }
300
301        Ok(())
302    }
303
304    /// Check if stream configurations differ significantly
305    fn configs_differ(&self, existing: &StreamConfig, new: &StreamConfig) -> bool {
306        // Compare key configuration fields
307        existing.subjects != new.subjects
308            || existing.retention != new.retention
309            || existing.max_age != new.max_age
310            || existing.max_bytes != new.max_bytes
311            || existing.max_messages != new.max_messages
312            || existing.storage != new.storage
313    }
314
315    /// Get information about all Smith streams
316    pub async fn get_streams_info(&self) -> Result<Vec<StreamInfo>> {
317        let stream_names = vec!["INTENTS", "INTENT_RESULTS", "AUDIT_LOGS", "SYSTEM_EVENTS"];
318        let mut streams_info = Vec::new();
319
320        for stream_name in stream_names {
321            match self.jetstream.get_stream(stream_name).await {
322                Ok(mut stream) => {
323                    let info = stream.info().await?;
324                    streams_info.push(StreamInfo {
325                        name: stream_name.to_string(),
326                        subjects: info.config.subjects.clone(),
327                        messages: info.state.messages,
328                        bytes: info.state.bytes,
329                        first_seq: info.state.first_sequence,
330                        last_seq: info.state.last_sequence,
331                        consumer_count: info.state.consumer_count,
332                        exists: true,
333                    });
334                }
335                Err(_) => {
336                    streams_info.push(StreamInfo {
337                        name: stream_name.to_string(),
338                        subjects: vec![],
339                        messages: 0,
340                        bytes: 0,
341                        first_seq: 0,
342                        last_seq: 0,
343                        consumer_count: 0,
344                        exists: false,
345                    });
346                }
347            }
348        }
349
350        Ok(streams_info)
351    }
352
353    /// Delete a stream (use with caution)
354    pub async fn delete_stream(&self, stream_name: &str) -> Result<()> {
355        warn!("Deleting stream: {}", stream_name);
356
357        self.jetstream
358            .delete_stream(stream_name)
359            .await
360            .with_context(|| format!("Failed to delete stream: {}", stream_name))?;
361
362        info!("Stream {} deleted successfully", stream_name);
363        Ok(())
364    }
365}
366
/// Information about a JetStream stream
#[derive(Debug, Clone)]
pub struct StreamInfo {
    /// Stream name
    pub name: String,
    /// Subjects this stream listens to
    pub subjects: Vec<String>,
    /// Number of messages in the stream
    pub messages: u64,
    /// Total bytes in the stream
    pub bytes: u64,
    /// First sequence number
    pub first_seq: u64,
    /// Last sequence number
    pub last_seq: u64,
    /// Number of consumers attached to this stream
    pub consumer_count: usize,
    /// Whether the stream exists
    pub exists: bool,
}

impl StreamInfo {
    /// Message-count limit assumed by [`StreamInfo::is_healthy`] and
    /// [`StreamInfo::utilization_percent`].
    pub const DEFAULT_MAX_MESSAGES: u64 = 10_000;

    /// Byte limit (100 MiB) assumed by [`StreamInfo::is_healthy`] and
    /// [`StreamInfo::utilization_percent`].
    pub const DEFAULT_MAX_BYTES: u64 = 100 * 1024 * 1024;

    /// A stream counts as healthy while its usage stays strictly below this
    /// percentage of each limit.
    pub const HEALTHY_THRESHOLD_PERCENT: u64 = 80;

    /// Check if the stream is healthy (exists and not at capacity).
    ///
    /// Uses the default limits, i.e. the stream must hold fewer than 8,000 messages
    /// and fewer than 80 MiB. Use [`StreamInfo::is_healthy_with_limits`] for a
    /// stream configured with different limits.
    pub fn is_healthy(&self) -> bool {
        self.is_healthy_with_limits(Self::DEFAULT_MAX_MESSAGES, Self::DEFAULT_MAX_BYTES)
    }

    /// Check health against the stream's actual configured limits.
    ///
    /// Returns `true` when the stream exists and both its message count and its byte
    /// size are strictly below [`StreamInfo::HEALTHY_THRESHOLD_PERCENT`] of
    /// `max_messages` and `max_bytes` respectively. A limit of `0` means "no limit"
    /// for that dimension, which is then always considered healthy.
    pub fn is_healthy_with_limits(&self, max_messages: u64, max_bytes: u64) -> bool {
        self.exists
            && Self::below_threshold(self.messages, max_messages)
            && Self::below_threshold(self.bytes, max_bytes)
    }

    /// Get utilization percentage (0-100) relative to the default limits.
    ///
    /// Use [`StreamInfo::utilization_percent_with_limits`] for a stream configured
    /// with different limits.
    pub fn utilization_percent(&self) -> f64 {
        self.utilization_percent_with_limits(Self::DEFAULT_MAX_MESSAGES, Self::DEFAULT_MAX_BYTES)
    }

    /// Get utilization percentage (0-100) relative to the given limits.
    ///
    /// The result is the larger of the message and byte utilization, capped at 100.
    /// A non-existent stream reports `0.0`. A limit of `0` means "no limit" and
    /// contributes `0.0` for that dimension.
    pub fn utilization_percent_with_limits(&self, max_messages: u64, max_bytes: u64) -> f64 {
        if !self.exists {
            return 0.0;
        }

        let msg_util = Self::percent_of(self.messages, max_messages);
        let byte_util = Self::percent_of(self.bytes, max_bytes);

        msg_util.max(byte_util).min(100.0)
    }

    /// True when `used` is strictly below the healthy share of `limit` (0 = unlimited).
    fn below_threshold(used: u64, limit: u64) -> bool {
        // Widen to u128 so the multiplication cannot overflow for any u64 input.
        limit == 0
            || u128::from(used) * 100
                < u128::from(limit) * u128::from(Self::HEALTHY_THRESHOLD_PERCENT)
    }

    /// Uncapped percentage of `limit` that `used` represents (0 = unlimited -> 0.0).
    fn percent_of(used: u64, limit: u64) -> f64 {
        if limit == 0 {
            0.0
        } else {
            (used as f64 / limit as f64) * 100.0
        }
    }
}
407
#[cfg(test)]
mod tests {
    use super::*;

    /// One mebibyte, used to express byte counts in the fixtures below.
    const MIB: u64 = 1024 * 1024;

    /// Build an existing `TEST` stream whose sequence range spans `1..=messages`.
    fn existing_stream(messages: u64, bytes: u64, consumer_count: usize) -> StreamInfo {
        StreamInfo {
            name: String::from("TEST"),
            subjects: vec![String::from("test.*")],
            messages,
            bytes,
            first_seq: 1,
            last_seq: messages,
            consumer_count,
            exists: true,
        }
    }

    #[test]
    fn test_stream_info_health() {
        // Well below both the message and the byte threshold.
        let healthy_stream = existing_stream(1000, 10 * MIB, 2);
        assert!(healthy_stream.is_healthy());

        // Above both thresholds: too many messages and too much data.
        let unhealthy_stream = existing_stream(9000, 90 * MIB, 1);
        assert!(!unhealthy_stream.is_healthy());
    }

    #[test]
    fn test_stream_utilization() {
        // 50% of the 10k message limit and 50% of the 100MB byte limit.
        let stream = existing_stream(5000, 50 * MIB, 1);

        let utilization = stream.utilization_percent();
        assert!((45.0..=55.0).contains(&utilization)); // Should be around 50%
    }

    #[test]
    fn test_non_existent_stream() {
        let stream = StreamInfo {
            name: String::from("MISSING"),
            subjects: Vec::new(),
            messages: 0,
            bytes: 0,
            first_seq: 0,
            last_seq: 0,
            consumer_count: 0,
            exists: false,
        };

        // A missing stream is never healthy and reports no utilization.
        assert!(!stream.is_healthy());
        assert_eq!(stream.utilization_percent(), 0.0);
    }
}