drasi_reaction_grpc/
lib.rs1#![allow(unexpected_cfgs)]
2pub mod config;
34pub mod connection;
35pub mod descriptor;
36pub mod grpc;
37pub mod helpers;
38pub mod proto;
39
40pub use config::GrpcReactionConfig;
41pub use grpc::GrpcReaction;
42
43pub use helpers::convert_json_to_proto_struct;
45pub use proto::{
46 ProcessResultsRequest, ProtoQueryResult, ProtoQueryResultItem, ReactionServiceClient,
47};
48
49use std::collections::HashMap;
50
/// Fluent builder that accumulates the settings needed to construct a
/// [`GrpcReaction`].
///
/// Create one with `GrpcReactionBuilder::new`, adjust it with the `with_*`
/// methods, and finish with `build`. The type derives `Debug` so a
/// half-configured builder can be logged, and `Clone` so one configured
/// builder can serve as a template for several reactions.
#[derive(Debug, Clone)]
pub struct GrpcReactionBuilder {
    /// Identifier handed to `GrpcReaction::from_builder`.
    id: String,
    /// Query ids handed to `GrpcReaction::from_builder`.
    queries: Vec<String>,
    /// Copied into `GrpcReactionConfig::endpoint`.
    endpoint: String,
    /// Copied into `GrpcReactionConfig::timeout_ms`.
    timeout_ms: u64,
    /// Copied into `GrpcReactionConfig::batch_size`.
    batch_size: usize,
    /// Copied into `GrpcReactionConfig::batch_flush_timeout_ms`.
    batch_flush_timeout_ms: u64,
    /// Copied into `GrpcReactionConfig::max_retries`.
    max_retries: u32,
    /// Copied into `GrpcReactionConfig::connection_retry_attempts`.
    connection_retry_attempts: u32,
    /// Copied into `GrpcReactionConfig::initial_connection_timeout_ms`.
    initial_connection_timeout_ms: u64,
    /// Key/value pairs copied into `GrpcReactionConfig::metadata`.
    metadata: HashMap<String, String>,
    /// Optional capacity forwarded to `GrpcReaction::from_builder`;
    /// `None` unless explicitly set on the builder.
    priority_queue_capacity: Option<usize>,
    /// Flag forwarded to `GrpcReaction::from_builder`.
    auto_start: bool,
}
68
69impl GrpcReactionBuilder {
70 pub fn new(id: impl Into<String>) -> Self {
72 Self {
73 id: id.into(),
74 queries: Vec::new(),
75 endpoint: "grpc://localhost:50052".to_string(),
76 timeout_ms: 5000,
77 batch_size: 100,
78 batch_flush_timeout_ms: 1000,
79 max_retries: 3,
80 connection_retry_attempts: 5,
81 initial_connection_timeout_ms: 10000,
82 metadata: HashMap::new(),
83 priority_queue_capacity: None,
84 auto_start: true,
85 }
86 }
87
88 pub fn with_queries(mut self, queries: Vec<String>) -> Self {
90 self.queries = queries;
91 self
92 }
93
94 pub fn with_query(mut self, query_id: impl Into<String>) -> Self {
96 self.queries.push(query_id.into());
97 self
98 }
99
100 pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
102 self.endpoint = endpoint.into();
103 self
104 }
105
106 pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
108 self.timeout_ms = timeout_ms;
109 self
110 }
111
112 pub fn with_batch_size(mut self, batch_size: usize) -> Self {
114 self.batch_size = batch_size;
115 self
116 }
117
118 pub fn with_batch_flush_timeout_ms(mut self, timeout_ms: u64) -> Self {
120 self.batch_flush_timeout_ms = timeout_ms;
121 self
122 }
123
124 pub fn with_max_retries(mut self, retries: u32) -> Self {
126 self.max_retries = retries;
127 self
128 }
129
130 pub fn with_connection_retry_attempts(mut self, attempts: u32) -> Self {
132 self.connection_retry_attempts = attempts;
133 self
134 }
135
136 pub fn with_initial_connection_timeout_ms(mut self, timeout_ms: u64) -> Self {
138 self.initial_connection_timeout_ms = timeout_ms;
139 self
140 }
141
142 pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
144 self.metadata.insert(key.into(), value.into());
145 self
146 }
147
148 pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
150 self.priority_queue_capacity = Some(capacity);
151 self
152 }
153
154 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
156 self.auto_start = auto_start;
157 self
158 }
159
160 pub fn with_config(mut self, config: GrpcReactionConfig) -> Self {
162 self.endpoint = config.endpoint;
163 self.timeout_ms = config.timeout_ms;
164 self.batch_size = config.batch_size;
165 self.batch_flush_timeout_ms = config.batch_flush_timeout_ms;
166 self.max_retries = config.max_retries;
167 self.connection_retry_attempts = config.connection_retry_attempts;
168 self.initial_connection_timeout_ms = config.initial_connection_timeout_ms;
169 self.metadata = config.metadata;
170 self
171 }
172
173 pub fn build(self) -> anyhow::Result<GrpcReaction> {
175 let config = GrpcReactionConfig {
176 endpoint: self.endpoint,
177 timeout_ms: self.timeout_ms,
178 batch_size: self.batch_size,
179 batch_flush_timeout_ms: self.batch_flush_timeout_ms,
180 max_retries: self.max_retries,
181 connection_retry_attempts: self.connection_retry_attempts,
182 initial_connection_timeout_ms: self.initial_connection_timeout_ms,
183 metadata: self.metadata,
184 };
185
186 Ok(GrpcReaction::from_builder(
187 self.id,
188 self.queries,
189 config,
190 self.priority_queue_capacity,
191 self.auto_start,
192 ))
193 }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_lib::Reaction;

    /// A builder with no overrides must expose the default endpoint and
    /// batch size through the reaction's properties.
    #[test]
    fn test_grpc_builder_defaults() {
        let reaction = GrpcReactionBuilder::new("test-reaction").build().unwrap();
        assert_eq!(reaction.id(), "test-reaction");
        let props = reaction.properties();
        assert_eq!(
            props.get("endpoint"),
            Some(&serde_json::Value::String(
                "grpc://localhost:50052".to_string()
            ))
        );
        assert_eq!(
            props.get("batchSize"),
            Some(&serde_json::Value::Number(100.into()))
        );
    }

    /// Values set through the builder must reach the built reaction.
    ///
    /// Besides the id and query list, this checks the overridden endpoint
    /// and batch size, so the test fails if those setters are silently
    /// ignored.
    #[test]
    fn test_grpc_builder_custom_values() {
        let reaction = GrpcReaction::builder("test-reaction")
            .with_endpoint("grpc://api.example.com:50052")
            .with_timeout_ms(10000)
            .with_batch_size(200)
            .with_queries(vec!["query1".to_string()])
            .build()
            .unwrap();

        assert_eq!(reaction.id(), "test-reaction");
        assert_eq!(reaction.query_ids(), vec!["query1".to_string()]);

        // Same property keys as the defaults test, now with the custom values.
        let props = reaction.properties();
        assert_eq!(
            props.get("endpoint"),
            Some(&serde_json::Value::String(
                "grpc://api.example.com:50052".to_string()
            ))
        );
        assert_eq!(
            props.get("batchSize"),
            Some(&serde_json::Value::Number(200.into()))
        );
    }

    /// The direct constructor must keep the id and query list it was given.
    #[test]
    fn test_grpc_new_constructor() {
        let config = GrpcReactionConfig::default();

        let reaction = GrpcReaction::new("test-reaction", vec!["query1".to_string()], config);

        assert_eq!(reaction.id(), "test-reaction");
        assert_eq!(reaction.query_ids(), vec!["query1".to_string()]);
    }
}
242
// Dynamic-plugin entry point. This invocation is compiled only when the
// `dynamic-plugin` Cargo feature is enabled; without that feature the crate
// contains no `export_plugin!` expansion at all.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "grpc-reaction",
    // All three version fields expand at compile time to this crate's own
    // package version (`CARGO_PKG_VERSION`).
    // NOTE(review): `core_version` and `lib_version` therefore track this
    // crate's version, which presumably assumes lockstep versioning with the
    // core/lib crates. Confirm against the plugin SDK's expectations.
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    // This crate registers exactly one reaction descriptor and no source or
    // bootstrap descriptors.
    source_descriptors = [],
    reaction_descriptors = [descriptor::GrpcReactionDescriptor],
    bootstrap_descriptors = [],
);