1use serde::{Deserialize, Serialize};
18use std::time::Duration;
19
20#[path = "annotation_collection.rs"]
21mod collection;
22
23pub use collection::Annotations;
24
25#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
30pub struct DslAnnotationEntry {
31 pub key: String,
33 pub value: String,
35}
36
37impl DslAnnotationEntry {
38 #[must_use]
40 pub fn new(key: impl Into<String>, value: impl Into<String>) -> Self {
41 Self {
42 key: key.into(),
43 value: value.into(),
44 }
45 }
46}
47
/// A typed protocol annotation.
///
/// Each variant corresponds to one or more DSL key/value entries; the inherent
/// methods `parse_dsl_entry` and `dsl_entries` convert between the two forms.
/// Entries that have no typed representation are kept verbatim as
/// [`ProtocolAnnotation::Custom`].
#[derive(Debug, Clone, PartialEq)]
pub enum ProtocolAnnotation {
    /// Timed choice.
    TimedChoice {
        /// Timeout of the choice; written to the DSL in milliseconds under
        /// the `timeout_ms` key.
        duration: Duration,
    },

    /// Priority value (DSL key `priority`).
    Priority(u32),

    /// Retry settings (DSL key `retry`).
    Retry {
        /// Maximum number of attempts.
        max_attempts: u32,
        /// Optional delay — presumably the wait between attempts; confirm
        /// with the runtime that consumes this annotation.
        delay: Option<Duration>,
    },

    /// Idempotency marker (DSL key `idempotent`).
    Idempotent,

    /// Tracing marker (DSL key `trace`).
    Trace {
        /// Optional label; `None` is written to the DSL as `true`.
        label: Option<String>,
    },

    /// Runtime timeout (DSL key `runtime_timeout`).
    RuntimeTimeout(Duration),

    /// Heartbeat settings (DSL key `heartbeat`, written as
    /// `every <interval> on_missing <count>`).
    Heartbeat {
        /// Interval between heartbeats.
        interval: Duration,
        /// Count associated with missing heartbeats — presumably how many
        /// may be missed before reacting; confirm with the consumer.
        on_missing_count: u32,
    },

    /// Parallel marker (DSL key `parallel`).
    Parallel,

    /// Ordered marker (DSL key `ordered`).
    Ordered,

    /// Minimum number of responses (DSL key `min_responses`).
    MinResponses(u32),

    /// Free-form annotation kept as raw text.
    Custom {
        /// Annotation key.
        key: String,
        /// Annotation value.
        value: String,
    },
}
136
137impl ProtocolAnnotation {
138 fn custom_kv(key: &str, value: &str) -> Self {
139 Self::Custom {
140 key: key.to_string(),
141 value: value.to_string(),
142 }
143 }
144
145 fn parse_u32_value(value: &str) -> Option<u32> {
146 value.parse::<u32>().ok()
147 }
148
149 fn parse_u64_value(value: &str) -> Option<u64> {
150 value.parse::<u64>().ok()
151 }
152
153 fn parse_duration_value(value: &str) -> Option<Duration> {
154 let trimmed = value.trim();
155 let (number, unit) = if let Some(number) = trimmed.strip_suffix("ms") {
156 (number, "ms")
157 } else if let Some(number) = trimmed.strip_suffix('s') {
158 (number, "s")
159 } else if let Some(number) = trimmed.strip_suffix('m') {
160 (number, "m")
161 } else if let Some(number) = trimmed.strip_suffix('h') {
162 (number, "h")
163 } else {
164 return Self::parse_u64_value(trimmed).map(Duration::from_millis);
165 };
166
167 let value = number.trim().parse::<u64>().ok()?;
168 let millis = match unit {
169 "ms" => value,
170 "s" => value.saturating_mul(1000),
171 "m" => value.saturating_mul(60_000),
172 "h" => value.saturating_mul(3_600_000),
173 _ => return None,
174 };
175 Some(Duration::from_millis(millis))
176 }
177
178 fn format_duration_value(duration: Duration) -> String {
179 let millis = duration.as_millis();
180 if millis % 3_600_000 == 0 {
181 format!("{}h", millis / 3_600_000)
182 } else if millis % 60_000 == 0 {
183 format!("{}m", millis / 60_000)
184 } else if millis % 1000 == 0 {
185 format!("{}s", millis / 1000)
186 } else {
187 format!("{millis}ms")
188 }
189 }
190
191 #[must_use]
193 pub fn timed_choice(duration: Duration) -> Self {
194 Self::TimedChoice { duration }
195 }
196
197 #[must_use]
199 pub fn timed_choice_ms(ms: u64) -> Self {
200 Self::TimedChoice {
201 duration: Duration::from_millis(ms),
202 }
203 }
204
205 #[must_use]
207 pub fn priority(value: u32) -> Self {
208 Self::Priority(value)
209 }
210
211 #[must_use]
213 pub fn retry(max_attempts: u32) -> Self {
214 Self::Retry {
215 max_attempts,
216 delay: None,
217 }
218 }
219
220 #[must_use]
222 pub fn retry_with_delay(max_attempts: u32, delay: Duration) -> Self {
223 Self::Retry {
224 max_attempts,
225 delay: Some(delay),
226 }
227 }
228
229 #[must_use]
231 pub fn trace() -> Self {
232 Self::Trace { label: None }
233 }
234
235 #[must_use]
237 pub fn trace_labeled(label: impl Into<String>) -> Self {
238 Self::Trace {
239 label: Some(label.into()),
240 }
241 }
242
243 #[must_use]
245 pub fn runtime_timeout(duration: Duration) -> Self {
246 Self::RuntimeTimeout(duration)
247 }
248
249 #[must_use]
251 pub fn heartbeat(interval: Duration, on_missing_count: u32) -> Self {
252 Self::Heartbeat {
253 interval,
254 on_missing_count,
255 }
256 }
257
258 #[must_use]
260 pub fn heartbeat_ms(interval_ms: u64, on_missing_count: u32) -> Self {
261 Self::Heartbeat {
262 interval: Duration::from_millis(interval_ms),
263 on_missing_count,
264 }
265 }
266
267 #[must_use]
269 pub fn parallel() -> Self {
270 Self::Parallel
271 }
272
273 #[must_use]
275 pub fn ordered() -> Self {
276 Self::Ordered
277 }
278
279 #[must_use]
281 pub fn min_responses(min: u32) -> Self {
282 Self::MinResponses(min)
283 }
284
285 #[must_use]
287 pub fn custom(key: impl Into<String>, value: impl Into<String>) -> Self {
288 Self::Custom {
289 key: key.into(),
290 value: value.into(),
291 }
292 }
293
294 #[must_use]
296 pub fn is_timed_choice(&self) -> bool {
297 matches!(self, Self::TimedChoice { .. })
298 }
299
300 #[must_use]
302 pub fn timed_choice_duration(&self) -> Option<Duration> {
303 match self {
304 Self::TimedChoice { duration } => Some(*duration),
305 _ => None,
306 }
307 }
308
309 #[must_use]
311 pub fn is_priority(&self) -> bool {
312 matches!(self, Self::Priority(_))
313 }
314
315 #[must_use]
317 pub fn priority_value(&self) -> Option<u32> {
318 match self {
319 Self::Priority(v) => Some(*v),
320 _ => None,
321 }
322 }
323
324 #[must_use]
326 pub fn is_retry(&self) -> bool {
327 matches!(self, Self::Retry { .. })
328 }
329
330 #[must_use]
332 pub fn retry_config(&self) -> Option<(u32, Option<Duration>)> {
333 match self {
334 Self::Retry {
335 max_attempts,
336 delay,
337 } => Some((*max_attempts, *delay)),
338 _ => None,
339 }
340 }
341
342 #[must_use]
344 pub fn is_idempotent(&self) -> bool {
345 matches!(self, Self::Idempotent)
346 }
347
348 #[must_use]
350 pub fn is_trace(&self) -> bool {
351 matches!(self, Self::Trace { .. })
352 }
353
354 #[must_use]
356 pub fn is_heartbeat(&self) -> bool {
357 matches!(self, Self::Heartbeat { .. })
358 }
359
360 #[must_use]
362 pub fn heartbeat_params(&self) -> Option<(Duration, u32)> {
363 match self {
364 Self::Heartbeat {
365 interval,
366 on_missing_count,
367 } => Some((*interval, *on_missing_count)),
368 _ => None,
369 }
370 }
371
372 #[must_use]
374 pub fn is_runtime_timeout(&self) -> bool {
375 matches!(self, Self::RuntimeTimeout(_))
376 }
377
378 #[must_use]
380 pub fn runtime_timeout_duration(&self) -> Option<Duration> {
381 match self {
382 Self::RuntimeTimeout(d) => Some(*d),
383 _ => None,
384 }
385 }
386
387 #[must_use]
389 pub fn is_parallel(&self) -> bool {
390 matches!(self, Self::Parallel)
391 }
392
393 #[must_use]
395 pub fn is_ordered(&self) -> bool {
396 matches!(self, Self::Ordered)
397 }
398
399 #[must_use]
401 pub fn is_min_responses(&self) -> bool {
402 matches!(self, Self::MinResponses(_))
403 }
404
405 #[must_use]
407 pub fn min_responses_value(&self) -> Option<u32> {
408 match self {
409 Self::MinResponses(n) => Some(*n),
410 _ => None,
411 }
412 }
413
414 #[must_use]
416 pub fn is_custom_key(&self, expected_key: &str) -> bool {
417 matches!(self, Self::Custom { key, .. } if key == expected_key)
418 }
419
420 #[must_use]
422 pub fn custom_value(&self, expected_key: &str) -> Option<&str> {
423 match self {
424 Self::Custom { key, value } if key == expected_key => Some(value),
425 _ => None,
426 }
427 }
428
429 pub(crate) fn parse_dsl_entry(key: &str, value: &str) -> Self {
430 match key {
431 "timed_choice" if value == "true" => {
432 Self::TimedChoice {
434 duration: Duration::from_secs(0),
435 }
436 }
437 "timeout_ms" => Self::parse_u64_value(value)
438 .map(|ms| Self::TimedChoice {
439 duration: Duration::from_millis(ms),
440 })
441 .unwrap_or_else(|| Self::custom_kv(key, value)),
442 "priority" => Self::parse_u32_value(value)
443 .map(Self::Priority)
444 .unwrap_or_else(|| Self::custom_kv(key, value)),
445 "retry" => Self::parse_u32_value(value)
446 .map(|max_attempts| Self::Retry {
447 max_attempts,
448 delay: None,
449 })
450 .unwrap_or_else(|| Self::custom_kv(key, value)),
451 "idempotent" if value == "true" => Self::Idempotent,
452 "trace" => Self::Trace {
453 label: if value.is_empty() || value == "true" {
454 None
455 } else {
456 Some(value.to_string())
457 },
458 },
459 "runtime_timeout" => Self::parse_duration_value(value)
460 .map(Self::RuntimeTimeout)
461 .unwrap_or_else(|| Self::custom_kv(key, value)),
462 "parallel" if value.is_empty() || value == "true" => Self::Parallel,
463 "ordered" if value.is_empty() || value == "true" => Self::Ordered,
464 "min_responses" => Self::parse_u32_value(value)
465 .map(Self::MinResponses)
466 .unwrap_or_else(|| Self::custom_kv(key, value)),
467 _ => Self::custom_kv(key, value),
468 }
469 }
470
471 pub(crate) fn dsl_entries(&self) -> Vec<(String, String)> {
472 match self {
473 Self::TimedChoice { duration } => vec![
474 ("timed_choice".to_string(), "true".to_string()),
475 ("timeout_ms".to_string(), duration.as_millis().to_string()),
476 ],
477 Self::Priority(value) => vec![("priority".to_string(), value.to_string())],
478 Self::Retry { max_attempts, .. } => {
479 vec![("retry".to_string(), max_attempts.to_string())]
480 }
481 Self::Idempotent => vec![("idempotent".to_string(), "true".to_string())],
482 Self::Trace { label } => vec![(
483 "trace".to_string(),
484 label.clone().unwrap_or_else(|| "true".to_string()),
485 )],
486 Self::RuntimeTimeout(duration) => vec![(
487 "runtime_timeout".to_string(),
488 Self::format_duration_value(*duration),
489 )],
490 Self::Heartbeat {
491 interval,
492 on_missing_count,
493 } => vec![(
494 "heartbeat".to_string(),
495 format!(
496 "every {} on_missing {}",
497 Self::format_duration_value(*interval),
498 on_missing_count
499 ),
500 )],
501 Self::Parallel => vec![("parallel".to_string(), "true".to_string())],
502 Self::Ordered => vec![("ordered".to_string(), "true".to_string())],
503 Self::MinResponses(value) => vec![("min_responses".to_string(), value.to_string())],
504 Self::Custom { key, value } => vec![(key.clone(), value.clone())],
505 }
506 }
507}