use crate::error::ValidationError;
use crate::provider::ProviderType;
use bytes::Bytes;
use chrono::{DateTime, Duration, Utc};
use serde::{Deserialize, Serialize};
use std::collections::HashMap;
use std::str::FromStr;
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct QueueName(String);
impl QueueName {
pub fn new(name: String) -> Result<Self, ValidationError> {
if name.is_empty() || name.len() > 260 {
return Err(ValidationError::OutOfRange {
field: "queue_name".to_string(),
message: "must be 1-260 characters".to_string(),
});
}
if !name
.chars()
.all(|c| c.is_ascii_alphanumeric() || c == '-' || c == '_')
{
return Err(ValidationError::InvalidFormat {
field: "queue_name".to_string(),
message: "only ASCII alphanumeric, hyphens, and underscores allowed".to_string(),
});
}
if name.starts_with('-') || name.ends_with('-') || name.contains("--") {
return Err(ValidationError::InvalidFormat {
field: "queue_name".to_string(),
message: "no leading/trailing hyphens or consecutive hyphens".to_string(),
});
}
Ok(Self(name))
}
pub fn with_prefix(prefix: &str, base_name: &str) -> Result<Self, ValidationError> {
let full_name = format!("{}-{}", prefix, base_name);
Self::new(full_name)
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for QueueName {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl FromStr for QueueName {
type Err = ValidationError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::new(s.to_string())
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct MessageId(String);
impl MessageId {
pub fn new() -> Self {
let id = uuid::Uuid::new_v4();
Self(id.to_string())
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl Default for MessageId {
fn default() -> Self {
Self::new()
}
}
impl std::fmt::Display for MessageId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl FromStr for MessageId {
type Err = ValidationError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
if s.is_empty() {
return Err(ValidationError::Required {
field: "message_id".to_string(),
});
}
Ok(Self(s.to_string()))
}
}
#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
pub struct SessionId(String);
impl SessionId {
pub fn new(id: String) -> Result<Self, ValidationError> {
if id.is_empty() {
return Err(ValidationError::Required {
field: "session_id".to_string(),
});
}
if id.len() > 128 {
return Err(ValidationError::OutOfRange {
field: "session_id".to_string(),
message: "maximum 128 characters".to_string(),
});
}
if !id.chars().all(|c| c.is_ascii() && !c.is_ascii_control()) {
return Err(ValidationError::InvalidFormat {
field: "session_id".to_string(),
message: "only ASCII printable characters allowed".to_string(),
});
}
Ok(Self(id))
}
pub fn from_parts(owner: &str, repo: &str, entity_type: &str, entity_id: &str) -> Self {
let id = format!("{}/{}/{}/{}", owner, repo, entity_type, entity_id);
Self(id)
}
pub fn as_str(&self) -> &str {
&self.0
}
}
impl std::fmt::Display for SessionId {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0)
}
}
impl FromStr for SessionId {
type Err = ValidationError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
Self::new(s.to_string())
}
}
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Serialize, Deserialize)]
pub struct Timestamp(DateTime<Utc>);
impl Timestamp {
pub fn now() -> Self {
Self(Utc::now())
}
pub fn from_datetime(dt: DateTime<Utc>) -> Self {
Self(dt)
}
pub fn as_datetime(&self) -> DateTime<Utc> {
self.0
}
}
impl std::fmt::Display for Timestamp {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
write!(f, "{}", self.0.format("%Y-%m-%d %H:%M:%S UTC"))
}
}
impl FromStr for Timestamp {
type Err = chrono::ParseError;
fn from_str(s: &str) -> Result<Self, Self::Err> {
let dt = s.parse::<DateTime<Utc>>()?;
Ok(Self::from_datetime(dt))
}
}
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Message {
#[serde(with = "bytes_serde")]
pub body: Bytes,
pub attributes: HashMap<String, String>,
pub session_id: Option<SessionId>,
pub correlation_id: Option<String>,
pub time_to_live: Option<Duration>,
}
mod bytes_serde {
use base64::{engine::general_purpose, Engine as _};
use bytes::Bytes;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
pub fn serialize<S>(bytes: &Bytes, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
let encoded = general_purpose::STANDARD.encode(bytes);
encoded.serialize(serializer)
}
pub fn deserialize<'de, D>(deserializer: D) -> Result<Bytes, D::Error>
where
D: Deserializer<'de>,
{
let encoded = String::deserialize(deserializer)?;
let decoded = general_purpose::STANDARD
.decode(encoded)
.map_err(serde::de::Error::custom)?;
Ok(Bytes::from(decoded))
}
}
impl Message {
pub fn new(body: Bytes) -> Self {
Self {
body,
attributes: HashMap::new(),
session_id: None,
correlation_id: None,
time_to_live: None,
}
}
pub fn with_session_id(mut self, session_id: SessionId) -> Self {
self.session_id = Some(session_id);
self
}
pub fn with_attribute(mut self, key: String, value: String) -> Self {
self.attributes.insert(key, value);
self
}
pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
self.correlation_id = Some(correlation_id);
self
}
pub fn with_time_to_live(mut self, ttl: Duration) -> Self {
self.time_to_live = Some(ttl);
self
}
}
#[derive(Debug, Clone)]
pub struct ReceivedMessage {
pub message_id: MessageId,
pub body: Bytes,
pub attributes: HashMap<String, String>,
pub session_id: Option<SessionId>,
pub correlation_id: Option<String>,
pub receipt_handle: ReceiptHandle,
pub delivery_count: u32,
pub first_delivered_at: Timestamp,
pub delivered_at: Timestamp,
}
impl ReceivedMessage {
pub fn message(&self) -> Message {
Message {
body: self.body.clone(),
attributes: self.attributes.clone(),
session_id: self.session_id.clone(),
correlation_id: self.correlation_id.clone(),
time_to_live: None, }
}
pub fn has_exceeded_max_delivery_count(&self, max_count: u32) -> bool {
self.delivery_count > max_count
}
}
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ReceiptHandle {
handle: String,
expires_at: Timestamp,
provider_type: ProviderType,
}
impl ReceiptHandle {
pub fn new(handle: String, expires_at: Timestamp, provider_type: ProviderType) -> Self {
Self {
handle,
expires_at,
provider_type,
}
}
pub fn handle(&self) -> &str {
&self.handle
}
pub fn is_expired(&self) -> bool {
Timestamp::now() >= self.expires_at
}
pub fn time_until_expiry(&self) -> Duration {
let now = Timestamp::now();
if now >= self.expires_at {
Duration::zero()
} else {
self.expires_at.as_datetime() - now.as_datetime()
}
}
pub fn provider_type(&self) -> ProviderType {
self.provider_type
}
}
#[derive(Debug, Clone, Default)]
pub struct SendOptions {
pub session_id: Option<SessionId>,
pub correlation_id: Option<String>,
pub scheduled_enqueue_time: Option<Timestamp>,
pub time_to_live: Option<Duration>,
pub properties: HashMap<String, String>,
pub content_type: Option<String>,
pub duplicate_detection_id: Option<String>,
}
impl SendOptions {
pub fn new() -> Self {
Self::default()
}
pub fn with_session_id(mut self, session_id: SessionId) -> Self {
self.session_id = Some(session_id);
self
}
pub fn with_correlation_id(mut self, correlation_id: String) -> Self {
self.correlation_id = Some(correlation_id);
self
}
pub fn with_scheduled_enqueue_time(mut self, time: Timestamp) -> Self {
self.scheduled_enqueue_time = Some(time);
self
}
pub fn with_delay(mut self, delay: Duration) -> Self {
let scheduled_time = Timestamp::from_datetime(Utc::now() + delay);
self.scheduled_enqueue_time = Some(scheduled_time);
self
}
pub fn with_time_to_live(mut self, ttl: Duration) -> Self {
self.time_to_live = Some(ttl);
self
}
pub fn with_property(mut self, key: String, value: String) -> Self {
self.properties.insert(key, value);
self
}
pub fn with_content_type(mut self, content_type: String) -> Self {
self.content_type = Some(content_type);
self
}
pub fn with_duplicate_detection_id(mut self, id: String) -> Self {
self.duplicate_detection_id = Some(id);
self
}
}
#[derive(Debug, Clone)]
pub struct ReceiveOptions {
pub max_messages: u32,
pub timeout: Duration,
pub session_id: Option<SessionId>,
pub accept_any_session: bool,
pub lock_duration: Option<Duration>,
pub peek_only: bool,
pub from_sequence_number: Option<u64>,
}
impl Default for ReceiveOptions {
fn default() -> Self {
Self {
max_messages: 1,
timeout: Duration::seconds(30),
session_id: None,
accept_any_session: false,
lock_duration: None,
peek_only: false,
from_sequence_number: None,
}
}
}
impl ReceiveOptions {
pub fn new() -> Self {
Self::default()
}
pub fn with_max_messages(mut self, max: u32) -> Self {
self.max_messages = max;
self
}
pub fn with_timeout(mut self, timeout: Duration) -> Self {
self.timeout = timeout;
self
}
pub fn with_session_id(mut self, session_id: SessionId) -> Self {
self.session_id = Some(session_id);
self.accept_any_session = false;
self
}
pub fn accept_any_session(mut self) -> Self {
self.accept_any_session = true;
self.session_id = None;
self
}
pub fn with_lock_duration(mut self, duration: Duration) -> Self {
self.lock_duration = Some(duration);
self
}
pub fn peek_only(mut self) -> Self {
self.peek_only = true;
self
}
pub fn from_sequence_number(mut self, sequence: u64) -> Self {
self.from_sequence_number = Some(sequence);
self
}
}
#[cfg(test)]
#[path = "message_tests.rs"]
mod tests;