use std::time::Duration;
use async_nats::jetstream;
use bus_core::error::BusError;
use futures_util::StreamExt;
use tokio::task::JoinHandle;
use tracing::Instrument;
/// `tracing` target attached to the events and the task span emitted by this file.
const TRACING_TARGET: &str = "bus_nats::jetstream_advisory";
/// Subject pattern used when no explicit filter is configured.
///
/// The trailing `>` is the NATS multi-token wildcard, so this matches every subject below
/// `$JS.EVENT.ADVISORY.`.
pub const DEFAULT_ADVISORY_SUBJECT_FILTER: &str = "$JS.EVENT.ADVISORY.>";

/// Settings for the JetStream advisory logger.
#[derive(Debug, Clone)]
pub struct AdvisoryLogOptions {
    /// Subject (wildcards allowed) the logger subscribes to.
    pub subject_filter: String,
    /// Upper bound, in bytes, on the raw payload prefix that is logged at debug level when an
    /// advisory payload is not valid JSON. `0` disables the preview.
    pub max_payload_bytes_log: usize,
}

impl Default for AdvisoryLogOptions {
    /// Listens on [`DEFAULT_ADVISORY_SUBJECT_FILTER`] and previews at most 256 payload bytes.
    fn default() -> Self {
        AdvisoryLogOptions {
            subject_filter: String::from(DEFAULT_ADVISORY_SUBJECT_FILTER),
            max_payload_bytes_log: 256,
        }
    }
}
/// Owner of the background task that logs JetStream advisories.
///
/// The task's lifetime is tied to this value: dropping the handle aborts the task, so the
/// handle must be kept alive for as long as advisories should be logged. The `must_use`
/// attribute turns an accidentally discarded handle (which would stop the logger right away)
/// into a compiler warning.
#[derive(Debug)]
#[must_use = "dropping the handle aborts the advisory logger task"]
pub struct AdvisoryLoggerHandle {
    /// Join handle of the spawned logger task; only used to abort it on drop.
    task_handle: JoinHandle<()>,
}

impl Drop for AdvisoryLoggerHandle {
    /// Aborts the logger task so it does not keep running after its owner is gone.
    fn drop(&mut self) {
        self.task_handle.abort();
    }
}
/// Fields pulled out of a single advisory payload by `extract_advisory_fields`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub(crate) struct ExtractedAdvisoryFields {
    /// Value of the top-level `"type"` key when it is a JSON string, otherwise `None`.
    pub(crate) advisory_type: Option<String>,
    /// Value of the top-level `"stream"` key when it is a JSON string, otherwise `None`.
    pub(crate) stream: Option<String>,
    /// Value of the top-level `"consumer"` key when it is a JSON string, otherwise `None`.
    pub(crate) consumer: Option<String>,
    /// `true` when the payload parsed as JSON, even if the top-level value is not an object.
    pub(crate) json_ok: bool,
    /// Length of the raw payload in bytes.
    pub(crate) payload_len: usize,
}
/// Parses an advisory payload and extracts the string fields worth logging.
///
/// Never fails: a payload that is not JSON yields `json_ok == false`, and a JSON value that is
/// not an object yields `json_ok == true`; in both cases all optional fields are `None`.
/// Keys that are missing, or whose value is not a JSON string, are reported as `None`.
/// `payload_len` is always the byte length of `payload`.
pub(crate) fn extract_advisory_fields(payload: &[u8]) -> ExtractedAdvisoryFields {
    let parsed = serde_json::from_slice::<serde_json::Value>(payload).ok();
    let json_ok = parsed.is_some();

    // Only a top-level object can carry the keys we are interested in.
    let object = match parsed {
        Some(serde_json::Value::Object(map)) => Some(map),
        _ => None,
    };

    let string_field = |key: &str| -> Option<String> {
        match object.as_ref()?.get(key)? {
            serde_json::Value::String(text) => Some(text.clone()),
            _ => None,
        }
    };

    ExtractedAdvisoryFields {
        advisory_type: string_field("type"),
        stream: string_field("stream"),
        consumer: string_field("consumer"),
        json_ok,
        payload_len: payload.len(),
    }
}
/// Emits the log events for one advisory message.
///
/// * JSON payload: a single `info` event carrying the extracted type / stream / consumer.
/// * Non-JSON payload: a `warn` event, followed by a `debug` event with a lossy UTF-8 preview
///   of at most `max_payload_bytes_log` leading bytes (skipped entirely when that limit is `0`).
fn log_one_advisory(subject: &str, payload: &[u8], max_payload_bytes_log: usize) {
    let fields = extract_advisory_fields(payload);

    if fields.json_ok {
        tracing::info!(
            target: TRACING_TARGET,
            subject = subject,
            advisory_type = fields.advisory_type.as_deref(),
            stream = fields.stream.as_deref(),
            consumer = fields.consumer.as_deref(),
            payload_len = fields.payload_len,
            "jetstream advisory"
        );
        return;
    }

    tracing::warn!(
        target: TRACING_TARGET,
        subject = subject,
        payload_len = fields.payload_len,
        "jetstream advisory payload is not JSON"
    );

    if max_payload_bytes_log == 0 {
        return;
    }

    // Clamp to the payload length so the slice can never go out of bounds.
    let raw_prefix = &payload[..payload.len().min(max_payload_bytes_log)];
    let preview = String::from_utf8_lossy(raw_prefix);
    tracing::debug!(
        target: TRACING_TARGET,
        subject = subject,
        preview = %preview,
        "jetstream advisory raw prefix (lossy UTF-8)"
    );
}
pub async fn spawn_jetstream_advisory_logger(
js: &jetstream::Context,
opts: AdvisoryLogOptions,
) -> Result<AdvisoryLoggerHandle, BusError> {
let client = js.client();
let subject_filter = opts.subject_filter.clone();
let max_payload_bytes_log = opts.max_payload_bytes_log;
let mut subscriber = client
.subscribe(subject_filter.clone())
.await
.map_err(|error| BusError::Nats(error.to_string()))?;
let cloned_client = client.clone();
let task_handle = tokio::spawn(
async move {
let mut backoff_duration = Duration::from_millis(200);
loop {
while let Some(message) = subscriber.next().await {
let subject = message.subject.as_str();
log_one_advisory(subject, &message.payload, max_payload_bytes_log);
}
tracing::error!(
target: TRACING_TARGET,
"jetstream advisory subscription stream ended; reconnecting after backoff"
);
tokio::time::sleep(backoff_duration).await;
backoff_duration = (backoff_duration * 2).min(Duration::from_secs(30));
loop {
match cloned_client.subscribe(subject_filter.clone()).await {
Ok(resubscribed_stream) => {
subscriber = resubscribed_stream;
backoff_duration = Duration::from_millis(200);
break;
}
Err(error) => {
tracing::error!(
target: TRACING_TARGET,
error = %error,
"jetstream advisory resubscribe failed; retrying after backoff"
);
tokio::time::sleep(backoff_duration).await;
backoff_duration = (backoff_duration * 2).min(Duration::from_secs(30));
}
}
}
}
}
.instrument(tracing::debug_span!(
target: TRACING_TARGET,
"jetstream_advisory_logger"
)),
);
Ok(AdvisoryLoggerHandle { task_handle })
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Bytes that are not JSON are flagged as such and the length is still reported.
    #[test]
    fn extract_invalid_json() {
        let fields = extract_advisory_fields(&[0xff, 0xfe, 0xfd]);
        assert!(!fields.json_ok);
        assert_eq!(3, fields.payload_len);
        assert_eq!(fields.advisory_type, None);
    }

    /// A complete advisory object yields all three string fields.
    #[test]
    fn extract_full_object() {
        let fields = extract_advisory_fields(
            br#"{"type":"io.nats.jetstream.advisory.v1.max_deliver","stream":"EVENTS","consumer":"w1"}"#,
        );
        assert!(fields.json_ok);
        assert_eq!(
            fields.advisory_type,
            Some("io.nats.jetstream.advisory.v1.max_deliver".to_string())
        );
        assert_eq!(fields.stream, Some("EVENTS".to_string()));
        assert_eq!(fields.consumer, Some("w1".to_string()));
    }

    /// Valid JSON that is not an object parses fine but has no keys to extract.
    #[test]
    fn extract_array_json_no_keys() {
        let fields = extract_advisory_fields(br#"[1,2]"#);
        assert!(fields.json_ok);
        assert_eq!(fields.advisory_type, None);
    }

    /// A `type` key holding a non-string value is treated as absent.
    #[test]
    fn extract_non_string_type_ignored() {
        let fields = extract_advisory_fields(br#"{"type":1}"#);
        assert!(fields.json_ok);
        assert_eq!(fields.advisory_type, None);
    }
}