Skip to main content

drasi_reaction_grpc/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2025 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! gRPC reaction plugin for Drasi
17//!
18//! This plugin implements gRPC reactions for Drasi.
19//!
20//! # Example
21//!
22//! ```rust,ignore
23//! use drasi_reaction_grpc::GrpcReaction;
24//!
25//! let reaction = GrpcReaction::builder("my-grpc-reaction")
26//!     .with_queries(vec!["query1".to_string()])
27//!     .with_endpoint("grpc://localhost:50052")
28//!     .with_batch_size(200)
29//!     .with_timeout_ms(10000)
30//!     .build()?;
31//! ```
32
33pub mod config;
34pub mod connection;
35pub mod descriptor;
36pub mod grpc;
37pub mod helpers;
38pub mod proto;
39
40pub use config::GrpcReactionConfig;
41pub use grpc::GrpcReaction;
42
43// Re-export types for plugin-grpc-adaptive
44pub use helpers::convert_json_to_proto_struct;
45pub use proto::{
46    ProcessResultsRequest, ProtoQueryResult, ProtoQueryResultItem, ReactionServiceClient,
47};
48
49use std::collections::HashMap;
50
/// Fluent builder for a gRPC reaction.
///
/// Obtain one with `GrpcReactionBuilder::new`, chain the `with_*` setters to
/// override individual settings, and finish with `build`. The type is `Clone`
/// so a partially configured builder can be reused as a template, and `Debug`
/// so its current settings can be logged.
#[derive(Debug, Clone)]
#[must_use = "a builder does nothing until `.build()` is called"]
pub struct GrpcReactionBuilder {
    /// Identifier given to the reaction that is built.
    id: String,
    /// IDs of the queries the reaction subscribes to.
    queries: Vec<String>,
    /// gRPC endpoint the reaction talks to.
    endpoint: String,
    /// Request timeout, in milliseconds.
    timeout_ms: u64,
    /// Batch size.
    batch_size: usize,
    /// Batch flush timeout, in milliseconds.
    batch_flush_timeout_ms: u64,
    /// Maximum number of retries for failed requests.
    max_retries: u32,
    /// Number of connection retry attempts.
    connection_retry_attempts: u32,
    /// Initial connection timeout, in milliseconds.
    initial_connection_timeout_ms: u64,
    /// Metadata headers, keyed by header name.
    metadata: HashMap<String, String>,
    /// Priority queue capacity; `None` when no capacity was set on the builder.
    priority_queue_capacity: Option<usize>,
    /// Whether the reaction should auto-start.
    auto_start: bool,
}
68
69impl GrpcReactionBuilder {
70    /// Create a new gRPC reaction builder with the given ID
71    pub fn new(id: impl Into<String>) -> Self {
72        Self {
73            id: id.into(),
74            queries: Vec::new(),
75            endpoint: "grpc://localhost:50052".to_string(),
76            timeout_ms: 5000,
77            batch_size: 100,
78            batch_flush_timeout_ms: 1000,
79            max_retries: 3,
80            connection_retry_attempts: 5,
81            initial_connection_timeout_ms: 10000,
82            metadata: HashMap::new(),
83            priority_queue_capacity: None,
84            auto_start: true,
85        }
86    }
87
88    /// Set the query IDs to subscribe to
89    pub fn with_queries(mut self, queries: Vec<String>) -> Self {
90        self.queries = queries;
91        self
92    }
93
94    /// Add a query ID to subscribe to
95    pub fn with_query(mut self, query_id: impl Into<String>) -> Self {
96        self.queries.push(query_id.into());
97        self
98    }
99
100    /// Set the gRPC endpoint
101    pub fn with_endpoint(mut self, endpoint: impl Into<String>) -> Self {
102        self.endpoint = endpoint.into();
103        self
104    }
105
106    /// Set the request timeout in milliseconds
107    pub fn with_timeout_ms(mut self, timeout_ms: u64) -> Self {
108        self.timeout_ms = timeout_ms;
109        self
110    }
111
112    /// Set the batch size
113    pub fn with_batch_size(mut self, batch_size: usize) -> Self {
114        self.batch_size = batch_size;
115        self
116    }
117
118    /// Set the batch flush timeout in milliseconds
119    pub fn with_batch_flush_timeout_ms(mut self, timeout_ms: u64) -> Self {
120        self.batch_flush_timeout_ms = timeout_ms;
121        self
122    }
123
124    /// Set the maximum number of retries for failed requests
125    pub fn with_max_retries(mut self, retries: u32) -> Self {
126        self.max_retries = retries;
127        self
128    }
129
130    /// Set the number of connection retry attempts
131    pub fn with_connection_retry_attempts(mut self, attempts: u32) -> Self {
132        self.connection_retry_attempts = attempts;
133        self
134    }
135
136    /// Set the initial connection timeout in milliseconds
137    pub fn with_initial_connection_timeout_ms(mut self, timeout_ms: u64) -> Self {
138        self.initial_connection_timeout_ms = timeout_ms;
139        self
140    }
141
142    /// Add metadata header
143    pub fn with_metadata(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
144        self.metadata.insert(key.into(), value.into());
145        self
146    }
147
148    /// Set the priority queue capacity
149    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
150        self.priority_queue_capacity = Some(capacity);
151        self
152    }
153
154    /// Set whether the reaction should auto-start
155    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
156        self.auto_start = auto_start;
157        self
158    }
159
160    /// Set the full configuration at once
161    pub fn with_config(mut self, config: GrpcReactionConfig) -> Self {
162        self.endpoint = config.endpoint;
163        self.timeout_ms = config.timeout_ms;
164        self.batch_size = config.batch_size;
165        self.batch_flush_timeout_ms = config.batch_flush_timeout_ms;
166        self.max_retries = config.max_retries;
167        self.connection_retry_attempts = config.connection_retry_attempts;
168        self.initial_connection_timeout_ms = config.initial_connection_timeout_ms;
169        self.metadata = config.metadata;
170        self
171    }
172
173    /// Build the gRPC reaction
174    pub fn build(self) -> anyhow::Result<GrpcReaction> {
175        let config = GrpcReactionConfig {
176            endpoint: self.endpoint,
177            timeout_ms: self.timeout_ms,
178            batch_size: self.batch_size,
179            batch_flush_timeout_ms: self.batch_flush_timeout_ms,
180            max_retries: self.max_retries,
181            connection_retry_attempts: self.connection_retry_attempts,
182            initial_connection_timeout_ms: self.initial_connection_timeout_ms,
183            metadata: self.metadata,
184        };
185
186        Ok(GrpcReaction::from_builder(
187            self.id,
188            self.queries,
189            config,
190            self.priority_queue_capacity,
191            self.auto_start,
192        ))
193    }
194}
195
#[cfg(test)]
mod tests {
    use super::*;
    use drasi_lib::Reaction;

    /// A builder with no overrides reports the default endpoint and batch size.
    #[test]
    fn test_grpc_builder_defaults() {
        let reaction = GrpcReactionBuilder::new("test-reaction").build().unwrap();
        assert_eq!(reaction.id(), "test-reaction");
        let props = reaction.properties();
        assert_eq!(
            props.get("endpoint"),
            Some(&serde_json::Value::String(
                "grpc://localhost:50052".to_string()
            ))
        );
        assert_eq!(
            props.get("batchSize"),
            Some(&serde_json::Value::Number(100.into()))
        );
    }

    /// Values set through the fluent API must reach the built reaction.
    #[test]
    fn test_grpc_builder_custom_values() {
        let reaction = GrpcReaction::builder("test-reaction")
            .with_endpoint("grpc://api.example.com:50052")
            .with_timeout_ms(10000)
            .with_batch_size(200)
            .with_queries(vec!["query1".to_string()])
            .build()
            .unwrap();

        assert_eq!(reaction.id(), "test-reaction");
        assert_eq!(reaction.query_ids(), vec!["query1".to_string()]);

        // Check the overrides themselves; without these assertions a builder
        // that silently dropped them would still pass this test.
        let props = reaction.properties();
        assert_eq!(
            props.get("endpoint"),
            Some(&serde_json::Value::String(
                "grpc://api.example.com:50052".to_string()
            ))
        );
        assert_eq!(
            props.get("batchSize"),
            Some(&serde_json::Value::Number(200.into()))
        );
    }

    /// The direct constructor keeps the ID and query list it is given.
    #[test]
    fn test_grpc_new_constructor() {
        let config = GrpcReactionConfig::default();

        let reaction = GrpcReaction::new("test-reaction", vec!["query1".to_string()], config);

        assert_eq!(reaction.id(), "test-reaction");
        assert_eq!(reaction.query_ids(), vec!["query1".to_string()]);
    }
}
242
/// Dynamic plugin entry point.
///
/// Compiled only when the `dynamic-plugin` feature is enabled. Invokes
/// `drasi_plugin_sdk::export_plugin!` with the plugin ID `grpc-reaction`,
/// `descriptor::GrpcReactionDescriptor` as the only reaction descriptor, and
/// empty source and bootstrap descriptor lists. The core, lib and plugin
/// version fields are all taken from this crate's `CARGO_PKG_VERSION` at
/// compile time.
#[cfg(feature = "dynamic-plugin")]
drasi_plugin_sdk::export_plugin!(
    plugin_id = "grpc-reaction",
    core_version = env!("CARGO_PKG_VERSION"),
    lib_version = env!("CARGO_PKG_VERSION"),
    plugin_version = env!("CARGO_PKG_VERSION"),
    source_descriptors = [],
    reaction_descriptors = [descriptor::GrpcReactionDescriptor],
    bootstrap_descriptors = [],
);