1use std::time::Duration;
18
19#[path = "annotation_collection.rs"]
20mod collection;
21
22pub use collection::Annotations;
23
24#[derive(Debug, Clone, PartialEq)]
29pub enum ProtocolAnnotation {
30 TimedChoice {
36 duration: Duration,
38 },
39
40 Priority(u32),
44
45 Retry {
49 max_attempts: u32,
51 delay: Option<Duration>,
53 },
54
55 Idempotent,
57
58 Trace {
60 label: Option<String>,
62 },
63
64 RuntimeTimeout(Duration),
69
70 Heartbeat {
76 interval: Duration,
78 on_missing_count: u32,
80 },
81
82 Parallel,
87
88 Ordered,
94
95 MinResponses(u32),
101
102 Custom {
106 key: String,
108 value: String,
110 },
111}
112
113impl ProtocolAnnotation {
114 fn custom_kv(key: &str, value: &str) -> Self {
115 Self::Custom {
116 key: key.to_string(),
117 value: value.to_string(),
118 }
119 }
120
121 fn parse_u32_value(value: &str) -> Option<u32> {
122 value.parse::<u32>().ok()
123 }
124
125 fn parse_u64_value(value: &str) -> Option<u64> {
126 value.parse::<u64>().ok()
127 }
128
129 fn parse_duration_value(value: &str) -> Option<Duration> {
130 let trimmed = value.trim();
131 let (number, unit) = if let Some(number) = trimmed.strip_suffix("ms") {
132 (number, "ms")
133 } else if let Some(number) = trimmed.strip_suffix('s') {
134 (number, "s")
135 } else if let Some(number) = trimmed.strip_suffix('m') {
136 (number, "m")
137 } else if let Some(number) = trimmed.strip_suffix('h') {
138 (number, "h")
139 } else {
140 return Self::parse_u64_value(trimmed).map(Duration::from_millis);
141 };
142
143 let value = number.trim().parse::<u64>().ok()?;
144 let millis = match unit {
145 "ms" => value,
146 "s" => value.saturating_mul(1000),
147 "m" => value.saturating_mul(60_000),
148 "h" => value.saturating_mul(3_600_000),
149 _ => return None,
150 };
151 Some(Duration::from_millis(millis))
152 }
153
154 fn format_duration_value(duration: Duration) -> String {
155 let millis = duration.as_millis();
156 if millis % 3_600_000 == 0 {
157 format!("{}h", millis / 3_600_000)
158 } else if millis % 60_000 == 0 {
159 format!("{}m", millis / 60_000)
160 } else if millis % 1000 == 0 {
161 format!("{}s", millis / 1000)
162 } else {
163 format!("{millis}ms")
164 }
165 }
166
167 #[must_use]
169 pub fn timed_choice(duration: Duration) -> Self {
170 Self::TimedChoice { duration }
171 }
172
173 #[must_use]
175 pub fn timed_choice_ms(ms: u64) -> Self {
176 Self::TimedChoice {
177 duration: Duration::from_millis(ms),
178 }
179 }
180
181 #[must_use]
183 pub fn priority(value: u32) -> Self {
184 Self::Priority(value)
185 }
186
187 #[must_use]
189 pub fn retry(max_attempts: u32) -> Self {
190 Self::Retry {
191 max_attempts,
192 delay: None,
193 }
194 }
195
196 #[must_use]
198 pub fn retry_with_delay(max_attempts: u32, delay: Duration) -> Self {
199 Self::Retry {
200 max_attempts,
201 delay: Some(delay),
202 }
203 }
204
205 #[must_use]
207 pub fn trace() -> Self {
208 Self::Trace { label: None }
209 }
210
211 #[must_use]
213 pub fn trace_labeled(label: impl Into<String>) -> Self {
214 Self::Trace {
215 label: Some(label.into()),
216 }
217 }
218
219 #[must_use]
221 pub fn runtime_timeout(duration: Duration) -> Self {
222 Self::RuntimeTimeout(duration)
223 }
224
225 #[must_use]
227 pub fn heartbeat(interval: Duration, on_missing_count: u32) -> Self {
228 Self::Heartbeat {
229 interval,
230 on_missing_count,
231 }
232 }
233
234 #[must_use]
236 pub fn heartbeat_ms(interval_ms: u64, on_missing_count: u32) -> Self {
237 Self::Heartbeat {
238 interval: Duration::from_millis(interval_ms),
239 on_missing_count,
240 }
241 }
242
243 #[must_use]
245 pub fn parallel() -> Self {
246 Self::Parallel
247 }
248
249 #[must_use]
251 pub fn ordered() -> Self {
252 Self::Ordered
253 }
254
255 #[must_use]
257 pub fn min_responses(min: u32) -> Self {
258 Self::MinResponses(min)
259 }
260
261 #[must_use]
263 pub fn custom(key: impl Into<String>, value: impl Into<String>) -> Self {
264 Self::Custom {
265 key: key.into(),
266 value: value.into(),
267 }
268 }
269
270 #[must_use]
272 pub fn is_timed_choice(&self) -> bool {
273 matches!(self, Self::TimedChoice { .. })
274 }
275
276 #[must_use]
278 pub fn timed_choice_duration(&self) -> Option<Duration> {
279 match self {
280 Self::TimedChoice { duration } => Some(*duration),
281 _ => None,
282 }
283 }
284
285 #[must_use]
287 pub fn is_priority(&self) -> bool {
288 matches!(self, Self::Priority(_))
289 }
290
291 #[must_use]
293 pub fn priority_value(&self) -> Option<u32> {
294 match self {
295 Self::Priority(v) => Some(*v),
296 _ => None,
297 }
298 }
299
300 #[must_use]
302 pub fn is_retry(&self) -> bool {
303 matches!(self, Self::Retry { .. })
304 }
305
306 #[must_use]
308 pub fn retry_config(&self) -> Option<(u32, Option<Duration>)> {
309 match self {
310 Self::Retry {
311 max_attempts,
312 delay,
313 } => Some((*max_attempts, *delay)),
314 _ => None,
315 }
316 }
317
318 #[must_use]
320 pub fn is_idempotent(&self) -> bool {
321 matches!(self, Self::Idempotent)
322 }
323
324 #[must_use]
326 pub fn is_trace(&self) -> bool {
327 matches!(self, Self::Trace { .. })
328 }
329
330 #[must_use]
332 pub fn is_heartbeat(&self) -> bool {
333 matches!(self, Self::Heartbeat { .. })
334 }
335
336 #[must_use]
338 pub fn heartbeat_params(&self) -> Option<(Duration, u32)> {
339 match self {
340 Self::Heartbeat {
341 interval,
342 on_missing_count,
343 } => Some((*interval, *on_missing_count)),
344 _ => None,
345 }
346 }
347
348 #[must_use]
350 pub fn is_runtime_timeout(&self) -> bool {
351 matches!(self, Self::RuntimeTimeout(_))
352 }
353
354 #[must_use]
356 pub fn runtime_timeout_duration(&self) -> Option<Duration> {
357 match self {
358 Self::RuntimeTimeout(d) => Some(*d),
359 _ => None,
360 }
361 }
362
363 #[must_use]
365 pub fn is_parallel(&self) -> bool {
366 matches!(self, Self::Parallel)
367 }
368
369 #[must_use]
371 pub fn is_ordered(&self) -> bool {
372 matches!(self, Self::Ordered)
373 }
374
375 #[must_use]
377 pub fn is_min_responses(&self) -> bool {
378 matches!(self, Self::MinResponses(_))
379 }
380
381 #[must_use]
383 pub fn min_responses_value(&self) -> Option<u32> {
384 match self {
385 Self::MinResponses(n) => Some(*n),
386 _ => None,
387 }
388 }
389
390 #[must_use]
392 pub fn is_custom_key(&self, expected_key: &str) -> bool {
393 matches!(self, Self::Custom { key, .. } if key == expected_key)
394 }
395
396 #[must_use]
398 pub fn custom_value(&self, expected_key: &str) -> Option<&str> {
399 match self {
400 Self::Custom { key, value } if key == expected_key => Some(value),
401 _ => None,
402 }
403 }
404
405 pub(crate) fn parse_dsl_entry(key: &str, value: &str) -> Self {
406 match key {
407 "timed_choice" if value == "true" => {
408 Self::TimedChoice {
410 duration: Duration::from_secs(0),
411 }
412 }
413 "timeout_ms" => Self::parse_u64_value(value)
414 .map(|ms| Self::TimedChoice {
415 duration: Duration::from_millis(ms),
416 })
417 .unwrap_or_else(|| Self::custom_kv(key, value)),
418 "priority" => Self::parse_u32_value(value)
419 .map(Self::Priority)
420 .unwrap_or_else(|| Self::custom_kv(key, value)),
421 "retry" => Self::parse_u32_value(value)
422 .map(|max_attempts| Self::Retry {
423 max_attempts,
424 delay: None,
425 })
426 .unwrap_or_else(|| Self::custom_kv(key, value)),
427 "idempotent" if value == "true" => Self::Idempotent,
428 "trace" => Self::Trace {
429 label: if value.is_empty() || value == "true" {
430 None
431 } else {
432 Some(value.to_string())
433 },
434 },
435 "runtime_timeout" => Self::parse_duration_value(value)
436 .map(Self::RuntimeTimeout)
437 .unwrap_or_else(|| Self::custom_kv(key, value)),
438 "parallel" if value.is_empty() || value == "true" => Self::Parallel,
439 "ordered" if value.is_empty() || value == "true" => Self::Ordered,
440 "min_responses" => Self::parse_u32_value(value)
441 .map(Self::MinResponses)
442 .unwrap_or_else(|| Self::custom_kv(key, value)),
443 _ => Self::custom_kv(key, value),
444 }
445 }
446
447 pub(crate) fn dsl_entries(&self) -> Vec<(String, String)> {
448 match self {
449 Self::TimedChoice { duration } => vec![
450 ("timed_choice".to_string(), "true".to_string()),
451 ("timeout_ms".to_string(), duration.as_millis().to_string()),
452 ],
453 Self::Priority(value) => vec![("priority".to_string(), value.to_string())],
454 Self::Retry { max_attempts, .. } => {
455 vec![("retry".to_string(), max_attempts.to_string())]
456 }
457 Self::Idempotent => vec![("idempotent".to_string(), "true".to_string())],
458 Self::Trace { label } => vec![(
459 "trace".to_string(),
460 label.clone().unwrap_or_else(|| "true".to_string()),
461 )],
462 Self::RuntimeTimeout(duration) => vec![(
463 "runtime_timeout".to_string(),
464 Self::format_duration_value(*duration),
465 )],
466 Self::Heartbeat {
467 interval,
468 on_missing_count,
469 } => vec![(
470 "heartbeat".to_string(),
471 format!(
472 "every {} on_missing {}",
473 Self::format_duration_value(*interval),
474 on_missing_count
475 ),
476 )],
477 Self::Parallel => vec![("parallel".to_string(), "true".to_string())],
478 Self::Ordered => vec![("ordered".to_string(), "true".to_string())],
479 Self::MinResponses(value) => vec![("min_responses".to_string(), value.to_string())],
480 Self::Custom { key, value } => vec![(key.clone(), value.clone())],
481 }
482 }
483}