Skip to main content

drasi_reaction_http_adaptive/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! HTTP Adaptive reaction plugin for Drasi
17//!
18//! This plugin implements HTTP Adaptive reactions for Drasi and provides extension traits
19//! for configuring HTTP Adaptive reactions in the Drasi plugin architecture.
20//!
21//! # Example
22//!
23//! ```rust,ignore
24//! use drasi_reaction_http_adaptive::AdaptiveHttpReaction;
25//!
26//! let reaction = AdaptiveHttpReaction::builder("my-adaptive-reaction")
27//!     .with_queries(vec!["query1".to_string()])
28//!     .with_base_url("http://api.example.com")
29//!     .with_token("secret-token")
30//!     .with_timeout_ms(10000)
31//!     .with_max_batch_size(500)
32//!     .build()?;
33//! ```
34
35mod adaptive_batcher;
36pub mod config;
37pub mod descriptor;
38pub mod http_adaptive;
39
40pub use config::HttpAdaptiveReactionConfig;
41pub use http_adaptive::AdaptiveHttpReaction;
42
43use drasi_lib::reactions::common::AdaptiveBatchConfig;
44use drasi_reaction_http::QueryConfig;
45use std::collections::HashMap;
46
47/// Builder for HTTP Adaptive reaction
48///
49/// Creates an AdaptiveHttpReaction instance with a fluent API.
50pub struct HttpAdaptiveReactionBuilder {
51    id: String,
52    queries: Vec<String>,
53    base_url: String,
54    token: Option<String>,
55    timeout_ms: u64,
56    routes: HashMap<String, QueryConfig>,
57    adaptive: AdaptiveBatchConfig,
58    priority_queue_capacity: Option<usize>,
59    auto_start: bool,
60}
61
62impl HttpAdaptiveReactionBuilder {
63    /// Create a new HTTP Adaptive reaction builder with the given ID
64    pub fn new(id: impl Into<String>) -> Self {
65        Self {
66            id: id.into(),
67            queries: Vec::new(),
68            base_url: "http://localhost".to_string(),
69            token: None,
70            timeout_ms: 5000,
71            routes: HashMap::new(),
72            adaptive: AdaptiveBatchConfig::default(),
73            priority_queue_capacity: None,
74            auto_start: true,
75        }
76    }
77
78    /// Set the query IDs to subscribe to
79    pub fn with_queries(mut self, queries: Vec<String>) -> Self {
80        self.queries = queries;
81        self
82    }
83
84    /// Add a query ID to subscribe to
85    pub fn with_query(mut self, query_id: impl Into<String>) -> Self {
86        self.queries.push(query_id.into());
87        self
88    }
89
90    /// Set the base URL for HTTP requests
91    pub fn with_base_url(mut self, base_url: impl Into<String>) -> Self {
92        self.base_url = base_url.into();
93        self
94    }
95
96    /// Set the authentication token
97    pub fn with_token(mut self, token: impl Into<String>) -> Self {
98        self.token = Some(token.into());
99        self
100    }
101
102    /// Set the request timeout in milliseconds
103    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
104        self.timeout_ms = timeout_ms;
105        self
106    }
107
108    /// Add a route configuration for a specific query
109    pub fn with_route(mut self, query_id: impl Into<String>, config: QueryConfig) -> Self {
110        self.routes.insert(query_id.into(), config);
111        self
112    }
113
114    /// Set the minimum batch size
115    pub fn with_min_batch_size(mut self, size: usize) -> Self {
116        self.adaptive.adaptive_min_batch_size = size;
117        self
118    }
119
120    /// Set the maximum batch size
121    pub fn with_max_batch_size(mut self, size: usize) -> Self {
122        self.adaptive.adaptive_max_batch_size = size;
123        self
124    }
125
126    /// Set the adaptive window size (in 100ms intervals)
127    pub fn with_window_size(mut self, size: usize) -> Self {
128        self.adaptive.adaptive_window_size = size;
129        self
130    }
131
132    /// Set the batch timeout in milliseconds
133    pub fn with_batch_timeout_ms(mut self, timeout_ms: u64) -> Self {
134        self.adaptive.adaptive_batch_timeout_ms = timeout_ms;
135        self
136    }
137
138    /// Set the priority queue capacity
139    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
140        self.priority_queue_capacity = Some(capacity);
141        self
142    }
143
144    /// Set whether the reaction should auto-start
145    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
146        self.auto_start = auto_start;
147        self
148    }
149
150    /// Set the full configuration at once
151    pub fn with_config(mut self, config: HttpAdaptiveReactionConfig) -> Self {
152        self.base_url = config.base_url;
153        self.token = config.token;
154        self.timeout_ms = config.timeout_ms;
155        self.routes = config.routes;
156        self.adaptive = config.adaptive;
157        self
158    }
159
160    /// Build the HTTP Adaptive reaction
161    pub fn build(self) -> anyhow::Result<AdaptiveHttpReaction> {
162        let config = HttpAdaptiveReactionConfig {
163            base_url: self.base_url,
164            token: self.token,
165            timeout_ms: self.timeout_ms,
166            routes: self.routes,
167            adaptive: self.adaptive,
168        };
169
170        Ok(AdaptiveHttpReaction::from_builder(
171            self.id,
172            self.queries,
173            config,
174            self.priority_queue_capacity,
175            self.auto_start,
176        ))
177    }
178}
179
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_lib::Reaction;

    /// A builder with no overrides keeps its ID and reports the default base URL.
    #[test]
    fn test_adaptive_http_builder_defaults() {
        let builder = HttpAdaptiveReactionBuilder::new("test-reaction");
        let reaction = builder.build().unwrap();

        let expected_url = serde_json::Value::String("http://localhost".to_string()); // DevSkim: ignore DS137138
        let props = reaction.properties();

        assert_eq!(reaction.id(), "test-reaction");
        assert_eq!(props.get("baseUrl"), Some(&expected_url));
    }

    /// Values supplied through the fluent setters survive `build()`.
    #[test]
    fn test_adaptive_http_builder_custom_values() {
        let expected_queries = vec!["query1".to_string()];

        let builder = AdaptiveHttpReaction::builder("test-reaction")
            .with_base_url("http://api.example.com") // DevSkim: ignore DS137138
            .with_token("secret-token")
            .with_timeout_ms(10000)
            .with_queries(expected_queries.clone())
            .with_max_batch_size(500);
        let reaction = builder.build().unwrap();

        assert_eq!(reaction.id(), "test-reaction");
        assert_eq!(reaction.query_ids(), expected_queries);
    }

    /// The direct constructor accepts an ID, a query list and a default config.
    #[test]
    fn test_adaptive_http_new_constructor() {
        let expected_queries = vec!["query1".to_string()];
        let default_config = HttpAdaptiveReactionConfig::default();

        let reaction =
            AdaptiveHttpReaction::new("test-reaction", expected_queries.clone(), default_config);

        assert_eq!(reaction.id(), "test-reaction");
        assert_eq!(reaction.query_ids(), expected_queries);
    }
}
224
/// Dynamic plugin entry point.
///
/// Compiled only when the `dynamic-plugin` feature is enabled. Invokes
/// `drasi_plugin_sdk::export_plugin!` with the plugin ID
/// `http-adaptive-reaction`, listing `descriptor::HttpAdaptiveReactionDescriptor`
/// as the only reaction descriptor and no source or bootstrap descriptors.
/// All three version fields are read from this crate's `CARGO_PKG_VERSION`
/// at compile time.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "http-adaptive-reaction",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [],
    reaction_descriptors = [descriptor::HttpAdaptiveReactionDescriptor],
    bootstrap_descriptors = [],
);