use crate::error::Result;
use serde::{Deserialize, Serialize};
use std::fmt;
/// Name of a schema subject, e.g. `user.created`, stored as a `String` newtype.
///
/// Values built through `SchemaSubject::new` (or the `TryFrom` impls that call
/// it) have passed `validate`; `new_unchecked` skips that check.
///
/// NOTE(review): `Deserialize` is derived, so deserialized values presumably do
/// not go through `validate` either — confirm this is intended.
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SchemaSubject(String);
impl SchemaSubject {
pub fn new(value: String) -> Result<Self> {
Self::validate(&value)?;
Ok(Self(value))
}
pub(crate) fn new_unchecked(value: String) -> Self {
Self(value)
}
pub fn as_str(&self) -> &str {
&self.0
}
pub fn into_inner(self) -> String {
self.0
}
pub fn namespace(&self) -> Option<&str> {
self.0.split('.').next().filter(|_| self.0.contains('.'))
}
pub fn action(&self) -> Option<&str> {
self.0.rsplit('.').next().filter(|_| self.0.contains('.'))
}
pub fn is_in_namespace(&self, namespace: &str) -> bool {
self.namespace() == Some(namespace)
}
pub fn starts_with(&self, prefix: &str) -> bool {
self.0.starts_with(prefix)
}
pub fn matches_pattern(&self, pattern: &str) -> bool {
if pattern == "**" {
return true;
}
let subject_parts: Vec<&str> = self.0.split('.').collect();
let pattern_parts: Vec<&str> = pattern.split('.').collect();
if pattern.contains("**") {
let prefix: Vec<&str> = pattern_parts
.iter()
.take_while(|&&p| p != "**")
.copied()
.collect();
if subject_parts.len() < prefix.len() {
return false;
}
for (s, p) in subject_parts.iter().zip(prefix.iter()) {
if *p != "*" && *s != *p {
return false;
}
}
true
} else {
if subject_parts.len() != pattern_parts.len() {
return false;
}
for (s, p) in subject_parts.iter().zip(pattern_parts.iter()) {
if *p != "*" && *s != *p {
return false;
}
}
true
}
}
fn validate(value: &str) -> Result<()> {
if value.is_empty() {
return Err(crate::error::AllSourceError::InvalidInput(
"Schema subject cannot be empty".to_string(),
));
}
if value.len() > 256 {
return Err(crate::error::AllSourceError::InvalidInput(format!(
"Schema subject cannot exceed 256 characters, got {}",
value.len()
)));
}
if !value
.chars()
.all(|c| c.is_lowercase() || c.is_numeric() || c == '.' || c == '_' || c == '-')
{
return Err(crate::error::AllSourceError::InvalidInput(format!(
"Schema subject '{value}' must be lowercase with dots, underscores, or hyphens"
)));
}
if value.starts_with('.')
|| value.starts_with('-')
|| value.starts_with('_')
|| value.ends_with('.')
|| value.ends_with('-')
|| value.ends_with('_')
{
return Err(crate::error::AllSourceError::InvalidInput(format!(
"Schema subject '{value}' cannot start or end with special characters"
)));
}
if value.contains("..") {
return Err(crate::error::AllSourceError::InvalidInput(format!(
"Schema subject '{value}' cannot have consecutive dots"
)));
}
Ok(())
}
}
/// Prints the subject exactly as stored.
impl fmt::Display for SchemaSubject {
    /// Writes the subject text to `f`.
    ///
    /// Uses `Formatter::pad` so that width, fill, alignment and precision
    /// given by the caller (e.g. `{:>20}`) are applied; a plain `{}` prints
    /// the raw subject text.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.pad(&self.0)
    }
}
/// Fallible conversion from a borrowed string slice.
impl TryFrom<&str> for SchemaSubject {
    type Error = crate::error::AllSourceError;

    /// Copies `value` into an owned `String` and hands it to
    /// `SchemaSubject::new`, returning whatever that constructor returns.
    fn try_from(value: &str) -> Result<Self> {
        let owned = String::from(value);
        Self::new(owned)
    }
}
/// Fallible conversion from an owned `String`.
impl TryFrom<String> for SchemaSubject {
    type Error = crate::error::AllSourceError;

    /// Passes `value` straight to `SchemaSubject::new` and returns its result.
    fn try_from(value: String) -> Result<Self> {
        Self::new(value)
    }
}
/// Lets a `SchemaSubject` be passed wherever an `AsRef<str>` is accepted.
impl AsRef<str> for SchemaSubject {
    /// Borrows the wrapped string as a `&str`.
    fn as_ref(&self) -> &str {
        self.0.as_str()
    }
}
// Unit tests for `SchemaSubject`: construction and validation, segment
// accessors, pattern matching, and the trait implementations.
#[cfg(test)]
mod tests {
use super::*;
// Well-formed subjects are accepted: dotted, multi-segment, with underscores,
// hyphens or digits, and a single segment without any dot.
#[test]
fn test_create_valid_subjects() {
let subject = SchemaSubject::new("user.created".to_string());
assert!(subject.is_ok());
assert_eq!(subject.unwrap().as_str(), "user.created");
let subject = SchemaSubject::new("user.profile.updated".to_string());
assert!(subject.is_ok());
let subject = SchemaSubject::new("order_item.created".to_string());
assert!(subject.is_ok());
let subject = SchemaSubject::new("payment-processed".to_string());
assert!(subject.is_ok());
let subject = SchemaSubject::new("event.v2.updated".to_string());
assert!(subject.is_ok());
let subject = SchemaSubject::new("created".to_string());
assert!(subject.is_ok());
}
// The empty string is rejected with the "cannot be empty" message.
#[test]
fn test_reject_empty_subject() {
let result = SchemaSubject::new(String::new());
assert!(result.is_err());
if let Err(e) = result {
assert!(e.to_string().contains("cannot be empty"));
}
}
// 257 bytes is one past the limit and must be rejected.
#[test]
fn test_reject_too_long_subject() {
let long_subject = "a".repeat(257);
let result = SchemaSubject::new(long_subject);
assert!(result.is_err());
if let Err(e) = result {
assert!(e.to_string().contains("cannot exceed 256 characters"));
}
}
// Exactly 256 bytes is still accepted (the limit is inclusive).
#[test]
fn test_accept_max_length_subject() {
let max_subject = "a".repeat(256);
let result = SchemaSubject::new(max_subject);
assert!(result.is_ok());
}
// Uppercase letters anywhere in the subject are rejected.
#[test]
fn test_reject_uppercase() {
let result = SchemaSubject::new("User.Created".to_string());
assert!(result.is_err());
let result = SchemaSubject::new("user.CREATED".to_string());
assert!(result.is_err());
}
// Characters outside the allowed set (space, `:`, `@`) are rejected.
#[test]
fn test_reject_invalid_characters() {
let result = SchemaSubject::new("user created".to_string());
assert!(result.is_err());
let result = SchemaSubject::new("user:created".to_string());
assert!(result.is_err());
let result = SchemaSubject::new("user@created".to_string());
assert!(result.is_err());
}
// A leading `.`, `-` or `_` is rejected.
#[test]
fn test_reject_starting_with_special_char() {
let result = SchemaSubject::new(".user.created".to_string());
assert!(result.is_err());
let result = SchemaSubject::new("-user.created".to_string());
assert!(result.is_err());
let result = SchemaSubject::new("_user.created".to_string());
assert!(result.is_err());
}
// A trailing `.`, `-` or `_` is rejected.
#[test]
fn test_reject_ending_with_special_char() {
let result = SchemaSubject::new("user.created.".to_string());
assert!(result.is_err());
let result = SchemaSubject::new("user.created-".to_string());
assert!(result.is_err());
let result = SchemaSubject::new("user.created_".to_string());
assert!(result.is_err());
}
// Two dots in a row (an empty segment) are rejected with a dedicated message.
#[test]
fn test_reject_consecutive_dots() {
let result = SchemaSubject::new("user..created".to_string());
assert!(result.is_err());
if let Err(e) = result {
assert!(e.to_string().contains("consecutive dots"));
}
}
// `namespace` is the first segment; a subject without a dot has none.
#[test]
fn test_namespace_extraction() {
let subject = SchemaSubject::new("user.created".to_string()).unwrap();
assert_eq!(subject.namespace(), Some("user"));
let subject = SchemaSubject::new("user.profile.updated".to_string()).unwrap();
assert_eq!(subject.namespace(), Some("user"));
let subject = SchemaSubject::new("created".to_string()).unwrap();
assert_eq!(subject.namespace(), None);
}
// `action` is the last segment; a subject without a dot has none.
#[test]
fn test_action_extraction() {
let subject = SchemaSubject::new("user.created".to_string()).unwrap();
assert_eq!(subject.action(), Some("created"));
let subject = SchemaSubject::new("user.profile.updated".to_string()).unwrap();
assert_eq!(subject.action(), Some("updated"));
let subject = SchemaSubject::new("created".to_string()).unwrap();
assert_eq!(subject.action(), None);
}
// `is_in_namespace` compares against the first segment only.
#[test]
fn test_is_in_namespace() {
let subject = SchemaSubject::new("user.created".to_string()).unwrap();
assert!(subject.is_in_namespace("user"));
assert!(!subject.is_in_namespace("order"));
}
// `starts_with` is a plain string-prefix check on the whole subject.
#[test]
fn test_starts_with() {
let subject = SchemaSubject::new("user.created".to_string()).unwrap();
assert!(subject.starts_with("user"));
assert!(subject.starts_with("user."));
assert!(!subject.starts_with("order"));
}
// A pattern without wildcards must equal the subject segment for segment.
#[test]
fn test_matches_pattern_exact() {
let subject = SchemaSubject::new("user.created".to_string()).unwrap();
assert!(subject.matches_pattern("user.created"));
assert!(!subject.matches_pattern("user.updated"));
}
// `*` stands for exactly one segment, in any position.
#[test]
fn test_matches_pattern_wildcard() {
let subject = SchemaSubject::new("user.created".to_string()).unwrap();
assert!(subject.matches_pattern("user.*"));
assert!(subject.matches_pattern("*.created"));
assert!(subject.matches_pattern("*.*"));
assert!(!subject.matches_pattern("order.*"));
}
// A trailing `**` accepts any remaining segments; a lone `**` accepts everything.
#[test]
fn test_matches_pattern_double_wildcard() {
let subject = SchemaSubject::new("user.profile.updated".to_string()).unwrap();
assert!(subject.matches_pattern("**"));
assert!(subject.matches_pattern("user.**"));
assert!(subject.matches_pattern("user.profile.**"));
}
// `Display` prints the raw subject text.
#[test]
fn test_display_trait() {
let subject = SchemaSubject::new("user.created".to_string()).unwrap();
assert_eq!(format!("{subject}"), "user.created");
}
// `TryFrom<&str>` applies the same validation as `new`.
#[test]
fn test_try_from_str() {
let subject: Result<SchemaSubject> = "user.created".try_into();
assert!(subject.is_ok());
assert_eq!(subject.unwrap().as_str(), "user.created");
let invalid: Result<SchemaSubject> = "".try_into();
assert!(invalid.is_err());
}
// `TryFrom<String>` applies the same validation as `new`.
#[test]
fn test_try_from_string() {
let subject: Result<SchemaSubject> = "order.placed".to_string().try_into();
assert!(subject.is_ok());
let invalid: Result<SchemaSubject> = String::new().try_into();
assert!(invalid.is_err());
}
// `into_inner` hands back the wrapped `String` unchanged.
#[test]
fn test_into_inner() {
let subject = SchemaSubject::new("test.subject".to_string()).unwrap();
let inner = subject.into_inner();
assert_eq!(inner, "test.subject");
}
// Equality is by subject text.
#[test]
fn test_equality() {
let subject1 = SchemaSubject::new("user.created".to_string()).unwrap();
let subject2 = SchemaSubject::new("user.created".to_string()).unwrap();
let subject3 = SchemaSubject::new("order.placed".to_string()).unwrap();
assert_eq!(subject1, subject2);
assert_ne!(subject1, subject3);
}
// A clone compares equal to its source.
#[test]
fn test_cloning() {
let subject1 = SchemaSubject::new("test.subject".to_string()).unwrap();
let subject2 = subject1.clone();
assert_eq!(subject1, subject2);
}
// Equal subjects hash alike, so one can be used to look up the other in a set.
#[test]
fn test_hash_consistency() {
use std::collections::HashSet;
let subject1 = SchemaSubject::new("user.created".to_string()).unwrap();
let subject2 = SchemaSubject::new("user.created".to_string()).unwrap();
let mut set = HashSet::new();
set.insert(subject1);
assert!(set.contains(&subject2));
}
// JSON form is a bare string, and it round-trips back to an equal subject.
#[test]
fn test_serde_serialization() {
let subject = SchemaSubject::new("user.created".to_string()).unwrap();
let json = serde_json::to_string(&subject).unwrap();
assert_eq!(json, "\"user.created\"");
let deserialized: SchemaSubject = serde_json::from_str(&json).unwrap();
assert_eq!(deserialized, subject);
}
// `AsRef<str>` exposes the subject text.
#[test]
fn test_as_ref() {
let subject = SchemaSubject::new("test.subject".to_string()).unwrap();
let str_ref: &str = subject.as_ref();
assert_eq!(str_ref, "test.subject");
}
// `new_unchecked` stores the value as-is, even one that `new` would reject.
#[test]
fn test_new_unchecked() {
let subject = SchemaSubject::new_unchecked("INVALID Subject!".to_string());
assert_eq!(subject.as_str(), "INVALID Subject!");
}
}