use super::*;
use chrono::Duration;
mod config_tests {
use super::*;
#[test]
fn test_default_config_values() {
let config = NatsConfig::default();
assert_eq!(config.url, "nats://localhost:4222");
assert_eq!(config.stream_prefix, "queue-runtime");
assert_eq!(config.max_deliver, Some(3));
assert_eq!(config.ack_wait, Duration::seconds(30));
assert!(config.enable_dead_letter);
assert_eq!(config.dead_letter_subject_prefix, Some("dlq".to_string()));
assert!(config.credentials_path.is_none());
}
#[test]
fn test_default_session_lock_duration() {
let config = NatsConfig::default();
assert_eq!(config.session_lock_duration, Duration::minutes(5));
}
#[test]
fn test_custom_config() {
let config = NatsConfig {
url: "nats://user:pass@nats.example.com:4222".to_string(),
stream_prefix: "myapp".to_string(),
max_deliver: Some(5),
ack_wait: Duration::seconds(60),
session_lock_duration: Duration::minutes(10),
enable_dead_letter: false,
dead_letter_subject_prefix: None,
credentials_path: Some("/etc/nats/app.creds".to_string()),
};
assert_eq!(config.stream_prefix, "myapp");
assert_eq!(config.max_deliver, Some(5));
assert!(!config.enable_dead_letter);
}
#[test]
fn test_provider_type() {
assert_eq!(ProviderType::Nats.max_message_size(), 1024 * 1024);
assert!(ProviderType::Nats.supports_batching());
assert_eq!(
ProviderType::Nats.supports_sessions(),
crate::provider::SessionSupport::Emulated
);
}
}
mod naming_tests {
use super::*;
fn default_config() -> NatsConfig {
NatsConfig::default()
}
#[test]
fn test_queue_subject_format() {
let config = default_config();
let queue = QueueName::new("my-queue".to_string()).unwrap();
let subject = queue_subject(&config, &queue);
assert_eq!(subject, "queue_runtime.my_queue");
}
#[test]
fn test_session_subject_format() {
let config = default_config();
let queue = QueueName::new("my-queue".to_string()).unwrap();
let session = SessionId::new("session-123".to_string()).unwrap();
let subject = session_subject(&config, &queue, &session);
assert_eq!(subject, "queue_runtime.my_queue.session.session_123");
}
#[test]
fn test_session_subject_sanitises_slashes() {
let config = default_config();
let queue = QueueName::new("my-queue".to_string()).unwrap();
let session = SessionId::new("owner/repo/pr/42".to_string()).unwrap();
let subject = session_subject(&config, &queue, &session);
assert!(!subject.contains('/'));
assert!(subject.contains("session."));
}
#[test]
fn test_stream_name_format() {
let config = default_config();
let queue = QueueName::new("my-queue".to_string()).unwrap();
let name = stream_name(&config, &queue);
assert_eq!(name, "queue_runtime-my_queue");
assert!(!name.contains('.'));
}
#[test]
fn test_dead_letter_subject_format() {
let config = default_config();
let queue = QueueName::new("my-queue".to_string()).unwrap();
let subject = dead_letter_subject(&config, &queue);
assert_eq!(subject, Some("dlq.my_queue".to_string()));
}
#[test]
fn test_dead_letter_subject_disabled() {
let config = NatsConfig {
enable_dead_letter: false,
..NatsConfig::default()
};
let queue = QueueName::new("my-queue".to_string()).unwrap();
let subject = dead_letter_subject(&config, &queue);
assert!(subject.is_none());
}
#[test]
fn test_dead_letter_subject_no_prefix() {
let config = NatsConfig {
enable_dead_letter: true,
dead_letter_subject_prefix: None,
..NatsConfig::default()
};
let queue = QueueName::new("my-queue".to_string()).unwrap();
let subject = dead_letter_subject(&config, &queue);
assert!(subject.is_none());
}
#[test]
fn test_nats_safe_transformation() {
assert_eq!(nats_safe("my-name"), "my_name");
assert_eq!(nats_safe("my name"), "my_name");
assert_eq!(nats_safe("already_safe"), "already_safe");
assert_eq!(nats_safe("multi-word-name"), "multi_word_name");
}
#[test]
fn test_different_queues_different_streams() {
let config = default_config();
let q1 = QueueName::new("queue-one".to_string()).unwrap();
let q2 = QueueName::new("queue-two".to_string()).unwrap();
assert_ne!(stream_name(&config, &q1), stream_name(&config, &q2));
}
#[test]
fn test_different_sessions_different_subjects() {
let config = default_config();
let queue = QueueName::new("my-queue".to_string()).unwrap();
let s1 = SessionId::new("session-1".to_string()).unwrap();
let s2 = SessionId::new("session-2".to_string()).unwrap();
assert_ne!(
session_subject(&config, &queue, &s1),
session_subject(&config, &queue, &s2)
);
}
#[test]
fn test_session_consumer_name_format() {
let config = default_config();
let queue = QueueName::new("my-queue".to_string()).unwrap();
let session = SessionId::new("session-123".to_string()).unwrap();
let name = session_consumer_name(&config, &queue, &session);
assert_eq!(name, "queue_runtime-my_queue-session-session_123-consumer");
assert!(!name.contains('.'));
}
#[test]
fn test_session_consumer_name_distinct_from_queue_consumer() {
let config = default_config();
let queue = QueueName::new("q".to_string()).unwrap();
let session = SessionId::new("s".to_string()).unwrap();
assert_ne!(
consumer_name(&config, &queue),
session_consumer_name(&config, &queue, &session),
"session and queue consumer names must be distinct on the same queue"
);
}
#[test]
fn test_different_sessions_different_consumer_names() {
let config = default_config();
let queue = QueueName::new("my-queue".to_string()).unwrap();
let s1 = SessionId::new("session-1".to_string()).unwrap();
let s2 = SessionId::new("session-2".to_string()).unwrap();
assert_ne!(
session_consumer_name(&config, &queue, &s1),
session_consumer_name(&config, &queue, &s2),
"different sessions must produce distinct consumer names"
);
}
#[test]
fn test_session_consumer_name_sanitises_dots_and_special_chars() {
let config = default_config();
let queue = QueueName::new("q".to_string()).unwrap();
let session = SessionId::new("org.name/repo.name".to_string()).unwrap();
let name = session_consumer_name(&config, &queue, &session);
assert!(
!name.contains('.'),
"dots must be replaced in consumer name"
);
assert!(
!name.contains('/'),
"slashes must be replaced in consumer name"
);
}
}
mod header_tests {
use super::*;
#[test]
fn test_attribute_roundtrip() {
let mut message = Message::new(bytes::Bytes::from("test body"));
message = message.with_attribute("color".to_string(), "blue".to_string());
message = message.with_attribute("size".to_string(), "large".to_string());
let headers = NatsProvider::build_headers(&message);
let opt_headers = Some(headers);
let attrs = NatsProvider::extract_attributes(&opt_headers);
assert_eq!(attrs.get("color").map(String::as_str), Some("blue"));
assert_eq!(attrs.get("size").map(String::as_str), Some("large"));
}
#[test]
fn test_session_id_header_roundtrip() {
let session = SessionId::new("test-session".to_string()).unwrap();
let message = Message::new(bytes::Bytes::from("body")).with_session_id(session.clone());
let headers = NatsProvider::build_headers(&message);
let opt_headers = Some(headers);
let extracted = NatsProvider::extract_session_id(&opt_headers);
assert_eq!(extracted, Some(session));
}
#[test]
fn test_no_session_id_when_none() {
let message = Message::new(bytes::Bytes::from("body"));
let headers = NatsProvider::build_headers(&message);
let opt_headers = Some(headers);
let extracted = NatsProvider::extract_session_id(&opt_headers);
assert!(extracted.is_none());
}
#[test]
fn test_correlation_id_header_roundtrip() {
let message =
Message::new(bytes::Bytes::from("body")).with_correlation_id("corr-xyz".to_string());
let headers = NatsProvider::build_headers(&message);
let opt_headers = Some(headers);
let extracted = NatsProvider::extract_correlation_id(&opt_headers);
assert_eq!(extracted, Some("corr-xyz".to_string()));
}
#[test]
fn test_no_correlation_id_when_none() {
let message = Message::new(bytes::Bytes::from("body"));
let headers = NatsProvider::build_headers(&message);
let opt_headers = Some(headers);
let extracted = NatsProvider::extract_correlation_id(&opt_headers);
assert!(extracted.is_none());
}
#[test]
fn test_provider_headers_not_in_attributes() {
let session = SessionId::new("s1".to_string()).unwrap();
let message = Message::new(bytes::Bytes::from("body"))
.with_session_id(session)
.with_correlation_id("corr-1".to_string())
.with_attribute("my-key".to_string(), "my-value".to_string());
let headers = NatsProvider::build_headers(&message);
let opt_headers = Some(headers);
let attrs = NatsProvider::extract_attributes(&opt_headers);
assert!(!attrs.contains_key("x-session-id"));
assert!(!attrs.contains_key("x-correlation-id"));
assert_eq!(attrs.get("my-key").map(String::as_str), Some("my-value"));
}
#[test]
fn test_extract_from_none_headers() {
let attrs = NatsProvider::extract_attributes(&None);
assert!(attrs.is_empty());
let session = NatsProvider::extract_session_id(&None);
assert!(session.is_none());
let corr = NatsProvider::extract_correlation_id(&None);
assert!(corr.is_none());
}
}
mod error_tests {
use super::*;
#[test]
fn test_error_conversion() {
let err = NatsError::new("connection refused");
let queue_err = err.to_queue_error();
match queue_err {
QueueError::ProviderError {
provider,
code,
message,
} => {
assert_eq!(provider, "nats");
assert_eq!(code, "NATS_ERROR");
assert!(message.contains("connection refused"));
}
other => panic!("unexpected error variant: {:?}", other),
}
}
#[test]
fn test_error_display() {
let err = NatsError::new("test error");
let display = format!("{}", err);
assert!(display.contains("test error"));
}
}
mod additional_tests {
use super::*;
#[test]
fn test_nats_safe_no_change() {
assert_eq!(nats_safe("already_safe_123"), "already_safe_123");
}
#[test]
fn test_nats_safe_replaces_hyphens_and_spaces() {
assert_eq!(nats_safe("my-queue name"), "my_queue_name");
}
#[test]
fn test_queue_subject_normalises_special_chars() {
let config = NatsConfig {
stream_prefix: "my-app".to_string(),
..NatsConfig::default()
};
let queue = QueueName::new("my-queue".to_string()).unwrap();
let subject = queue_subject(&config, &queue);
assert_eq!(subject, "my_app.my_queue");
}
#[test]
fn test_stream_name_has_no_dots() {
let config = NatsConfig::default();
let queue = QueueName::new("test-queue".to_string()).unwrap();
let name = stream_name(&config, &queue);
assert!(!name.contains('.'), "Stream names must not contain dots");
}
#[test]
fn test_dead_letter_subject_custom_prefix() {
let config = NatsConfig {
dead_letter_subject_prefix: Some("my-dlq".to_string()),
..NatsConfig::default()
};
let queue = QueueName::new("my-queue".to_string()).unwrap();
let subject = dead_letter_subject(&config, &queue);
assert_eq!(subject, Some("my_dlq.my_queue".to_string()));
}
#[test]
fn test_build_headers_empty_message() {
let message = Message::new(bytes::Bytes::from("body"));
let headers = NatsProvider::build_headers(&message);
assert!(headers.get("x-session-id").is_none());
assert!(headers.get("x-correlation-id").is_none());
}
#[test]
fn test_multiple_attributes_roundtrip() {
let mut message = Message::new(bytes::Bytes::from("body"));
for i in 0..5 {
message = message.with_attribute(format!("key-{}", i), format!("val-{}", i));
}
let headers = NatsProvider::build_headers(&message);
let opt_headers = Some(headers);
let attrs = NatsProvider::extract_attributes(&opt_headers);
for i in 0..5 {
assert_eq!(
attrs.get(&format!("key-{}", i)).map(String::as_str),
Some(format!("val-{}", i).as_str())
);
}
}
#[test]
fn test_config_no_credentials() {
let config = NatsConfig::default();
assert!(config.credentials_path.is_none());
}
#[test]
fn test_config_unlimited_deliver() {
let config = NatsConfig {
max_deliver: None,
..NatsConfig::default()
};
assert!(config.max_deliver.is_none());
}
#[test]
fn test_session_lock_duration_default() {
let config = NatsConfig::default();
assert_eq!(config.session_lock_duration, Duration::minutes(5));
}
#[test]
fn test_session_lock_duration_custom() {
let config = NatsConfig {
session_lock_duration: Duration::minutes(20),
..NatsConfig::default()
};
assert_eq!(config.session_lock_duration, Duration::minutes(20));
}
}
mod url_redaction_tests {
use super::*;
#[test]
fn test_redact_url_with_credentials() {
let redacted = redact_url("nats://user:pass@localhost:4222");
assert!(
!redacted.contains("user"),
"Username must be redacted: {redacted}"
);
assert!(
!redacted.contains("pass"),
"Password must be redacted: {redacted}"
);
assert!(
redacted.contains("localhost"),
"Host must be preserved: {redacted}"
);
assert!(
redacted.contains("4222"),
"Port must be preserved: {redacted}"
);
assert!(
redacted.contains("***"),
"Redaction marker must appear: {redacted}"
);
}
#[test]
fn test_redact_url_without_credentials() {
let url = "nats://localhost:4222";
let redacted = redact_url(url);
assert_eq!(redacted.trim_end_matches('/'), url.trim_end_matches('/'));
}
#[test]
fn test_redact_url_invalid_url() {
let redacted = redact_url("not a url !!!");
assert_eq!(redacted, "<invalid-url>");
}
#[test]
fn test_redact_url_username_only() {
let redacted = redact_url("nats://mytoken@nats.example.com:4222");
assert!(
!redacted.contains("mytoken"),
"Token must be redacted: {redacted}"
);
assert!(
redacted.contains("nats.example.com"),
"Host must be preserved: {redacted}"
);
}
}
mod session_lock_tests {
use super::*;
use std::sync::Mutex;
fn lock_expiring_in(minutes: i64) -> Mutex<Timestamp> {
let ts =
Timestamp::from_datetime(Timestamp::now().as_datetime() + Duration::minutes(minutes));
Mutex::new(ts)
}
fn lock_expired_since(minutes: i64) -> Mutex<Timestamp> {
let ts =
Timestamp::from_datetime(Timestamp::now().as_datetime() - Duration::minutes(minutes));
Mutex::new(ts)
}
#[test]
fn test_check_session_lock_valid() {
let lock = lock_expiring_in(5);
let session = SessionId::new("test-session".to_string()).unwrap();
assert!(check_session_lock(&lock, &session).is_ok());
}
#[test]
fn test_check_session_lock_expired() {
let lock = lock_expired_since(1);
let session = SessionId::new("expired-session".to_string()).unwrap();
let result = check_session_lock(&lock, &session);
assert!(result.is_err());
match result.unwrap_err() {
QueueError::SessionLocked { session_id, .. } => {
assert_eq!(session_id, "expired-session");
}
other => panic!("expected SessionLocked, got: {:?}", other),
}
}
#[test]
fn test_advance_session_lock_from_expired() {
let lock = lock_expired_since(1); let session = SessionId::new("test-session".to_string()).unwrap();
assert!(check_session_lock(&lock, &session).is_err());
let new_expiry = advance_session_lock(&lock, Duration::minutes(5))
.expect("advance_session_lock should succeed");
assert!(
new_expiry.as_datetime() > Timestamp::now().as_datetime(),
"new expiry must be in the future"
);
assert!(check_session_lock(&lock, &session).is_ok());
}
#[test]
fn test_advance_session_lock_extends_valid_lock() {
let lock = lock_expiring_in(1);
let before = { *lock.lock().unwrap() };
advance_session_lock(&lock, Duration::minutes(5))
.expect("advance_session_lock should succeed");
let after = { *lock.lock().unwrap() };
assert!(
after.as_datetime() > before.as_datetime(),
"renewal must extend the expiry"
);
}
}