use crate::notification_backend::jetstream::backend::JetStreamBackend;
use crate::notification_backend::{DeleteMessageResult, NotificationBackend};
use crate::telemetry::{SERVICE_NAME, SERVICE_VERSION};
use anyhow::{Context, Result};
use futures::StreamExt;
use tracing::{info, warn};
/// Purges every message from the JetStream stream named `stream_name`.
///
/// The stream is looked up through `backend.jetstream`, its current message
/// count is read from the stream info, and the stream is then purged. On
/// success an `admin.stream.wipe.succeeded` event is logged with that count.
///
/// # Errors
///
/// Returns an error if the stream cannot be fetched, if its info cannot be
/// read, or if the purge request fails.
pub async fn wipe_stream(backend: &JetStreamBackend, stream_name: &str) -> Result<()> {
    let mut stream = backend
        .jetstream
        .get_stream(stream_name)
        .await
        // Lazily formatted so the success path does not allocate the message;
        // this also matches the `with_context` style used by `delete_message`.
        .with_context(|| format!("Failed to get stream {stream_name}"))?;

    // NOTE: this count is a snapshot taken *before* the purge, so the logged
    // `messages_purged` value can differ from what was actually removed if
    // messages are published or deleted in between.
    let info = stream.info().await.context("Failed to get stream info")?;
    let total_messages = info.state.messages;

    stream.purge().await.context("Failed to purge stream")?;

    info!(
        service_name = SERVICE_NAME,
        service_version = SERVICE_VERSION,
        event_name = "admin.stream.wipe.succeeded",
        stream_name = %stream_name,
        messages_purged = total_messages,
        "Wiped entire stream - all messages removed but stream configuration preserved"
    );
    Ok(())
}
/// Wipes every stream reported by `backend.jetstream.streams()`.
///
/// This is a best-effort operation: a failure to read one stream's info, or a
/// failure of `backend.wipe_stream` for one stream, is logged as a warning and
/// the loop moves on to the next entry. The function itself always returns
/// `Ok(())`.
///
/// The per-stream message count added to the running total is the value
/// reported by the stream listing, read before that stream is wiped.
pub async fn wipe_all(backend: &JetStreamBackend) -> Result<()> {
    info!(
        service_name = SERVICE_NAME,
        service_version = SERVICE_VERSION,
        event_name = "admin.all.wipe.started",
        "Starting complete wipe of all JetStream data"
    );

    let mut purged_streams = 0;
    let mut purged_messages = 0;
    let mut listing = backend.jetstream.streams();

    while let Some(entry) = listing.next().await {
        // Skip entries whose info could not be retrieved.
        let details = match entry {
            Ok(details) => details,
            Err(e) => {
                warn!(
                    service_name = SERVICE_NAME,
                    service_version = SERVICE_VERSION,
                    event_name = "admin.stream.info.failed",
                    error = %e,
                    "Failed to get stream info during wipe_all operation"
                );
                continue;
            }
        };

        let name = &details.config.name;
        let count = details.state.messages;

        // A failed wipe is reported but does not abort the remaining streams.
        if let Err(e) = backend.wipe_stream(name).await {
            warn!(
                service_name = SERVICE_NAME,
                service_version = SERVICE_VERSION,
                event_name = "admin.stream.wipe.failed",
                stream = %name,
                error = %e,
                "Failed to purge stream during wipe_all operation"
            );
            continue;
        }

        purged_streams += 1;
        purged_messages += count;
        info!(
            service_name = SERVICE_NAME,
            service_version = SERVICE_VERSION,
            event_name = "admin.stream.wipe.succeeded",
            stream = %name,
            messages = count,
            "Successfully purged stream"
        );
    }

    info!(
        service_name = SERVICE_NAME,
        service_version = SERVICE_VERSION,
        event_name = "admin.all.wipe.completed",
        streams_purged = purged_streams,
        messages_purged = purged_messages,
        "Completed wipe_all operation - all JetStream data removed"
    );
    Ok(())
}
/// Deletes the message with the given `sequence` from the stream derived from
/// `stream_key`.
///
/// The stream name is `stream_key` converted to ASCII upper case.
///
/// # Returns
///
/// * `DeleteMessageResult::Deleted` when the delete call reports `true`; an
///   `admin.notification.delete.succeeded` event is logged in that case.
/// * `DeleteMessageResult::NotFound` when the delete call reports `false`, or
///   when fetching the stream fails with an error whose text contains
///   "stream not found" (compared case-insensitively).
///
/// # Errors
///
/// Returns an error if fetching the stream fails for any other reason, or if
/// the delete request itself fails.
pub async fn delete_message(
    backend: &JetStreamBackend,
    stream_key: &str,
    sequence: u64,
) -> Result<DeleteMessageResult> {
    let stream_name = stream_key.to_ascii_uppercase();

    let handle = match backend.jetstream.get_stream(&stream_name).await {
        Ok(handle) => handle,
        // A missing stream is detected from the error text and reported as
        // `NotFound` rather than as a failure.
        Err(error)
            if error
                .to_string()
                .to_ascii_lowercase()
                .contains("stream not found") =>
        {
            return Ok(DeleteMessageResult::NotFound);
        }
        Err(error) => {
            return Err(error).with_context(|| format!("Failed to get stream {stream_name}"));
        }
    };

    let was_deleted = handle.delete_message(sequence).await.with_context(|| {
        format!("Failed to delete sequence {sequence} from stream {stream_name}")
    })?;

    if !was_deleted {
        return Ok(DeleteMessageResult::NotFound);
    }

    info!(
        service_name = SERVICE_NAME,
        service_version = SERVICE_VERSION,
        event_name = "admin.notification.delete.succeeded",
        stream_name = %stream_name,
        sequence,
        "Deleted notification from JetStream stream"
    );
    Ok(DeleteMessageResult::Deleted)
}