drasi_reaction_http_adaptive/
lib.rs1#![allow(unexpected_cfgs)]
2mod adaptive_batcher;
36pub mod config;
37pub mod descriptor;
38pub mod http_adaptive;
39
40pub use config::HttpAdaptiveReactionConfig;
41pub use http_adaptive::AdaptiveHttpReaction;
42
43use drasi_lib::reactions::common::AdaptiveBatchConfig;
44use drasi_reaction_http::QueryConfig;
45use std::collections::HashMap;
46
47pub struct HttpAdaptiveReactionBuilder {
51 id: String,
52 queries: Vec<String>,
53 base_url: String,
54 token: Option<String>,
55 timeout_ms: u64,
56 routes: HashMap<String, QueryConfig>,
57 adaptive: AdaptiveBatchConfig,
58 priority_queue_capacity: Option<usize>,
59 auto_start: bool,
60}
61
62impl HttpAdaptiveReactionBuilder {
63 pub fn new(id: impl Into<String>) -> Self {
65 Self {
66 id: id.into(),
67 queries: Vec::new(),
68 base_url: "http://localhost".to_string(),
69 token: None,
70 timeout_ms: 5000,
71 routes: HashMap::new(),
72 adaptive: AdaptiveBatchConfig::default(),
73 priority_queue_capacity: None,
74 auto_start: true,
75 }
76 }
77
78 pub fn with_queries(mut self, queries: Vec<String>) -> Self {
80 self.queries = queries;
81 self
82 }
83
84 pub fn with_query(mut self, query_id: impl Into<String>) -> Self {
86 self.queries.push(query_id.into());
87 self
88 }
89
90 pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
92 self.base_url = base_url.into();
93 self
94 }
95
96 pub fn with_token(mut self, token: impl Into<String>) -> Self {
98 self.token = Some(token.into());
99 self
100 }
101
102 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
104 self.timeout_ms = timeout_ms;
105 self
106 }
107
108 pub fn with_route(mut self, query_id: impl Into<String>, config: QueryConfig) -> Self {
110 self.routes.insert(query_id.into(), config);
111 self
112 }
113
114 pub fn with_min_batch_size(mut self, size: usize) -> Self {
116 self.adaptive.adaptive_min_batch_size = size;
117 self
118 }
119
120 pub fn with_max_batch_size(mut self, size: usize) -> Self {
122 self.adaptive.adaptive_max_batch_size = size;
123 self
124 }
125
126 pub fn with_window_size(mut self, size: usize) -> Self {
128 self.adaptive.adaptive_window_size = size;
129 self
130 }
131
132 pub fn with_batch_timeout_ms(mut self, timeout_ms: u64) -> Self {
134 self.adaptive.adaptive_batch_timeout_ms = timeout_ms;
135 self
136 }
137
138 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
140 self.priority_queue_capacity = Some(capacity);
141 self
142 }
143
144 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
146 self.auto_start = auto_start;
147 self
148 }
149
150 pub fn with_config(mut self, config: HttpAdaptiveReactionConfig) -> Self {
152 self.base_url = config.base_url;
153 self.token = config.token;
154 self.timeout_ms = config.timeout_ms;
155 self.routes = config.routes;
156 self.adaptive = config.adaptive;
157 self
158 }
159
160 pub fn build(self) -> anyhow::Result<AdaptiveHttpReaction> {
162 let config = HttpAdaptiveReactionConfig {
163 base_url: self.base_url,
164 token: self.token,
165 timeout_ms: self.timeout_ms,
166 routes: self.routes,
167 adaptive: self.adaptive,
168 };
169
170 Ok(AdaptiveHttpReaction::from_builder(
171 self.id,
172 self.queries,
173 config,
174 self.priority_queue_capacity,
175 self.auto_start,
176 ))
177 }
178}
179
180#[cfg(test)]
181mod tests {
182 use super::*;
183 use drasi_lib::Reaction;
184
185 #[test]
186 fn test_adaptive_http_builder_defaults() {
187 let reaction = HttpAdaptiveReactionBuilder::new("test-reaction")
188 .build()
189 .unwrap();
190 assert_eq!(reaction.id(), "test-reaction");
191 let props = reaction.properties();
192 assert_eq!(
193 props.get("baseUrl"),
194 Some(&serde_json::Value::String("http://localhost".to_string())) );
196 }
197
198 #[test]
199 fn test_adaptive_http_builder_custom_values() {
200 let reaction = AdaptiveHttpReaction::builder("test-reaction")
201 .with_base_url("http://api.example.com") .with_token("secret-token")
203 .with_timeout_ms(10000)
204 .with_queries(vec!["query1".to_string()])
205 .with_max_batch_size(500)
206 .build()
207 .unwrap();
208
209 assert_eq!(reaction.id(), "test-reaction");
210 assert_eq!(reaction.query_ids(), vec!["query1".to_string()]);
211 }
212
213 #[test]
214 fn test_adaptive_http_new_constructor() {
215 let config = HttpAdaptiveReactionConfig::default();
216
217 let reaction =
218 AdaptiveHttpReaction::new("test-reaction", vec!["query1".to_string()], config);
219
220 assert_eq!(reaction.id(), "test-reaction");
221 assert_eq!(reaction.query_ids(), vec!["query1".to_string()]);
222 }
223}
224
225#[cfg(feature = "dynamic-plugin")]
229drasi_plugin_sdk::export_plugin!(
230 plugin_id = "http-adaptive-reaction",
231 core_version = env!("CARGO_PKG_VERSION"),
232 lib_version = env!("CARGO_PKG_VERSION"),
233 plugin_version = env!("CARGO_PKG_VERSION"),
234 source_descriptors = [],
235 reaction_descriptors = [descriptor::HttpAdaptiveReactionDescriptor],
236 bootstrap_descriptors = [],
237);