use std::time::Duration;
#[path = "annotation_collection.rs"]
mod collection;
pub use collection::Annotations;
/// A single protocol annotation: either one of the built-in kinds below or a
/// free-form key/value pair ([`ProtocolAnnotation::Custom`]).
///
/// Every payload type is `Eq + Hash`, so the enum derives both; this allows
/// annotations to be stored in hash-based collections and compared under
/// `T: Eq` bounds.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub enum ProtocolAnnotation {
    /// Timed-choice annotation.
    TimedChoice {
        /// Duration attached to the timed choice.
        duration: Duration,
    },
    /// Priority annotation carrying a numeric value.
    Priority(u32),
    /// Retry annotation.
    Retry {
        /// Maximum number of attempts.
        max_attempts: u32,
        /// Optional delay; `None` when no delay was specified.
        delay: Option<Duration>,
    },
    /// Idempotent marker; carries no data.
    Idempotent,
    /// Trace annotation.
    Trace {
        /// Optional label; `None` for an unlabeled trace.
        label: Option<String>,
    },
    /// Runtime timeout annotation carrying the timeout duration.
    RuntimeTimeout(Duration),
    /// Heartbeat annotation.
    Heartbeat {
        /// Heartbeat interval.
        interval: Duration,
        /// Count associated with missing heartbeats.
        // NOTE(review): presumably the number of missed heartbeats tolerated
        // before acting; confirm against the runtime that consumes it.
        on_missing_count: u32,
    },
    /// Parallel marker; carries no data.
    Parallel,
    /// Ordered marker; carries no data.
    Ordered,
    /// Minimum-responses annotation carrying the required count.
    MinResponses(u32),
    /// Free-form annotation that is not one of the built-in kinds.
    Custom {
        /// Annotation key.
        key: String,
        /// Annotation value, kept as the raw string.
        value: String,
    },
}
impl ProtocolAnnotation {
    /// Builds a [`ProtocolAnnotation::Custom`] from borrowed key/value strings.
    fn custom_kv(key: &str, value: &str) -> Self {
        Self::Custom {
            key: key.to_string(),
            value: value.to_string(),
        }
    }

    /// Parses `value` as a `u32`, returning `None` on any parse failure.
    fn parse_u32_value(value: &str) -> Option<u32> {
        value.parse::<u32>().ok()
    }

    /// Parses `value` as a `u64`, returning `None` on any parse failure.
    fn parse_u64_value(value: &str) -> Option<u64> {
        value.parse::<u64>().ok()
    }

    /// Parses a duration written as an unsigned integer with an optional unit
    /// suffix: `ms`, `s`, `m` (minutes) or `h`. A bare integer is taken as
    /// milliseconds.
    ///
    /// Surrounding whitespace, and whitespace between the number and the
    /// unit, is ignored. Values too large to express in `u64` milliseconds
    /// saturate at `u64::MAX` milliseconds. Returns `None` when the numeric
    /// part is not a valid `u64`.
    fn parse_duration_value(value: &str) -> Option<Duration> {
        // Order matters: "ms" must be tried before "s", because every string
        // ending in "ms" also ends in "s".
        const UNITS: [(&str, u64); 4] = [
            ("ms", 1),
            ("s", 1_000),
            ("m", 60_000),
            ("h", 3_600_000),
        ];

        let trimmed = value.trim();
        for &(suffix, millis_per_unit) in UNITS.iter() {
            if let Some(number) = trimmed.strip_suffix(suffix) {
                // The first matching suffix decides the unit; a bad number is
                // a hard failure rather than a reason to try another suffix.
                let count = number.trim().parse::<u64>().ok()?;
                return Some(Duration::from_millis(
                    count.saturating_mul(millis_per_unit),
                ));
            }
        }
        Self::parse_u64_value(trimmed).map(Duration::from_millis)
    }

    /// Parses the heartbeat value syntax produced by [`Self::dsl_entries`]:
    /// `every <duration> on_missing <count>`.
    ///
    /// Tokens are separated by whitespace. Returns `None` when a keyword is
    /// missing or misspelled, when either operand fails to parse, or when
    /// trailing tokens follow the count.
    fn parse_heartbeat_value(value: &str) -> Option<(Duration, u32)> {
        let mut tokens = value.split_whitespace();
        if tokens.next()? != "every" {
            return None;
        }
        let interval = Self::parse_duration_value(tokens.next()?)?;
        if tokens.next()? != "on_missing" {
            return None;
        }
        let on_missing_count = Self::parse_u32_value(tokens.next()?)?;
        if tokens.next().is_some() {
            return None;
        }
        Some((interval, on_missing_count))
    }

    /// Formats a duration using the largest unit (`h`, `m`, `s`, `ms`) that
    /// divides its whole-millisecond value exactly.
    ///
    /// Only whole milliseconds are considered, so any sub-millisecond part of
    /// `duration` is dropped. A zero duration formats as `0h`.
    fn format_duration_value(duration: Duration) -> String {
        let millis = duration.as_millis();
        if millis % 3_600_000 == 0 {
            format!("{}h", millis / 3_600_000)
        } else if millis % 60_000 == 0 {
            format!("{}m", millis / 60_000)
        } else if millis % 1000 == 0 {
            format!("{}s", millis / 1000)
        } else {
            format!("{millis}ms")
        }
    }

    /// Creates a [`Self::TimedChoice`] with the given duration.
    #[must_use]
    pub fn timed_choice(duration: Duration) -> Self {
        Self::TimedChoice { duration }
    }

    /// Creates a [`Self::TimedChoice`] from a duration in milliseconds.
    #[must_use]
    pub fn timed_choice_ms(ms: u64) -> Self {
        Self::TimedChoice {
            duration: Duration::from_millis(ms),
        }
    }

    /// Creates a [`Self::Priority`] with the given value.
    #[must_use]
    pub fn priority(value: u32) -> Self {
        Self::Priority(value)
    }

    /// Creates a [`Self::Retry`] with `max_attempts` and no delay.
    #[must_use]
    pub fn retry(max_attempts: u32) -> Self {
        Self::Retry {
            max_attempts,
            delay: None,
        }
    }

    /// Creates a [`Self::Retry`] with `max_attempts` and the given delay.
    #[must_use]
    pub fn retry_with_delay(max_attempts: u32, delay: Duration) -> Self {
        Self::Retry {
            max_attempts,
            delay: Some(delay),
        }
    }

    /// Creates an unlabeled [`Self::Trace`].
    #[must_use]
    pub fn trace() -> Self {
        Self::Trace { label: None }
    }

    /// Creates a [`Self::Trace`] carrying `label`.
    #[must_use]
    pub fn trace_labeled(label: impl Into<String>) -> Self {
        Self::Trace {
            label: Some(label.into()),
        }
    }

    /// Creates a [`Self::RuntimeTimeout`] with the given duration.
    #[must_use]
    pub fn runtime_timeout(duration: Duration) -> Self {
        Self::RuntimeTimeout(duration)
    }

    /// Creates a [`Self::Heartbeat`] with the given interval and count.
    #[must_use]
    pub fn heartbeat(interval: Duration, on_missing_count: u32) -> Self {
        Self::Heartbeat {
            interval,
            on_missing_count,
        }
    }

    /// Creates a [`Self::Heartbeat`] from an interval in milliseconds.
    #[must_use]
    pub fn heartbeat_ms(interval_ms: u64, on_missing_count: u32) -> Self {
        Self::Heartbeat {
            interval: Duration::from_millis(interval_ms),
            on_missing_count,
        }
    }

    /// Creates a [`Self::Parallel`] marker.
    #[must_use]
    pub fn parallel() -> Self {
        Self::Parallel
    }

    /// Creates a [`Self::Ordered`] marker.
    #[must_use]
    pub fn ordered() -> Self {
        Self::Ordered
    }

    /// Creates a [`Self::MinResponses`] with the given minimum.
    #[must_use]
    pub fn min_responses(min: u32) -> Self {
        Self::MinResponses(min)
    }

    /// Creates a [`Self::Custom`] key/value annotation.
    #[must_use]
    pub fn custom(key: impl Into<String>, value: impl Into<String>) -> Self {
        Self::Custom {
            key: key.into(),
            value: value.into(),
        }
    }

    /// Returns `true` if this is a [`Self::TimedChoice`].
    #[must_use]
    pub fn is_timed_choice(&self) -> bool {
        matches!(self, Self::TimedChoice { .. })
    }

    /// Returns the duration of a [`Self::TimedChoice`], or `None` for any
    /// other variant.
    #[must_use]
    pub fn timed_choice_duration(&self) -> Option<Duration> {
        match self {
            Self::TimedChoice { duration } => Some(*duration),
            _ => None,
        }
    }

    /// Returns `true` if this is a [`Self::Priority`].
    #[must_use]
    pub fn is_priority(&self) -> bool {
        matches!(self, Self::Priority(_))
    }

    /// Returns the value of a [`Self::Priority`], or `None` for any other
    /// variant.
    #[must_use]
    pub fn priority_value(&self) -> Option<u32> {
        match self {
            Self::Priority(v) => Some(*v),
            _ => None,
        }
    }

    /// Returns `true` if this is a [`Self::Retry`].
    #[must_use]
    pub fn is_retry(&self) -> bool {
        matches!(self, Self::Retry { .. })
    }

    /// Returns `(max_attempts, delay)` of a [`Self::Retry`], or `None` for
    /// any other variant.
    #[must_use]
    pub fn retry_config(&self) -> Option<(u32, Option<Duration>)> {
        match self {
            Self::Retry {
                max_attempts,
                delay,
            } => Some((*max_attempts, *delay)),
            _ => None,
        }
    }

    /// Returns `true` if this is [`Self::Idempotent`].
    #[must_use]
    pub fn is_idempotent(&self) -> bool {
        matches!(self, Self::Idempotent)
    }

    /// Returns `true` if this is a [`Self::Trace`].
    #[must_use]
    pub fn is_trace(&self) -> bool {
        matches!(self, Self::Trace { .. })
    }

    /// Returns `true` if this is a [`Self::Heartbeat`].
    #[must_use]
    pub fn is_heartbeat(&self) -> bool {
        matches!(self, Self::Heartbeat { .. })
    }

    /// Returns `(interval, on_missing_count)` of a [`Self::Heartbeat`], or
    /// `None` for any other variant.
    #[must_use]
    pub fn heartbeat_params(&self) -> Option<(Duration, u32)> {
        match self {
            Self::Heartbeat {
                interval,
                on_missing_count,
            } => Some((*interval, *on_missing_count)),
            _ => None,
        }
    }

    /// Returns `true` if this is a [`Self::RuntimeTimeout`].
    #[must_use]
    pub fn is_runtime_timeout(&self) -> bool {
        matches!(self, Self::RuntimeTimeout(_))
    }

    /// Returns the duration of a [`Self::RuntimeTimeout`], or `None` for any
    /// other variant.
    #[must_use]
    pub fn runtime_timeout_duration(&self) -> Option<Duration> {
        match self {
            Self::RuntimeTimeout(d) => Some(*d),
            _ => None,
        }
    }

    /// Returns `true` if this is [`Self::Parallel`].
    #[must_use]
    pub fn is_parallel(&self) -> bool {
        matches!(self, Self::Parallel)
    }

    /// Returns `true` if this is [`Self::Ordered`].
    #[must_use]
    pub fn is_ordered(&self) -> bool {
        matches!(self, Self::Ordered)
    }

    /// Returns `true` if this is a [`Self::MinResponses`].
    #[must_use]
    pub fn is_min_responses(&self) -> bool {
        matches!(self, Self::MinResponses(_))
    }

    /// Returns the value of a [`Self::MinResponses`], or `None` for any other
    /// variant.
    #[must_use]
    pub fn min_responses_value(&self) -> Option<u32> {
        match self {
            Self::MinResponses(n) => Some(*n),
            _ => None,
        }
    }

    /// Returns `true` if this is a [`Self::Custom`] whose key equals
    /// `expected_key`.
    #[must_use]
    pub fn is_custom_key(&self, expected_key: &str) -> bool {
        matches!(self, Self::Custom { key, .. } if key == expected_key)
    }

    /// Returns the value of a [`Self::Custom`] whose key equals
    /// `expected_key`, or `None` for a different key or any other variant.
    #[must_use]
    pub fn custom_value(&self, expected_key: &str) -> Option<&str> {
        match self {
            Self::Custom { key, value } if key == expected_key => Some(value),
            _ => None,
        }
    }

    /// Converts one DSL `key`/`value` pair into an annotation.
    ///
    /// Recognised keys:
    ///
    /// * `timed_choice` with value `true`: [`Self::TimedChoice`] with a zero
    ///   duration.
    /// * `timeout_ms` with a `u64` value: [`Self::TimedChoice`] with that
    ///   many milliseconds.
    /// * `priority`, `retry`, `min_responses` with a `u32` value.
    /// * `idempotent` with value `true`.
    /// * `trace`: an empty or `true` value gives an unlabeled trace, any
    ///   other value becomes the label.
    /// * `runtime_timeout` with a duration such as `500ms`, `5s`, `2m`, `1h`.
    /// * `heartbeat` with `every <duration> on_missing <count>`, the same
    ///   syntax [`Self::dsl_entries`] emits, so heartbeats survive a round
    ///   trip.
    /// * `parallel`, `ordered` with an empty or `true` value.
    ///
    /// Any unrecognised key, or a recognised key whose value does not parse,
    /// is preserved verbatim as [`Self::Custom`].
    pub(crate) fn parse_dsl_entry(key: &str, value: &str) -> Self {
        // Marker-style keys accept either a bare key or an explicit `true`.
        let is_flag = value.is_empty() || value == "true";
        // Shared fallback: keep the raw pair rather than dropping it.
        let fallback = || Self::custom_kv(key, value);

        match key {
            "timed_choice" if value == "true" => Self::TimedChoice {
                duration: Duration::from_secs(0),
            },
            "timeout_ms" => Self::parse_u64_value(value)
                .map(Self::timed_choice_ms)
                .unwrap_or_else(fallback),
            "priority" => Self::parse_u32_value(value)
                .map(Self::Priority)
                .unwrap_or_else(fallback),
            "retry" => Self::parse_u32_value(value)
                .map(Self::retry)
                .unwrap_or_else(fallback),
            "idempotent" if value == "true" => Self::Idempotent,
            "trace" => Self::Trace {
                label: if is_flag {
                    None
                } else {
                    Some(value.to_string())
                },
            },
            "runtime_timeout" => Self::parse_duration_value(value)
                .map(Self::RuntimeTimeout)
                .unwrap_or_else(fallback),
            "heartbeat" => Self::parse_heartbeat_value(value)
                .map(|(interval, on_missing_count)| Self::Heartbeat {
                    interval,
                    on_missing_count,
                })
                .unwrap_or_else(fallback),
            "parallel" if is_flag => Self::Parallel,
            "ordered" if is_flag => Self::Ordered,
            "min_responses" => Self::parse_u32_value(value)
                .map(Self::MinResponses)
                .unwrap_or_else(fallback),
            _ => fallback(),
        }
    }

    /// Renders this annotation as DSL `(key, value)` pairs.
    ///
    /// Most variants produce exactly one pair. [`Self::TimedChoice`] produces
    /// two (`timed_choice = true` and `timeout_ms`), with the duration
    /// truncated to whole milliseconds.
    ///
    /// The output is lossy for [`Self::Retry`]: only `max_attempts` is
    /// emitted, the `delay` field is not.
    pub(crate) fn dsl_entries(&self) -> Vec<(String, String)> {
        match self {
            Self::TimedChoice { duration } => vec![
                ("timed_choice".to_string(), "true".to_string()),
                ("timeout_ms".to_string(), duration.as_millis().to_string()),
            ],
            Self::Priority(value) => vec![("priority".to_string(), value.to_string())],
            Self::Retry { max_attempts, .. } => {
                vec![("retry".to_string(), max_attempts.to_string())]
            }
            Self::Idempotent => vec![("idempotent".to_string(), "true".to_string())],
            Self::Trace { label } => vec![(
                "trace".to_string(),
                // An unlabeled trace is written as the flag value `true`.
                label.clone().unwrap_or_else(|| "true".to_string()),
            )],
            Self::RuntimeTimeout(duration) => vec![(
                "runtime_timeout".to_string(),
                Self::format_duration_value(*duration),
            )],
            Self::Heartbeat {
                interval,
                on_missing_count,
            } => vec![(
                "heartbeat".to_string(),
                format!(
                    "every {} on_missing {}",
                    Self::format_duration_value(*interval),
                    on_missing_count
                ),
            )],
            Self::Parallel => vec![("parallel".to_string(), "true".to_string())],
            Self::Ordered => vec![("ordered".to_string(), "true".to_string())],
            Self::MinResponses(value) => vec![("min_responses".to_string(), value.to_string())],
            Self::Custom { key, value } => vec![(key.clone(), value.clone())],
        }
    }
}