use anyhow::{Context, Result};
use async_nats::jetstream::{self, stream::Config as StreamConfig};
use std::time::Duration;
use tracing::{debug, info, warn};
pub struct StreamManager {
jetstream: jetstream::Context,
}
impl StreamManager {
pub fn new(jetstream: jetstream::Context) -> Self {
Self { jetstream }
}
pub async fn bootstrap_streams(&self) -> Result<()> {
info!("Bootstrapping Smith JetStream streams with Phase 2 performance optimization");
self.ensure_sdlc_raw_stream()
.await
.context("Failed to ensure SDLC_RAW stream")?;
self.ensure_atoms_vetted_stream()
.await
.context("Failed to ensure ATOMS_VETTED stream")?;
self.ensure_atoms_results_stream()
.await
.context("Failed to ensure ATOMS_RESULTS stream")?;
self.ensure_audit_streams()
.await
.context("Failed to ensure AUDIT streams")?;
self.ensure_backpressure_streams()
.await
.context("Failed to ensure BACKPRESSURE streams")?;
info!("All Smith Phase 2 streams bootstrapped successfully");
Ok(())
}
pub async fn ensure_sdlc_raw_stream(&self) -> Result<()> {
let stream_name = "SDLC_RAW";
let subjects = vec![
"smith.intents.raw.*".to_string(), ];
let config = StreamConfig {
name: stream_name.to_string(),
description: Some(
"Phase 2: Raw intent ingestion with high-throughput optimization".to_string(),
),
subjects,
retention: jetstream::stream::RetentionPolicy::WorkQueue, max_age: Duration::from_secs(6 * 60 * 60), max_bytes: 500 * 1024 * 1024, max_messages: 50_000, max_message_size: 2 * 1024 * 1024, storage: jetstream::stream::StorageType::File,
num_replicas: 1,
discard: jetstream::stream::DiscardPolicy::Old,
duplicate_window: Duration::from_secs(60), ..Default::default()
};
self.create_or_update_stream(config).await?;
info!("SDLC_RAW stream ensured with high-throughput configuration");
Ok(())
}
pub async fn ensure_atoms_vetted_stream(&self) -> Result<()> {
let stream_name = "ATOMS_VETTED";
let subjects = vec![
"smith.intents.vetted.*".to_string(), ];
let config = StreamConfig {
name: stream_name.to_string(),
description: Some(
"Phase 2: Policy-approved intents with ordering guarantees".to_string(),
),
subjects,
retention: jetstream::stream::RetentionPolicy::Interest, max_age: Duration::from_secs(12 * 60 * 60), max_bytes: 1024 * 1024 * 1024, max_messages: 100_000, max_message_size: 2 * 1024 * 1024, storage: jetstream::stream::StorageType::File,
num_replicas: 1,
discard: jetstream::stream::DiscardPolicy::Old,
duplicate_window: Duration::from_secs(2 * 60), ..Default::default()
};
self.create_or_update_stream(config).await?;
info!("ATOMS_VETTED stream ensured with ordering guarantees");
Ok(())
}
pub async fn ensure_atoms_results_stream(&self) -> Result<()> {
let stream_name = "ATOMS_RESULTS";
let subjects = vec![
"smith.results.*".to_string(), ];
let config = StreamConfig {
name: stream_name.to_string(),
description: Some("Phase 2: Execution results with performance tracking".to_string()),
subjects,
retention: jetstream::stream::RetentionPolicy::Limits, max_age: Duration::from_secs(48 * 60 * 60), max_bytes: 2048 * 1024 * 1024, max_messages: 200_000, max_message_size: 4 * 1024 * 1024, storage: jetstream::stream::StorageType::File,
num_replicas: 1,
discard: jetstream::stream::DiscardPolicy::Old,
duplicate_window: Duration::from_secs(5 * 60), ..Default::default()
};
self.create_or_update_stream(config).await?;
info!("ATOMS_RESULTS stream ensured with performance tracking");
Ok(())
}
pub async fn ensure_audit_streams(&self) -> Result<()> {
let audit_config = StreamConfig {
name: "AUDIT_SECURITY".to_string(),
description: Some("Phase 2: Security and compliance audit events".to_string()),
subjects: vec!["smith.audit.*".to_string()],
retention: jetstream::stream::RetentionPolicy::Interest, max_age: Duration::from_secs(365 * 24 * 60 * 60), max_bytes: 10 * 1024 * 1024 * 1024, max_messages: 1_000_000, max_message_size: 1024 * 1024, storage: jetstream::stream::StorageType::File,
num_replicas: 1,
discard: jetstream::stream::DiscardPolicy::Old,
duplicate_window: Duration::from_secs(60), ..Default::default()
};
self.create_or_update_stream(audit_config).await?;
info!("AUDIT_SECURITY stream ensured with compliance retention");
Ok(())
}
pub async fn ensure_backpressure_streams(&self) -> Result<()> {
let backpressure_config = StreamConfig {
name: "SDLC_QUARANTINE_BACKPRESSURE".to_string(),
description: Some("Phase 2: Backpressure and quarantine handling".to_string()),
subjects: vec!["smith.intents.quarantine.*".to_string()],
retention: jetstream::stream::RetentionPolicy::WorkQueue, max_age: Duration::from_secs(2 * 60 * 60), max_bytes: 100 * 1024 * 1024, max_messages: 10_000, max_message_size: 1024 * 1024, storage: jetstream::stream::StorageType::File,
num_replicas: 1,
discard: jetstream::stream::DiscardPolicy::Old,
duplicate_window: Duration::from_secs(30), ..Default::default()
};
self.create_or_update_stream(backpressure_config).await?;
info!("SDLC_QUARANTINE_BACKPRESSURE stream ensured");
Ok(())
}
pub async fn ensure_results_stream(&self) -> Result<()> {
let stream_name = "INTENT_RESULTS";
let subjects = vec!["smith.results.*".to_string()];
let config = StreamConfig {
name: stream_name.to_string(),
description: Some("Results from intent execution".to_string()),
subjects,
retention: jetstream::stream::RetentionPolicy::Limits, max_age: Duration::from_secs(48 * 60 * 60), max_bytes: 500 * 1024 * 1024, max_messages: 50_000, max_message_size: 1024 * 1024, storage: jetstream::stream::StorageType::File,
num_replicas: 1,
discard: jetstream::stream::DiscardPolicy::Old,
duplicate_window: Duration::from_secs(5 * 60),
..Default::default()
};
self.create_or_update_stream(config).await?;
info!("INTENT_RESULTS stream ensured");
Ok(())
}
pub async fn ensure_audit_stream(&self) -> Result<()> {
let stream_name = "AUDIT_LOGS";
let subjects = vec!["smith.audit.*".to_string()];
let config = StreamConfig {
name: stream_name.to_string(),
description: Some("Audit logs for compliance and debugging".to_string()),
subjects,
retention: jetstream::stream::RetentionPolicy::Limits,
max_age: Duration::from_secs(30 * 24 * 60 * 60), max_bytes: 1024 * 1024 * 1024, max_messages: 100_000, max_message_size: 512 * 1024, storage: jetstream::stream::StorageType::File,
num_replicas: 1,
discard: jetstream::stream::DiscardPolicy::Old,
duplicate_window: Duration::from_secs(60),
..Default::default()
};
self.create_or_update_stream(config).await?;
info!("AUDIT_LOGS stream ensured");
Ok(())
}
pub async fn ensure_system_events_stream(&self) -> Result<()> {
let stream_name = "SYSTEM_EVENTS";
let subjects = vec!["smith.system.*".to_string()];
let config = StreamConfig {
name: stream_name.to_string(),
description: Some("System-level events and health monitoring".to_string()),
subjects,
retention: jetstream::stream::RetentionPolicy::Limits,
max_age: Duration::from_secs(12 * 60 * 60), max_bytes: 50 * 1024 * 1024, max_messages: 10_000, max_message_size: 64 * 1024, storage: jetstream::stream::StorageType::File,
num_replicas: 1,
discard: jetstream::stream::DiscardPolicy::Old,
duplicate_window: Duration::from_secs(30),
..Default::default()
};
self.create_or_update_stream(config).await?;
info!("SYSTEM_EVENTS stream ensured");
Ok(())
}
async fn create_or_update_stream(&self, config: StreamConfig) -> Result<()> {
let stream_name = config.name.clone();
debug!("Checking if stream {} exists", stream_name);
match self.jetstream.get_stream(&stream_name).await {
Ok(mut existing_stream) => {
let existing_config = existing_stream.info().await?.config.clone();
if self.configs_differ(&existing_config, &config) {
info!("Updating stream {} configuration", stream_name);
self.jetstream
.update_stream(&config)
.await
.with_context(|| format!("Failed to update stream: {}", stream_name))?;
info!("Stream {} updated successfully", stream_name);
} else {
debug!(
"Stream {} already exists with correct configuration",
stream_name
);
}
}
Err(_) => {
info!("Creating stream: {}", stream_name);
match self.jetstream.create_stream(&config).await {
Ok(_) => {
info!("Stream {} created successfully", stream_name);
}
Err(err) => {
warn!(
"Stream {} creation returned error ({}); assuming it already exists",
stream_name, err
);
}
}
}
}
Ok(())
}
fn configs_differ(&self, existing: &StreamConfig, new: &StreamConfig) -> bool {
existing.subjects != new.subjects
|| existing.retention != new.retention
|| existing.max_age != new.max_age
|| existing.max_bytes != new.max_bytes
|| existing.max_messages != new.max_messages
|| existing.storage != new.storage
}
pub async fn get_streams_info(&self) -> Result<Vec<StreamInfo>> {
let stream_names = vec!["INTENTS", "INTENT_RESULTS", "AUDIT_LOGS", "SYSTEM_EVENTS"];
let mut streams_info = Vec::new();
for stream_name in stream_names {
match self.jetstream.get_stream(stream_name).await {
Ok(mut stream) => {
let info = stream.info().await?;
streams_info.push(StreamInfo {
name: stream_name.to_string(),
subjects: info.config.subjects.clone(),
messages: info.state.messages,
bytes: info.state.bytes,
first_seq: info.state.first_sequence,
last_seq: info.state.last_sequence,
consumer_count: info.state.consumer_count,
exists: true,
});
}
Err(_) => {
streams_info.push(StreamInfo {
name: stream_name.to_string(),
subjects: vec![],
messages: 0,
bytes: 0,
first_seq: 0,
last_seq: 0,
consumer_count: 0,
exists: false,
});
}
}
}
Ok(streams_info)
}
pub async fn delete_stream(&self, stream_name: &str) -> Result<()> {
warn!("Deleting stream: {}", stream_name);
self.jetstream
.delete_stream(stream_name)
.await
.with_context(|| format!("Failed to delete stream: {}", stream_name))?;
info!("Stream {} deleted successfully", stream_name);
Ok(())
}
}
#[derive(Debug, Clone)]
pub struct StreamInfo {
pub name: String,
pub subjects: Vec<String>,
pub messages: u64,
pub bytes: u64,
pub first_seq: u64,
pub last_seq: u64,
pub consumer_count: usize,
pub exists: bool,
}
impl StreamInfo {
pub fn is_healthy(&self) -> bool {
self.exists && self.messages < 8000 && self.bytes < 80 * 1024 * 1024
}
pub fn utilization_percent(&self) -> f64 {
if !self.exists {
return 0.0;
}
let msg_util = (self.messages as f64 / 10000.0) * 100.0;
let byte_util = (self.bytes as f64 / (100.0 * 1024.0 * 1024.0)) * 100.0;
msg_util.max(byte_util).min(100.0)
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_stream_info_health() {
let healthy_stream = StreamInfo {
name: "TEST".to_string(),
subjects: vec!["test.*".to_string()],
messages: 1000,
bytes: 10 * 1024 * 1024, first_seq: 1,
last_seq: 1000,
consumer_count: 2,
exists: true,
};
assert!(healthy_stream.is_healthy());
let unhealthy_stream = StreamInfo {
name: "TEST".to_string(),
subjects: vec!["test.*".to_string()],
messages: 9000, bytes: 90 * 1024 * 1024, first_seq: 1,
last_seq: 9000,
consumer_count: 1,
exists: true,
};
assert!(!unhealthy_stream.is_healthy());
}
#[test]
fn test_stream_utilization() {
let stream = StreamInfo {
name: "TEST".to_string(),
subjects: vec!["test.*".to_string()],
messages: 5000, bytes: 50 * 1024 * 1024, first_seq: 1,
last_seq: 5000,
consumer_count: 1,
exists: true,
};
let utilization = stream.utilization_percent();
assert!((45.0..=55.0).contains(&utilization)); }
#[test]
fn test_non_existent_stream() {
let stream = StreamInfo {
name: "MISSING".to_string(),
subjects: vec![],
messages: 0,
bytes: 0,
first_seq: 0,
last_seq: 0,
consumer_count: 0,
exists: false,
};
assert!(!stream.is_healthy());
assert_eq!(stream.utilization_percent(), 0.0);
}
}