// drasi_lib/context/mod.rs

// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! Runtime context types for plugin service injection.
//!
//! This module provides `SourceRuntimeContext` and `ReactionRuntimeContext` structs
//! that contain `Arc<T>` instances for drasi-lib provided services. These contexts
//! are provided to plugins when they are added to DrasiLib via the `initialize()` method.
//!
//! # Overview
//!
//! Instead of multiple `inject_*` methods, plugins now receive all services through
//! a single `initialize(context)` call. This provides:
//!
//! - **Single initialization point**: One call instead of multiple inject calls
//! - **Guaranteed complete initialization**: Context is complete or doesn't exist
//! - **Clearer API contract**: All available services visible at a glance
//! - **Extensibility**: New services can be added without changing traits
//!
//! # Example - Source Plugin
//!
//! ```ignore
//! use drasi_lib::context::SourceRuntimeContext;
//!
//! #[async_trait]
//! impl Source for MySource {
//!     async fn initialize(&self, context: SourceRuntimeContext) {
//!         // Store context for later use
//!         self.base.initialize(context).await;
//!     }
//!     // ...
//! }
//! ```
//!
//! # Example - Reaction Plugin
//!
//! ```ignore
//! use drasi_lib::context::ReactionRuntimeContext;
//!
//! #[async_trait]
//! impl Reaction for MyReaction {
//!     async fn initialize(&self, context: ReactionRuntimeContext) {
//!         // Store context for later use
//!         self.base.initialize(context).await;
//!     }
//!     // ...
//! }
//! ```

use std::fmt;
use std::sync::Arc;

use crate::channels::ComponentEventSender;
use crate::reactions::QueryProvider;
use crate::state_store::StateStoreProvider;

67/// Context provided to Source plugins during initialization.
68///
69/// Contains `Arc<T>` instances for all drasi-lib provided services.
70/// DrasiLib constructs this context when a source is added via `add_source()`.
71///
72/// # Available Services
73///
74/// - `instance_id`: The DrasiLib instance ID (for log routing isolation)
75/// - `source_id`: The unique identifier for this source instance
76/// - `status_tx`: Channel for reporting component status/lifecycle events
77/// - `state_store`: Optional persistent state storage (if configured)
78///
79/// # Clone
80///
81/// This struct implements `Clone` and all fields use `Arc` internally,
82/// making cloning cheap (just reference count increments).
83#[derive(Clone)]
84pub struct SourceRuntimeContext {
85    /// DrasiLib instance ID (for log routing isolation)
86    pub instance_id: String,
87
88    /// Unique identifier for this source instance
89    pub source_id: String,
90
91    /// Channel for reporting component status/lifecycle events.
92    ///
93    /// Use this to send status updates (Starting, Running, Stopped, Error)
94    /// back to DrasiLib for monitoring and lifecycle management.
95    pub status_tx: ComponentEventSender,
96
97    /// Optional persistent state storage.
98    ///
99    /// This is `Some` if a state store provider was configured on DrasiLib,
100    /// otherwise `None`. Sources can use this to persist state across restarts.
101    pub state_store: Option<Arc<dyn StateStoreProvider>>,
102}
103
104impl SourceRuntimeContext {
105    /// Create a new source runtime context.
106    ///
107    /// This is typically called by `SourceManager` when adding a source to DrasiLib.
108    /// Plugin developers do not need to call this directly.
109    ///
110    /// # Arguments
111    ///
112    /// * `instance_id` - The DrasiLib instance ID
113    /// * `source_id` - The unique identifier for this source
114    /// * `status_tx` - Channel for reporting component status/lifecycle events
115    /// * `state_store` - Optional persistent state storage
116    pub fn new(
117        instance_id: impl Into<String>,
118        source_id: impl Into<String>,
119        status_tx: ComponentEventSender,
120        state_store: Option<Arc<dyn StateStoreProvider>>,
121    ) -> Self {
122        Self {
123            instance_id: instance_id.into(),
124            source_id: source_id.into(),
125            status_tx,
126            state_store,
127        }
128    }
129
130    /// Get the DrasiLib instance ID.
131    pub fn instance_id(&self) -> &str {
132        &self.instance_id
133    }
134
135    /// Get the source's unique identifier.
136    pub fn source_id(&self) -> &str {
137        &self.source_id
138    }
139
140    /// Get a reference to the status channel.
141    ///
142    /// Use this to send component status updates (Starting, Running, Stopped, Error)
143    /// back to DrasiLib.
144    pub fn status_tx(&self) -> &ComponentEventSender {
145        &self.status_tx
146    }
147
148    /// Get a reference to the state store if configured.
149    ///
150    /// Returns `Some(&Arc<dyn StateStoreProvider>)` if a state store was configured,
151    /// otherwise `None`.
152    pub fn state_store(&self) -> Option<&Arc<dyn StateStoreProvider>> {
153        self.state_store.as_ref()
154    }
155}
156
157impl std::fmt::Debug for SourceRuntimeContext {
158    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
159        f.debug_struct("SourceRuntimeContext")
160            .field("instance_id", &self.instance_id)
161            .field("source_id", &self.source_id)
162            .field("status_tx", &"<ComponentEventSender>")
163            .field(
164                "state_store",
165                &self.state_store.as_ref().map(|_| "<StateStoreProvider>"),
166            )
167            .finish()
168    }
169}
170
171/// Context provided to Reaction plugins during initialization.
172///
173/// Contains `Arc<T>` instances for all drasi-lib provided services.
174/// DrasiLib constructs this context when a reaction is added via `add_reaction()`.
175///
176/// # Available Services
177///
178/// - `instance_id`: The DrasiLib instance ID (for log routing isolation)
179/// - `reaction_id`: The unique identifier for this reaction instance
180/// - `status_tx`: Channel for reporting component status/lifecycle events
181/// - `state_store`: Optional persistent state storage (if configured)
182/// - `query_provider`: Access to query instances for subscription
183///
184/// # Clone
185///
186/// This struct implements `Clone` and all fields use `Arc` internally,
187/// making cloning cheap (just reference count increments).
188#[derive(Clone)]
189pub struct ReactionRuntimeContext {
190    /// DrasiLib instance ID (for log routing isolation)
191    pub instance_id: String,
192
193    /// Unique identifier for this reaction instance
194    pub reaction_id: String,
195
196    /// Channel for reporting component status/lifecycle events.
197    ///
198    /// Use this to send status updates (Starting, Running, Stopped, Error)
199    /// back to DrasiLib for monitoring and lifecycle management.
200    pub status_tx: ComponentEventSender,
201
202    /// Optional persistent state storage.
203    ///
204    /// This is `Some` if a state store provider was configured on DrasiLib,
205    /// otherwise `None`. Reactions can use this to persist state across restarts.
206    pub state_store: Option<Arc<dyn StateStoreProvider>>,
207
208    /// Access to query instances for subscription.
209    ///
210    /// Reactions use this to get query instances and subscribe to their results.
211    /// This is always available (not optional) since reactions require queries.
212    pub query_provider: Arc<dyn QueryProvider>,
213}
214
215impl ReactionRuntimeContext {
216    /// Create a new reaction runtime context.
217    ///
218    /// This is typically called by `ReactionManager` when adding a reaction to DrasiLib.
219    /// Plugin developers do not need to call this directly.
220    ///
221    /// # Arguments
222    ///
223    /// * `instance_id` - The DrasiLib instance ID
224    /// * `reaction_id` - The unique identifier for this reaction
225    /// * `status_tx` - Channel for reporting component status/lifecycle events
226    /// * `state_store` - Optional persistent state storage
227    /// * `query_provider` - Access to query instances for subscription
228    pub fn new(
229        instance_id: impl Into<String>,
230        reaction_id: impl Into<String>,
231        status_tx: ComponentEventSender,
232        state_store: Option<Arc<dyn StateStoreProvider>>,
233        query_provider: Arc<dyn QueryProvider>,
234    ) -> Self {
235        Self {
236            instance_id: instance_id.into(),
237            reaction_id: reaction_id.into(),
238            status_tx,
239            state_store,
240            query_provider,
241        }
242    }
243
244    /// Get the DrasiLib instance ID.
245    pub fn instance_id(&self) -> &str {
246        &self.instance_id
247    }
248
249    /// Get the reaction's unique identifier.
250    pub fn reaction_id(&self) -> &str {
251        &self.reaction_id
252    }
253
254    /// Get a reference to the status channel.
255    ///
256    /// Use this to send component status updates (Starting, Running, Stopped, Error)
257    /// back to DrasiLib.
258    pub fn status_tx(&self) -> &ComponentEventSender {
259        &self.status_tx
260    }
261
262    /// Get a reference to the state store if configured.
263    ///
264    /// Returns `Some(&Arc<dyn StateStoreProvider>)` if a state store was configured,
265    /// otherwise `None`.
266    pub fn state_store(&self) -> Option<&Arc<dyn StateStoreProvider>> {
267        self.state_store.as_ref()
268    }
269
270    /// Get a reference to the query provider.
271    ///
272    /// Use this to get query instances and subscribe to their results.
273    pub fn query_provider(&self) -> &Arc<dyn QueryProvider> {
274        &self.query_provider
275    }
276}
277
278impl std::fmt::Debug for ReactionRuntimeContext {
279    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
280        f.debug_struct("ReactionRuntimeContext")
281            .field("instance_id", &self.instance_id)
282            .field("reaction_id", &self.reaction_id)
283            .field("status_tx", &"<ComponentEventSender>")
284            .field(
285                "state_store",
286                &self.state_store.as_ref().map(|_| "<StateStoreProvider>"),
287            )
288            .field("query_provider", &"<QueryProvider>")
289            .finish()
290    }
291}
292
#[cfg(test)]
mod tests {
    use super::*;
    use crate::queries::Query;
    use crate::state_store::MemoryStateStoreProvider;
    use anyhow::Result;
    use async_trait::async_trait;
    use std::sync::Arc;
    use tokio::sync::mpsc;

    /// Stub `QueryProvider` whose lookups always fail.
    ///
    /// The tests below only need a value to place in the context; none of them
    /// resolve a query through it.
    struct MockQueryProvider;

    #[async_trait]
    impl QueryProvider for MockQueryProvider {
        async fn get_query_instance(&self, _id: &str) -> Result<Arc<dyn Query>> {
            Err(anyhow::anyhow!("MockQueryProvider: query not found"))
        }
    }

    #[tokio::test]
    async fn test_source_runtime_context_creation() {
        let (status_tx, _rx) = mpsc::channel(100);
        let state_store = Arc::new(MemoryStateStoreProvider::new());

        let context =
            SourceRuntimeContext::new("test-instance", "test-source", status_tx, Some(state_store));

        assert_eq!(context.instance_id(), "test-instance");
        assert_eq!(context.source_id(), "test-source");
        assert!(context.state_store().is_some());
    }

    #[tokio::test]
    async fn test_source_runtime_context_without_state_store() {
        let (status_tx, _rx) = mpsc::channel(100);

        let context = SourceRuntimeContext::new("test-instance", "test-source", status_tx, None);

        assert_eq!(context.instance_id(), "test-instance");
        assert_eq!(context.source_id(), "test-source");
        assert!(context.state_store().is_none());
    }

    #[tokio::test]
    async fn test_source_runtime_context_clone() {
        let (status_tx, _rx) = mpsc::channel(100);
        let state_store = Arc::new(MemoryStateStoreProvider::new());

        let context =
            SourceRuntimeContext::new("test-instance", "test-source", status_tx, Some(state_store));

        let cloned = context.clone();
        assert_eq!(cloned.instance_id(), context.instance_id());
        assert_eq!(cloned.source_id(), context.source_id());

        // A clone must share the same state store rather than create a new one.
        let original_store = context.state_store().expect("state store was configured");
        let cloned_store = cloned.state_store().expect("clone keeps the state store");
        assert!(Arc::ptr_eq(original_store, cloned_store));
    }

    #[tokio::test]
    async fn test_reaction_runtime_context_creation() {
        let (status_tx, _rx) = mpsc::channel(100);
        let state_store = Arc::new(MemoryStateStoreProvider::new());
        let query_provider = Arc::new(MockQueryProvider);

        let context = ReactionRuntimeContext::new(
            "test-instance",
            "test-reaction",
            status_tx,
            Some(state_store),
            query_provider,
        );

        assert_eq!(context.instance_id(), "test-instance");
        assert_eq!(context.reaction_id(), "test-reaction");
        assert!(context.state_store().is_some());
    }

    #[tokio::test]
    async fn test_reaction_runtime_context_without_state_store() {
        let (status_tx, _rx) = mpsc::channel(100);
        let query_provider = Arc::new(MockQueryProvider);

        let context = ReactionRuntimeContext::new(
            "test-instance",
            "test-reaction",
            status_tx,
            None,
            query_provider,
        );

        assert_eq!(context.instance_id(), "test-instance");
        assert_eq!(context.reaction_id(), "test-reaction");
        assert!(context.state_store().is_none());
    }

    #[tokio::test]
    async fn test_reaction_runtime_context_clone() {
        let (status_tx, _rx) = mpsc::channel(100);
        let state_store = Arc::new(MemoryStateStoreProvider::new());
        let query_provider = Arc::new(MockQueryProvider);

        let context = ReactionRuntimeContext::new(
            "test-instance",
            "test-reaction",
            status_tx,
            Some(state_store),
            query_provider,
        );

        let cloned = context.clone();
        assert_eq!(cloned.instance_id(), context.instance_id());
        assert_eq!(cloned.reaction_id(), context.reaction_id());

        // A clone must share the same service instances rather than create new ones.
        let original_store = context.state_store().expect("state store was configured");
        let cloned_store = cloned.state_store().expect("clone keeps the state store");
        assert!(Arc::ptr_eq(original_store, cloned_store));
        assert!(Arc::ptr_eq(context.query_provider(), cloned.query_provider()));
    }

    #[test]
    fn test_source_runtime_context_debug() {
        let (status_tx, _rx) = mpsc::channel::<crate::channels::ComponentEvent>(100);
        let context = SourceRuntimeContext::new("debug-instance", "debug-source", status_tx, None);
        let debug_str = format!("{context:?}");

        // Match whole `field: value` pairs so each ID is checked in its own field.
        assert!(debug_str.contains("SourceRuntimeContext"));
        assert!(debug_str.contains("instance_id: \"debug-instance\""));
        assert!(debug_str.contains("source_id: \"debug-source\""));
        assert!(debug_str.contains("<ComponentEventSender>"));
        assert!(debug_str.contains("state_store: None"));
    }

    #[test]
    fn test_reaction_runtime_context_debug() {
        let (status_tx, _rx) = mpsc::channel::<crate::channels::ComponentEvent>(100);
        let query_provider = Arc::new(MockQueryProvider);
        let context = ReactionRuntimeContext::new(
            "debug-instance",
            "debug-reaction",
            status_tx,
            None,
            query_provider,
        );
        let debug_str = format!("{context:?}");

        // Match whole `field: value` pairs so each ID is checked in its own field.
        assert!(debug_str.contains("ReactionRuntimeContext"));
        assert!(debug_str.contains("instance_id: \"debug-instance\""));
        assert!(debug_str.contains("reaction_id: \"debug-reaction\""));
        assert!(debug_str.contains("<ComponentEventSender>"));
        assert!(debug_str.contains("state_store: None"));
        assert!(debug_str.contains("<QueryProvider>"));
    }
}