1use anyhow::{Context, Result};
2use async_nats::jetstream::{self, stream::Config as StreamConfig};
3use std::time::Duration;
4use tracing::{debug, info, warn};
5
6pub struct StreamManager {
8 jetstream: jetstream::Context,
9}
10
11impl StreamManager {
12 pub fn new(jetstream: jetstream::Context) -> Self {
13 Self { jetstream }
14 }
15
16 pub async fn bootstrap_streams(&self) -> Result<()> {
18 info!("Bootstrapping Smith JetStream streams with Phase 2 performance optimization");
19
20 self.ensure_sdlc_raw_stream()
22 .await
23 .context("Failed to ensure SDLC_RAW stream")?;
24
25 self.ensure_atoms_vetted_stream()
26 .await
27 .context("Failed to ensure ATOMS_VETTED stream")?;
28
29 self.ensure_atoms_results_stream()
30 .await
31 .context("Failed to ensure ATOMS_RESULTS stream")?;
32
33 self.ensure_audit_streams()
34 .await
35 .context("Failed to ensure AUDIT streams")?;
36
37 self.ensure_backpressure_streams()
38 .await
39 .context("Failed to ensure BACKPRESSURE streams")?;
40
41 info!("All Smith Phase 2 streams bootstrapped successfully");
42 Ok(())
43 }
44
45 pub async fn ensure_sdlc_raw_stream(&self) -> Result<()> {
47 let stream_name = "SDLC_RAW";
48 let subjects = vec![
49 "smith.intents.raw.*".to_string(), ];
51
52 let config = StreamConfig {
53 name: stream_name.to_string(),
54 description: Some(
55 "Phase 2: Raw intent ingestion with high-throughput optimization".to_string(),
56 ),
57 subjects,
58 retention: jetstream::stream::RetentionPolicy::WorkQueue, max_age: Duration::from_secs(6 * 60 * 60), max_bytes: 500 * 1024 * 1024, max_messages: 50_000, max_message_size: 2 * 1024 * 1024, storage: jetstream::stream::StorageType::File,
64 num_replicas: 1,
65 discard: jetstream::stream::DiscardPolicy::Old,
66 duplicate_window: Duration::from_secs(60), ..Default::default()
68 };
69
70 self.create_or_update_stream(config).await?;
71 info!("SDLC_RAW stream ensured with high-throughput configuration");
72 Ok(())
73 }
74
75 pub async fn ensure_atoms_vetted_stream(&self) -> Result<()> {
77 let stream_name = "ATOMS_VETTED";
78 let subjects = vec![
79 "smith.intents.vetted.*".to_string(), ];
81
82 let config = StreamConfig {
83 name: stream_name.to_string(),
84 description: Some(
85 "Phase 2: Policy-approved intents with ordering guarantees".to_string(),
86 ),
87 subjects,
88 retention: jetstream::stream::RetentionPolicy::Interest, max_age: Duration::from_secs(12 * 60 * 60), max_bytes: 1024 * 1024 * 1024, max_messages: 100_000, max_message_size: 2 * 1024 * 1024, storage: jetstream::stream::StorageType::File,
94 num_replicas: 1,
95 discard: jetstream::stream::DiscardPolicy::Old,
96 duplicate_window: Duration::from_secs(2 * 60), ..Default::default()
98 };
99
100 self.create_or_update_stream(config).await?;
101 info!("ATOMS_VETTED stream ensured with ordering guarantees");
102 Ok(())
103 }
104
105 pub async fn ensure_atoms_results_stream(&self) -> Result<()> {
107 let stream_name = "ATOMS_RESULTS";
108 let subjects = vec![
109 "smith.results.*".to_string(), ];
111
112 let config = StreamConfig {
113 name: stream_name.to_string(),
114 description: Some("Phase 2: Execution results with performance tracking".to_string()),
115 subjects,
116 retention: jetstream::stream::RetentionPolicy::Limits, max_age: Duration::from_secs(48 * 60 * 60), max_bytes: 2048 * 1024 * 1024, max_messages: 200_000, max_message_size: 4 * 1024 * 1024, storage: jetstream::stream::StorageType::File,
122 num_replicas: 1,
123 discard: jetstream::stream::DiscardPolicy::Old,
124 duplicate_window: Duration::from_secs(5 * 60), ..Default::default()
126 };
127
128 self.create_or_update_stream(config).await?;
129 info!("ATOMS_RESULTS stream ensured with performance tracking");
130 Ok(())
131 }
132
133 pub async fn ensure_audit_streams(&self) -> Result<()> {
135 let audit_config = StreamConfig {
137 name: "AUDIT_SECURITY".to_string(),
138 description: Some("Phase 2: Security and compliance audit events".to_string()),
139 subjects: vec!["smith.audit.*".to_string()],
140 retention: jetstream::stream::RetentionPolicy::Interest, max_age: Duration::from_secs(365 * 24 * 60 * 60), max_bytes: 10 * 1024 * 1024 * 1024, max_messages: 1_000_000, max_message_size: 1024 * 1024, storage: jetstream::stream::StorageType::File,
146 num_replicas: 1,
147 discard: jetstream::stream::DiscardPolicy::Old,
148 duplicate_window: Duration::from_secs(60), ..Default::default()
150 };
151
152 self.create_or_update_stream(audit_config).await?;
153 info!("AUDIT_SECURITY stream ensured with compliance retention");
154 Ok(())
155 }
156
157 pub async fn ensure_backpressure_streams(&self) -> Result<()> {
159 let backpressure_config = StreamConfig {
161 name: "SDLC_QUARANTINE_BACKPRESSURE".to_string(),
162 description: Some("Phase 2: Backpressure and quarantine handling".to_string()),
163 subjects: vec!["smith.intents.quarantine.*".to_string()],
164 retention: jetstream::stream::RetentionPolicy::WorkQueue, max_age: Duration::from_secs(2 * 60 * 60), max_bytes: 100 * 1024 * 1024, max_messages: 10_000, max_message_size: 1024 * 1024, storage: jetstream::stream::StorageType::File,
170 num_replicas: 1,
171 discard: jetstream::stream::DiscardPolicy::Old,
172 duplicate_window: Duration::from_secs(30), ..Default::default()
174 };
175
176 self.create_or_update_stream(backpressure_config).await?;
177 info!("SDLC_QUARANTINE_BACKPRESSURE stream ensured");
178 Ok(())
179 }
180
181 pub async fn ensure_results_stream(&self) -> Result<()> {
183 let stream_name = "INTENT_RESULTS";
184 let subjects = vec!["smith.results.*".to_string()];
185
186 let config = StreamConfig {
187 name: stream_name.to_string(),
188 description: Some("Results from intent execution".to_string()),
189 subjects,
190 retention: jetstream::stream::RetentionPolicy::Limits, max_age: Duration::from_secs(48 * 60 * 60), max_bytes: 500 * 1024 * 1024, max_messages: 50_000, max_message_size: 1024 * 1024, storage: jetstream::stream::StorageType::File,
196 num_replicas: 1,
197 discard: jetstream::stream::DiscardPolicy::Old,
198 duplicate_window: Duration::from_secs(5 * 60),
199 ..Default::default()
200 };
201
202 self.create_or_update_stream(config).await?;
203 info!("INTENT_RESULTS stream ensured");
204 Ok(())
205 }
206
207 pub async fn ensure_audit_stream(&self) -> Result<()> {
209 let stream_name = "AUDIT_LOGS";
210 let subjects = vec!["smith.audit.*".to_string()];
211
212 let config = StreamConfig {
213 name: stream_name.to_string(),
214 description: Some("Audit logs for compliance and debugging".to_string()),
215 subjects,
216 retention: jetstream::stream::RetentionPolicy::Limits,
217 max_age: Duration::from_secs(30 * 24 * 60 * 60), max_bytes: 1024 * 1024 * 1024, max_messages: 100_000, max_message_size: 512 * 1024, storage: jetstream::stream::StorageType::File,
222 num_replicas: 1,
223 discard: jetstream::stream::DiscardPolicy::Old,
224 duplicate_window: Duration::from_secs(60),
225 ..Default::default()
226 };
227
228 self.create_or_update_stream(config).await?;
229 info!("AUDIT_LOGS stream ensured");
230 Ok(())
231 }
232
233 pub async fn ensure_system_events_stream(&self) -> Result<()> {
235 let stream_name = "SYSTEM_EVENTS";
236 let subjects = vec!["smith.system.*".to_string()];
237
238 let config = StreamConfig {
239 name: stream_name.to_string(),
240 description: Some("System-level events and health monitoring".to_string()),
241 subjects,
242 retention: jetstream::stream::RetentionPolicy::Limits,
243 max_age: Duration::from_secs(12 * 60 * 60), max_bytes: 50 * 1024 * 1024, max_messages: 10_000, max_message_size: 64 * 1024, storage: jetstream::stream::StorageType::File,
248 num_replicas: 1,
249 discard: jetstream::stream::DiscardPolicy::Old,
250 duplicate_window: Duration::from_secs(30),
251 ..Default::default()
252 };
253
254 self.create_or_update_stream(config).await?;
255 info!("SYSTEM_EVENTS stream ensured");
256 Ok(())
257 }
258
259 async fn create_or_update_stream(&self, config: StreamConfig) -> Result<()> {
261 let stream_name = config.name.clone();
262
263 debug!("Checking if stream {} exists", stream_name);
264
265 match self.jetstream.get_stream(&stream_name).await {
266 Ok(mut existing_stream) => {
267 let existing_config = existing_stream.info().await?.config.clone();
269
270 if self.configs_differ(&existing_config, &config) {
271 info!("Updating stream {} configuration", stream_name);
272 self.jetstream
273 .update_stream(&config)
274 .await
275 .with_context(|| format!("Failed to update stream: {}", stream_name))?;
276 info!("Stream {} updated successfully", stream_name);
277 } else {
278 debug!(
279 "Stream {} already exists with correct configuration",
280 stream_name
281 );
282 }
283 }
284 Err(_) => {
285 info!("Creating stream: {}", stream_name);
287 match self.jetstream.create_stream(&config).await {
288 Ok(_) => {
289 info!("Stream {} created successfully", stream_name);
290 }
291 Err(err) => {
292 warn!(
293 "Stream {} creation returned error ({}); assuming it already exists",
294 stream_name, err
295 );
296 }
297 }
298 }
299 }
300
301 Ok(())
302 }
303
304 fn configs_differ(&self, existing: &StreamConfig, new: &StreamConfig) -> bool {
306 existing.subjects != new.subjects
308 || existing.retention != new.retention
309 || existing.max_age != new.max_age
310 || existing.max_bytes != new.max_bytes
311 || existing.max_messages != new.max_messages
312 || existing.storage != new.storage
313 }
314
315 pub async fn get_streams_info(&self) -> Result<Vec<StreamInfo>> {
317 let stream_names = vec!["INTENTS", "INTENT_RESULTS", "AUDIT_LOGS", "SYSTEM_EVENTS"];
318 let mut streams_info = Vec::new();
319
320 for stream_name in stream_names {
321 match self.jetstream.get_stream(stream_name).await {
322 Ok(mut stream) => {
323 let info = stream.info().await?;
324 streams_info.push(StreamInfo {
325 name: stream_name.to_string(),
326 subjects: info.config.subjects.clone(),
327 messages: info.state.messages,
328 bytes: info.state.bytes,
329 first_seq: info.state.first_sequence,
330 last_seq: info.state.last_sequence,
331 consumer_count: info.state.consumer_count,
332 exists: true,
333 });
334 }
335 Err(_) => {
336 streams_info.push(StreamInfo {
337 name: stream_name.to_string(),
338 subjects: vec![],
339 messages: 0,
340 bytes: 0,
341 first_seq: 0,
342 last_seq: 0,
343 consumer_count: 0,
344 exists: false,
345 });
346 }
347 }
348 }
349
350 Ok(streams_info)
351 }
352
353 pub async fn delete_stream(&self, stream_name: &str) -> Result<()> {
355 warn!("Deleting stream: {}", stream_name);
356
357 self.jetstream
358 .delete_stream(stream_name)
359 .await
360 .with_context(|| format!("Failed to delete stream: {}", stream_name))?;
361
362 info!("Stream {} deleted successfully", stream_name);
363 Ok(())
364 }
365}
366
367#[derive(Debug, Clone)]
369pub struct StreamInfo {
370 pub name: String,
372 pub subjects: Vec<String>,
374 pub messages: u64,
376 pub bytes: u64,
378 pub first_seq: u64,
380 pub last_seq: u64,
382 pub consumer_count: usize,
384 pub exists: bool,
386}
387
388impl StreamInfo {
389 pub fn is_healthy(&self) -> bool {
391 self.exists && self.messages < 8000 && self.bytes < 80 * 1024 * 1024
392 }
393
394 pub fn utilization_percent(&self) -> f64 {
396 if !self.exists {
397 return 0.0;
398 }
399
400 let msg_util = (self.messages as f64 / 10000.0) * 100.0;
402 let byte_util = (self.bytes as f64 / (100.0 * 1024.0 * 1024.0)) * 100.0;
403
404 msg_util.max(byte_util).min(100.0)
405 }
406}
407
408#[cfg(test)]
409mod tests {
410 use super::*;
411
412 #[test]
413 fn test_stream_info_health() {
414 let healthy_stream = StreamInfo {
415 name: "TEST".to_string(),
416 subjects: vec!["test.*".to_string()],
417 messages: 1000,
418 bytes: 10 * 1024 * 1024, first_seq: 1,
420 last_seq: 1000,
421 consumer_count: 2,
422 exists: true,
423 };
424
425 assert!(healthy_stream.is_healthy());
426
427 let unhealthy_stream = StreamInfo {
428 name: "TEST".to_string(),
429 subjects: vec!["test.*".to_string()],
430 messages: 9000, bytes: 90 * 1024 * 1024, first_seq: 1,
433 last_seq: 9000,
434 consumer_count: 1,
435 exists: true,
436 };
437
438 assert!(!unhealthy_stream.is_healthy());
439 }
440
441 #[test]
442 fn test_stream_utilization() {
443 let stream = StreamInfo {
444 name: "TEST".to_string(),
445 subjects: vec!["test.*".to_string()],
446 messages: 5000, bytes: 50 * 1024 * 1024, first_seq: 1,
449 last_seq: 5000,
450 consumer_count: 1,
451 exists: true,
452 };
453
454 let utilization = stream.utilization_percent();
455 assert!((45.0..=55.0).contains(&utilization)); }
457
458 #[test]
459 fn test_non_existent_stream() {
460 let stream = StreamInfo {
461 name: "MISSING".to_string(),
462 subjects: vec![],
463 messages: 0,
464 bytes: 0,
465 first_seq: 0,
466 last_seq: 0,
467 consumer_count: 0,
468 exists: false,
469 };
470
471 assert!(!stream.is_healthy());
472 assert_eq!(stream.utilization_percent(), 0.0);
473 }
474}