drasi_lib/reactions/traits.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Reaction trait module
16//!
17//! This module provides the core traits that all reaction plugins must implement.
18//! It separates the plugin contract from the reaction manager and implementation details.
19//!
20//! # Plugin Architecture
21//!
22//! Each reaction plugin:
23//! 1. Defines its own typed configuration struct with builder pattern
24//! 2. Creates a `ReactionBase` instance using `ReactionBaseParams`
25//! 3. Implements the `Reaction` trait
26//! 4. Is passed to `DrasiLib` via `add_reaction()` which takes ownership
27//!
28//! drasi-lib has no knowledge of which plugins exist - it only knows about this trait.
29//!
30//! # Runtime Context Initialization
31//!
32//! Reactions receive all drasi-lib services through a single `initialize()` call
33//! when added to DrasiLib. The `ReactionRuntimeContext` provides:
34//! - `event_tx`: Channel for component lifecycle events
35//! - `state_store`: Optional persistent state storage
36//! - `query_provider`: Access to query instances for subscription
37//!
38//! This replaces the previous `inject_*` methods with a cleaner single-call pattern.
39
40use anyhow::Result;
41use async_trait::async_trait;
42use std::sync::Arc;
43
44use crate::channels::ComponentStatus;
45use crate::context::ReactionRuntimeContext;
46use crate::queries::Query;
47
48/// Trait for providing access to queries without requiring full DrasiLib dependency.
49///
50/// This trait provides a way for reactions to access query instances for subscription
51/// without needing a direct dependency on the server core.
52#[async_trait]
53pub trait QueryProvider: Send + Sync {
54 /// Get a query instance by ID
55 async fn get_query_instance(&self, id: &str) -> Result<Arc<dyn Query>>;
56}
57
58/// Trait defining the interface for all reaction implementations.
59///
60/// This is the core abstraction that all reaction plugins must implement.
61/// drasi-lib only interacts with reactions through this trait - it has no
62/// knowledge of specific plugin types or their configurations.
63///
64/// # Lifecycle
65///
66/// Reactions follow this lifecycle:
67/// 1. Created by plugin code with configuration
68/// 2. Added to DrasiLib via `add_reaction()` - dependencies injected automatically
69/// 3. Started via `start()` (auto-start or manual based on `auto_start()`)
70/// 4. Stopped via `stop()` when no longer needed
71///
72/// # Subscription Model
73///
74/// Reactions manage their own subscriptions to queries using the broadcast channel pattern:
75/// - QueryProvider is injected via `inject_query_provider()` at add time
76/// - Reactions access queries via `query_provider.get_query_instance()`
77/// - For each query, reactions call `query.subscribe(reaction_id)`
78/// - Each subscription provides a broadcast receiver for that query's results
79/// - Reactions use a priority queue to process results from multiple queries in timestamp order
80///
81/// # Example Implementation
82///
83/// ```ignore
84/// use drasi_lib::{Reaction, QueryProvider};
85/// use drasi_lib::reactions::{ReactionBase, ReactionBaseParams};
86/// use drasi_lib::context::ReactionRuntimeContext;
87///
88/// pub struct MyReaction {
89/// base: ReactionBase,
90/// // Plugin-specific fields
91/// }
92///
93/// impl MyReaction {
94/// pub fn new(config: MyReactionConfig) -> Self {
95/// let params = ReactionBaseParams::new(&config.id, config.queries.clone())
96/// .with_priority_queue_capacity(config.queue_capacity);
97///
98/// Self {
99/// base: ReactionBase::new(params),
100/// }
101/// }
102/// }
103///
104/// #[async_trait]
105/// impl Reaction for MyReaction {
106/// fn id(&self) -> &str {
107/// &self.base.id
108/// }
109///
110/// fn type_name(&self) -> &str {
111/// "my-reaction"
112/// }
113///
114/// fn query_ids(&self) -> Vec<String> {
115/// self.base.queries.clone()
116/// }
117///
118/// async fn initialize(&self, context: ReactionRuntimeContext) {
119/// self.base.initialize(context).await;
120/// }
121///
122/// async fn start(&self) -> Result<()> {
123/// self.base.subscribe_to_queries().await?;
124/// // ... start processing
125/// Ok(())
126/// }
127///
128/// // ... implement other methods
129/// }
130/// ```
131#[async_trait]
132pub trait Reaction: Send + Sync {
133 /// Get the reaction's unique identifier
134 fn id(&self) -> &str;
135
136 /// Get the reaction type name (e.g., "http", "log", "sse")
137 fn type_name(&self) -> &str;
138
139 /// Get the reaction's configuration properties for inspection
140 ///
141 /// This returns a HashMap representation of the reaction's configuration
142 /// for use in APIs and inspection. The actual typed configuration is
143 /// owned by the plugin - this is just for external visibility.
144 fn properties(&self) -> std::collections::HashMap<String, serde_json::Value>;
145
146 /// Get the list of query IDs this reaction subscribes to
147 fn query_ids(&self) -> Vec<String>;
148
149 /// Whether this reaction should auto-start when DrasiLib starts
150 ///
151 /// Default is `true` to match query behavior. Override to return `false`
152 /// if this reaction should only be started manually via `start_reaction()`.
153 fn auto_start(&self) -> bool {
154 true
155 }
156
157 /// Initialize the reaction with runtime context.
158 ///
159 /// This method is called automatically by DrasiLib when the reaction is added
160 /// via `add_reaction()`. Plugin developers do not need to call this directly.
161 ///
162 /// The context provides access to:
163 /// - `reaction_id`: The reaction's unique identifier
164 /// - `event_tx`: Channel for reporting component lifecycle events
165 /// - `state_store`: Optional persistent state storage
166 /// - `query_provider`: Access to query instances for subscription
167 ///
168 /// Implementation should delegate to `self.base.initialize(context).await`.
169 async fn initialize(&self, context: ReactionRuntimeContext);
170
171 /// Start the reaction
172 ///
173 /// The reaction should:
174 /// 1. Subscribe to all configured queries (using injected QueryProvider)
175 /// 2. Start its processing loop
176 /// 3. Update its status to Running
177 ///
178 /// Note: QueryProvider is already available via `inject_query_provider()` which
179 /// is called when the reaction is added to DrasiLib.
180 async fn start(&self) -> Result<()>;
181
182 /// Stop the reaction, cleaning up all subscriptions and tasks
183 async fn stop(&self) -> Result<()>;
184
185 /// Get the current status of the reaction
186 async fn status(&self) -> ComponentStatus;
187
188 /// Permanently clean up internal state when the reaction is being removed.
189 ///
190 /// This is called when `remove_reaction(id, cleanup: true)` is used.
191 /// Use this to release external resources that should not persist after
192 /// the reaction is deleted (e.g., delete output topics, remove external subscriptions).
193 ///
194 /// The default implementation is a no-op. Override only if your reaction
195 /// manages external state that needs explicit teardown.
196 ///
197 /// Errors are logged but do not prevent the reaction from being removed.
198 async fn deprovision(&self) -> Result<()> {
199 Ok(())
200 }
201}
202
203/// Blanket implementation of Reaction for `Box<dyn Reaction>`
204///
205/// This allows boxed trait objects to be used with methods expecting `impl Reaction`.
206#[async_trait]
207impl Reaction for Box<dyn Reaction + 'static> {
208 fn id(&self) -> &str {
209 (**self).id()
210 }
211
212 fn type_name(&self) -> &str {
213 (**self).type_name()
214 }
215
216 fn properties(&self) -> std::collections::HashMap<String, serde_json::Value> {
217 (**self).properties()
218 }
219
220 fn query_ids(&self) -> Vec<String> {
221 (**self).query_ids()
222 }
223
224 fn auto_start(&self) -> bool {
225 (**self).auto_start()
226 }
227
228 async fn initialize(&self, context: ReactionRuntimeContext) {
229 (**self).initialize(context).await
230 }
231
232 async fn start(&self) -> Result<()> {
233 (**self).start().await
234 }
235
236 async fn stop(&self) -> Result<()> {
237 (**self).stop().await
238 }
239
240 async fn status(&self) -> ComponentStatus {
241 (**self).status().await
242 }
243
244 async fn deprovision(&self) -> Result<()> {
245 (**self).deprovision().await
246 }
247}