Skip to main content

kinetics_parser/params/
worker.rs

1use crate::environment::{parse_environment, Environment};
2use serde::{Deserialize, Serialize};
3use syn::{
4    parse::{Parse, ParseStream},
5    token, Ident, LitBool, LitInt, LitStr,
6};
7
8#[derive(Debug, Clone, Serialize, Deserialize)]
9pub struct Worker {
10    pub name: Option<String>,
11    pub concurrency: u32,
12    pub fifo: bool,
13    pub environment: Environment,
14    pub batch_size: Option<u32>,
15}
16
17impl Parse for Worker {
18    fn parse(input: ParseStream) -> syn::Result<Self> {
19        let mut name = None;
20        let mut concurrency = None;
21        let mut fifo = None;
22        let mut environment = None;
23        let mut batch_size = None;
24
25        while !input.is_empty() {
26            let ident_span = input.span();
27            let ident: Ident = input.parse()?;
28            input.parse::<token::Eq>()?;
29
30            match ident.to_string().as_str() {
31                "name" => {
32                    if name.is_some() {
33                        return Err(syn::Error::new(ident_span, "Duplicate attribute `name`"));
34                    }
35                    name = Some(input.parse::<LitStr>()?.value());
36                }
37                "environment" => {
38                    if environment.is_some() {
39                        return Err(syn::Error::new(
40                            ident_span,
41                            "Duplicate attribute `environment`",
42                        ));
43                    }
44                    environment = Some(parse_environment(input)?);
45                }
46                "concurrency" => {
47                    if concurrency.is_some() {
48                        return Err(syn::Error::new(
49                            ident_span,
50                            "Duplicate attribute `concurrency`",
51                        ));
52                    }
53                    concurrency = Some(input.parse::<LitInt>()?.base10_parse::<u32>()?);
54                }
55                "fifo" => {
56                    if fifo.is_some() {
57                        return Err(syn::Error::new(ident_span, "Duplicate attribute `fifo`"));
58                    }
59                    fifo = match input.parse::<LitBool>() {
60                        Ok(bool) => Some(bool.value),
61                        Err(_) => {
62                            return Err(input.error("expected boolean value for 'fifo'"));
63                        }
64                    };
65                }
66                "batch_size" => {
67                    if batch_size.is_some() {
68                        return Err(syn::Error::new(
69                            ident_span,
70                            "Duplicate attribute `batch_size`",
71                        ));
72                    }
73
74                    batch_size = Some(input.parse::<LitInt>()?.base10_parse::<u32>()?);
75                }
76                // Ignore unknown attributes
77                _ => {}
78            }
79
80            if !input.is_empty() {
81                input.parse::<token::Comma>()?;
82            }
83        }
84
85        let max_batch = if fifo == Some(true) { 10 } else { 100 };
86
87        // Use hardcoded default value for batch_size to pass validation
88        // Default batch_size value managed by backend
89        if !(1..=max_batch).contains(&batch_size.unwrap_or(1)) {
90            return Err(syn::Error::new(
91                input.span(),
92                "Batch size must be 1..10 for FIFO queues and 1..100 for standard ones",
93            ));
94        }
95
96        Ok(Self {
97            name,
98            concurrency: concurrency.unwrap_or(1),
99            fifo: fifo.unwrap_or_default(),
100            environment: environment.unwrap_or_default(),
101            batch_size,
102        })
103    }
104}