Skip to main content

drasi_lib/context/
mod.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Runtime context types for plugin service injection.
16//!
//! This module provides the `SourceRuntimeContext`, `ReactionRuntimeContext`, and
//! `QueryRuntimeContext` structs, which bundle the drasi-lib provided services (held as
//! `Arc<T>` where shared) for a component. When a plugin is added to DrasiLib, its context
//! is passed to it through the plugin's `initialize()` method.
20//!
21//! # Overview
22//!
23//! Instead of multiple `inject_*` methods, plugins now receive all services through
24//! a single `initialize(context)` call. This provides:
25//!
26//! - **Single initialization point**: One call instead of multiple inject calls
27//! - **Guaranteed complete initialization**: Context is complete or doesn't exist
28//! - **Clearer API contract**: All available services visible at a glance
29//! - **Extensibility**: New services can be added without changing traits
30//!
31//! # Example - Source Plugin
32//!
33//! ```ignore
34//! use drasi_lib::context::SourceRuntimeContext;
35//!
36//! #[async_trait]
37//! impl Source for MySource {
38//!     async fn initialize(&self, context: SourceRuntimeContext) {
39//!         // Store context for later use
40//!         self.base.initialize(context).await;
41//!     }
42//!     // ...
43//! }
44//! ```
45//!
46//! # Example - Reaction Plugin
47//!
48//! ```ignore
49//! use drasi_lib::context::ReactionRuntimeContext;
50//!
51//! #[async_trait]
52//! impl Reaction for MyReaction {
53//!     async fn initialize(&self, context: ReactionRuntimeContext) {
54//!         // Store context for later use
55//!         self.base.initialize(context).await;
56//!     }
57//!     // ...
58//! }
59//! ```
60
61use std::sync::Arc;
62
63use crate::component_graph::ComponentUpdateSender;
64use crate::identity::IdentityProvider;
65use crate::reactions::SnapshotFetcher;
66use crate::state_store::StateStoreProvider;
67
68/// Context provided to Source plugins during initialization.
69///
70/// Contains `Arc<T>` instances for all drasi-lib provided services.
71/// DrasiLib constructs this context when a source is added via `add_source()`.
72///
73/// # Available Services
74///
75/// - `instance_id`: The DrasiLib instance ID (for log routing isolation)
76/// - `source_id`: The unique identifier for this source instance
77/// - `state_store`: Optional persistent state storage (if configured)
78/// - `update_tx`: mpsc sender for fire-and-forget status updates to the component graph
79///
80/// # Clone
81///
82/// This struct implements `Clone` and all fields use `Arc` internally,
83/// making cloning cheap (just reference count increments).
84#[derive(Clone)]
85pub struct SourceRuntimeContext {
86    /// DrasiLib instance ID (for log routing isolation)
87    pub instance_id: String,
88
89    /// Unique identifier for this source instance
90    pub source_id: String,
91
92    /// Optional persistent state storage.
93    ///
94    /// This is `Some` if a state store provider was configured on DrasiLib,
95    /// otherwise `None`. Sources can use this to persist state across restarts.
96    pub state_store: Option<Arc<dyn StateStoreProvider>>,
97
98    /// mpsc sender for fire-and-forget component status updates.
99    ///
100    /// Status changes sent here are applied to the component graph by the
101    /// graph update loop, which emits broadcast events to all subscribers.
102    pub update_tx: ComponentUpdateSender,
103
104    /// Optional identity provider for credential injection.
105    ///
106    /// This is `Some` if the host has configured an identity provider for this component.
107    /// Sources can use this to obtain authentication credentials (passwords, tokens,
108    /// certificates) for connecting to external systems.
109    pub identity_provider: Option<Arc<dyn IdentityProvider>>,
110}
111
112impl SourceRuntimeContext {
113    /// Create a new source runtime context.
114    ///
115    /// This is typically called by `SourceManager` when adding a source to DrasiLib.
116    /// Plugin developers do not need to call this directly.
117    ///
118    /// # Arguments
119    ///
120    /// * `instance_id` - The DrasiLib instance ID
121    /// * `source_id` - The unique identifier for this source
122    /// * `state_store` - Optional persistent state storage
123    /// * `update_tx` - mpsc sender for status updates to the component graph
124    /// * `identity_provider` - Optional identity provider for credential injection
125    pub fn new(
126        instance_id: impl Into<String>,
127        source_id: impl Into<String>,
128        state_store: Option<Arc<dyn StateStoreProvider>>,
129        update_tx: ComponentUpdateSender,
130        identity_provider: Option<Arc<dyn IdentityProvider>>,
131    ) -> Self {
132        Self {
133            instance_id: instance_id.into(),
134            source_id: source_id.into(),
135            state_store,
136            update_tx,
137            identity_provider,
138        }
139    }
140
141    /// Get the DrasiLib instance ID.
142    pub fn instance_id(&self) -> &str {
143        &self.instance_id
144    }
145
146    /// Get the source's unique identifier.
147    pub fn source_id(&self) -> &str {
148        &self.source_id
149    }
150
151    /// Get a reference to the state store if configured.
152    ///
153    /// Returns `Some(&Arc<dyn StateStoreProvider>)` if a state store was configured,
154    /// otherwise `None`.
155    pub fn state_store(&self) -> Option<&Arc<dyn StateStoreProvider>> {
156        self.state_store.as_ref()
157    }
158}
159
160impl std::fmt::Debug for SourceRuntimeContext {
161    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162        f.debug_struct("SourceRuntimeContext")
163            .field("instance_id", &self.instance_id)
164            .field("source_id", &self.source_id)
165            .field(
166                "state_store",
167                &self.state_store.as_ref().map(|_| "<StateStoreProvider>"),
168            )
169            .field("update_tx", &"<ComponentUpdateSender>")
170            .field(
171                "identity_provider",
172                &self
173                    .identity_provider
174                    .as_ref()
175                    .map(|_| "<IdentityProvider>"),
176            )
177            .finish()
178    }
179}
180
181/// Context provided to Reaction plugins during initialization.
182///
183/// Contains `Arc<T>` instances for all drasi-lib provided services.
184/// DrasiLib constructs this context when a reaction is added via `add_reaction()`.
185///
186/// # Available Services
187///
188/// - `instance_id`: The DrasiLib instance ID (for log routing isolation)
189/// - `reaction_id`: The unique identifier for this reaction instance
190/// - `state_store`: Optional persistent state storage (if configured)
191/// - `update_tx`: mpsc sender for fire-and-forget status updates to the component graph
192/// - `identity_provider`: Optional identity provider for credential injection
193///
194/// # Clone
195///
196/// This struct implements `Clone` and all fields use `Arc` internally,
197/// making cloning cheap (just reference count increments).
198#[derive(Clone)]
199pub struct ReactionRuntimeContext {
200    /// DrasiLib instance ID (for log routing isolation)
201    pub instance_id: String,
202
203    /// Unique identifier for this reaction instance
204    pub reaction_id: String,
205
206    /// Optional persistent state storage.
207    ///
208    /// This is `Some` if a state store provider was configured on DrasiLib,
209    /// otherwise `None`. Reactions can use this to persist state across restarts.
210    pub state_store: Option<Arc<dyn StateStoreProvider>>,
211
212    /// mpsc sender for fire-and-forget component status updates.
213    ///
214    /// Status changes sent here are applied to the component graph by the
215    /// graph update loop, which emits broadcast events to all subscribers.
216    pub update_tx: ComponentUpdateSender,
217
218    /// Optional identity provider for credential injection.
219    ///
220    /// This is `Some` if the host has configured an identity provider for this component.
221    /// Reactions can use this to obtain authentication credentials (passwords, tokens,
222    /// certificates) for connecting to external systems.
223    pub identity_provider: Option<Arc<dyn IdentityProvider>>,
224
225    /// Optional snapshot fetcher for on-demand query snapshot access.
226    ///
227    /// This is `Some` when the host has a `QueryProvider` available. Reactions can
228    /// use this to fetch the current result set of any subscribed query at any time,
229    /// not just during bootstrap.
230    ///
231    /// The fetcher is scoped to the reaction's configured query IDs — attempting to
232    /// fetch a snapshot for an unsubscribed query will return an error.
233    pub snapshot_fetcher: Option<Arc<dyn SnapshotFetcher>>,
234}
235
236impl ReactionRuntimeContext {
237    /// Create a new reaction runtime context.
238    ///
239    /// This is typically called by `ReactionManager` when adding a reaction to DrasiLib.
240    /// Plugin developers do not need to call this directly.
241    ///
242    /// # Arguments
243    ///
244    /// * `instance_id` - The DrasiLib instance ID
245    /// * `reaction_id` - The unique identifier for this reaction
246    /// * `state_store` - Optional persistent state storage
247    /// * `update_tx` - mpsc sender for status updates to the component graph
248    /// * `identity_provider` - Optional identity provider for credential injection
249    pub fn new(
250        instance_id: impl Into<String>,
251        reaction_id: impl Into<String>,
252        state_store: Option<Arc<dyn StateStoreProvider>>,
253        update_tx: ComponentUpdateSender,
254        identity_provider: Option<Arc<dyn IdentityProvider>>,
255    ) -> Self {
256        Self {
257            instance_id: instance_id.into(),
258            reaction_id: reaction_id.into(),
259            state_store,
260            update_tx,
261            identity_provider,
262            snapshot_fetcher: None,
263        }
264    }
265
266    /// Get the DrasiLib instance ID.
267    pub fn instance_id(&self) -> &str {
268        &self.instance_id
269    }
270
271    /// Get the reaction's unique identifier.
272    pub fn reaction_id(&self) -> &str {
273        &self.reaction_id
274    }
275
276    /// Get a reference to the state store if configured.
277    pub fn state_store(&self) -> Option<&Arc<dyn StateStoreProvider>> {
278        self.state_store.as_ref()
279    }
280}
281
282impl std::fmt::Debug for ReactionRuntimeContext {
283    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284        f.debug_struct("ReactionRuntimeContext")
285            .field("instance_id", &self.instance_id)
286            .field("reaction_id", &self.reaction_id)
287            .field(
288                "state_store",
289                &self.state_store.as_ref().map(|_| "<StateStoreProvider>"),
290            )
291            .field("update_tx", &"<ComponentUpdateSender>")
292            .field(
293                "identity_provider",
294                &self
295                    .identity_provider
296                    .as_ref()
297                    .map(|_| "<IdentityProvider>"),
298            )
299            .field(
300                "snapshot_fetcher",
301                &self.snapshot_fetcher.as_ref().map(|_| "<SnapshotFetcher>"),
302            )
303            .finish()
304    }
305}
306
307/// Context provided to Query components during initialization.
308///
309/// Contains the DrasiLib instance ID and update channel for status reporting.
310/// Constructed by `QueryManager` when a query is added via `add_query()`.
311///
312/// Unlike sources and reactions, queries are internal to drasi-lib (not plugins),
313/// but still follow the same context-based initialization pattern for consistency.
314///
315/// # Clone
316///
317/// This struct implements `Clone` and uses `Arc` internally for the update channel,
318/// making cloning cheap (just reference count increments).
319#[derive(Clone)]
320pub struct QueryRuntimeContext {
321    /// DrasiLib instance ID (for log routing isolation)
322    pub instance_id: String,
323
324    /// Unique identifier for this query instance
325    pub query_id: String,
326
327    /// mpsc sender for fire-and-forget component status updates.
328    ///
329    /// Status changes sent here are applied to the component graph by the
330    /// graph update loop, which emits broadcast events to all subscribers.
331    pub update_tx: ComponentUpdateSender,
332}
333
334impl QueryRuntimeContext {
335    /// Create a new query runtime context.
336    ///
337    /// This is typically called by `QueryManager` when adding a query to DrasiLib.
338    ///
339    /// # Arguments
340    ///
341    /// * `instance_id` - The DrasiLib instance ID
342    /// * `query_id` - The unique identifier for this query
343    /// * `update_tx` - mpsc sender for status updates to the component graph
344    pub fn new(
345        instance_id: impl Into<String>,
346        query_id: impl Into<String>,
347        update_tx: ComponentUpdateSender,
348    ) -> Self {
349        Self {
350            instance_id: instance_id.into(),
351            query_id: query_id.into(),
352            update_tx,
353        }
354    }
355
356    /// Get the DrasiLib instance ID.
357    pub fn instance_id(&self) -> &str {
358        &self.instance_id
359    }
360
361    /// Get the query's unique identifier.
362    pub fn query_id(&self) -> &str {
363        &self.query_id
364    }
365}
366
367impl std::fmt::Debug for QueryRuntimeContext {
368    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
369        f.debug_struct("QueryRuntimeContext")
370            .field("instance_id", &self.instance_id)
371            .field("query_id", &self.query_id)
372            .field("update_tx", &"<ComponentUpdateSender>")
373            .finish()
374    }
375}
376
/// Unit tests for the runtime context types: construction, accessors, `Clone`, and the
/// placeholder-based `Debug` output.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::component_graph::ComponentGraph;
    use crate::state_store::MemoryStateStoreProvider;
    use std::sync::Arc;

    /// Obtain a `ComponentUpdateSender` from a throwaway component graph.
    fn test_update_tx() -> ComponentUpdateSender {
        let (graph, _rx) = ComponentGraph::new("test-instance");
        graph.update_sender()
    }

    #[tokio::test]
    async fn test_source_runtime_context_creation() {
        let state_store = Arc::new(MemoryStateStoreProvider::new());
        let update_tx = test_update_tx();

        let context = SourceRuntimeContext::new(
            "test-instance",
            "test-source",
            Some(state_store),
            update_tx,
            None,
        );

        assert_eq!(context.instance_id(), "test-instance");
        assert_eq!(context.source_id(), "test-source");
        assert!(context.state_store().is_some());
        assert!(context.identity_provider.is_none());
    }

    #[tokio::test]
    async fn test_source_runtime_context_without_state_store() {
        let update_tx = test_update_tx();

        let context =
            SourceRuntimeContext::new("test-instance", "test-source", None, update_tx, None);

        assert_eq!(context.source_id(), "test-source");
        assert!(context.state_store().is_none());
    }

    #[tokio::test]
    async fn test_source_runtime_context_clone() {
        let state_store = Arc::new(MemoryStateStoreProvider::new());
        let update_tx = test_update_tx();

        let context = SourceRuntimeContext::new(
            "test-instance",
            "test-source",
            Some(state_store),
            update_tx,
            None,
        );

        // A clone must carry over both IDs and keep the state store attached.
        let cloned = context.clone();
        assert_eq!(cloned.source_id(), context.source_id());
        assert_eq!(cloned.instance_id(), context.instance_id());
        assert!(cloned.state_store().is_some());
    }

    #[tokio::test]
    async fn test_reaction_runtime_context_creation() {
        let state_store = Arc::new(MemoryStateStoreProvider::new());
        let update_tx = test_update_tx();

        let context = ReactionRuntimeContext::new(
            "test-instance",
            "test-reaction",
            Some(state_store),
            update_tx,
            None,
        );

        assert_eq!(context.instance_id(), "test-instance");
        assert_eq!(context.reaction_id(), "test-reaction");
        assert!(context.state_store().is_some());
        assert!(context.identity_provider.is_none());
        // `new` never installs a snapshot fetcher.
        assert!(context.snapshot_fetcher.is_none());
    }

    #[tokio::test]
    async fn test_reaction_runtime_context_without_state_store() {
        let update_tx = test_update_tx();

        let context =
            ReactionRuntimeContext::new("test-instance", "test-reaction", None, update_tx, None);

        assert_eq!(context.reaction_id(), "test-reaction");
        assert!(context.state_store().is_none());
    }

    #[tokio::test]
    async fn test_reaction_runtime_context_clone() {
        let state_store = Arc::new(MemoryStateStoreProvider::new());
        let update_tx = test_update_tx();

        let context = ReactionRuntimeContext::new(
            "test-instance",
            "test-reaction",
            Some(state_store),
            update_tx,
            None,
        );

        // A clone must carry over both IDs and keep the state store attached.
        let cloned = context.clone();
        assert_eq!(cloned.reaction_id(), context.reaction_id());
        assert_eq!(cloned.instance_id(), context.instance_id());
        assert!(cloned.state_store().is_some());
    }

    #[test]
    fn test_source_runtime_context_debug() {
        let update_tx = test_update_tx();
        let context = SourceRuntimeContext::new("test-instance", "test", None, update_tx, None);
        let debug_str = format!("{context:?}");

        // Check each field explicitly: a bare `contains("test")` would already be
        // satisfied by the instance ID and therefore prove nothing about `source_id`.
        assert!(debug_str.contains("SourceRuntimeContext"));
        assert!(debug_str.contains("instance_id: \"test-instance\""));
        assert!(debug_str.contains("source_id: \"test\""));
        assert!(debug_str.contains("state_store: None"));
        assert!(debug_str.contains("<ComponentUpdateSender>"));
        assert!(debug_str.contains("identity_provider: None"));
    }

    #[tokio::test]
    async fn test_source_runtime_context_debug_masks_state_store() {
        let state_store = Arc::new(MemoryStateStoreProvider::new());
        let update_tx = test_update_tx();

        let context = SourceRuntimeContext::new(
            "test-instance",
            "test-source",
            Some(state_store),
            update_tx,
            None,
        );
        let debug_str = format!("{context:?}");

        // A configured state store is shown as a placeholder, not formatted itself.
        assert!(debug_str.contains("state_store: Some(\"<StateStoreProvider>\")"));
    }

    #[test]
    fn test_reaction_runtime_context_debug() {
        let update_tx = test_update_tx();
        let context = ReactionRuntimeContext::new("test-instance", "test", None, update_tx, None);
        let debug_str = format!("{context:?}");

        // Check each field explicitly: a bare `contains("test")` would already be
        // satisfied by the instance ID and therefore prove nothing about `reaction_id`.
        assert!(debug_str.contains("ReactionRuntimeContext"));
        assert!(debug_str.contains("instance_id: \"test-instance\""));
        assert!(debug_str.contains("reaction_id: \"test\""));
        assert!(debug_str.contains("state_store: None"));
        assert!(debug_str.contains("<ComponentUpdateSender>"));
        assert!(debug_str.contains("identity_provider: None"));
        assert!(debug_str.contains("snapshot_fetcher: None"));
    }

    #[tokio::test]
    async fn test_query_runtime_context_creation() {
        let update_tx = test_update_tx();
        let context = QueryRuntimeContext::new("test-instance", "test-query", update_tx);

        assert_eq!(context.instance_id(), "test-instance");
        assert_eq!(context.query_id(), "test-query");
    }

    #[tokio::test]
    async fn test_query_runtime_context_clone() {
        let update_tx = test_update_tx();
        let context = QueryRuntimeContext::new("test-instance", "test-query", update_tx);

        let cloned = context.clone();
        assert_eq!(cloned.query_id(), context.query_id());
        assert_eq!(cloned.instance_id(), context.instance_id());
    }

    #[test]
    fn test_query_runtime_context_debug() {
        let update_tx = test_update_tx();
        let context = QueryRuntimeContext::new("test-instance", "test-query", update_tx);
        let debug_str = format!("{context:?}");

        assert!(debug_str.contains("QueryRuntimeContext"));
        assert!(debug_str.contains("instance_id: \"test-instance\""));
        assert!(debug_str.contains("query_id: \"test-query\""));
        assert!(debug_str.contains("<ComponentUpdateSender>"));
    }
}