drasi_lib/context/mod.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Runtime context types for plugin service injection.
16//!
17//! This module provides `SourceRuntimeContext` and `ReactionRuntimeContext` structs
18//! that contain `Arc<T>` instances for drasi-lib provided services. These contexts
19//! are provided to plugins when they are added to DrasiLib via the `initialize()` method.
20//!
21//! # Overview
22//!
23//! Instead of multiple `inject_*` methods, plugins now receive all services through
24//! a single `initialize(context)` call. This provides:
25//!
26//! - **Single initialization point**: One call instead of multiple inject calls
27//! - **Guaranteed complete initialization**: Context is complete or doesn't exist
28//! - **Clearer API contract**: All available services visible at a glance
29//! - **Extensibility**: New services can be added without changing traits
30//!
31//! # Example - Source Plugin
32//!
33//! ```ignore
34//! use drasi_lib::context::SourceRuntimeContext;
35//!
36//! #[async_trait]
37//! impl Source for MySource {
38//! async fn initialize(&self, context: SourceRuntimeContext) {
39//! // Store context for later use
40//! self.base.initialize(context).await;
41//! }
42//! // ...
43//! }
44//! ```
45//!
46//! # Example - Reaction Plugin
47//!
48//! ```ignore
49//! use drasi_lib::context::ReactionRuntimeContext;
50//!
51//! #[async_trait]
52//! impl Reaction for MyReaction {
53//! async fn initialize(&self, context: ReactionRuntimeContext) {
54//! // Store context for later use
55//! self.base.initialize(context).await;
56//! }
57//! // ...
58//! }
59//! ```
60
61use std::sync::Arc;
62
63use crate::component_graph::ComponentUpdateSender;
64use crate::identity::IdentityProvider;
65use crate::reactions::SnapshotFetcher;
66use crate::state_store::StateStoreProvider;
67
68/// Context provided to Source plugins during initialization.
69///
70/// Contains `Arc<T>` instances for all drasi-lib provided services.
71/// DrasiLib constructs this context when a source is added via `add_source()`.
72///
73/// # Available Services
74///
75/// - `instance_id`: The DrasiLib instance ID (for log routing isolation)
76/// - `source_id`: The unique identifier for this source instance
77/// - `state_store`: Optional persistent state storage (if configured)
78/// - `update_tx`: mpsc sender for fire-and-forget status updates to the component graph
79///
80/// # Clone
81///
82/// This struct implements `Clone` and all fields use `Arc` internally,
83/// making cloning cheap (just reference count increments).
84#[derive(Clone)]
85pub struct SourceRuntimeContext {
86 /// DrasiLib instance ID (for log routing isolation)
87 pub instance_id: String,
88
89 /// Unique identifier for this source instance
90 pub source_id: String,
91
92 /// Optional persistent state storage.
93 ///
94 /// This is `Some` if a state store provider was configured on DrasiLib,
95 /// otherwise `None`. Sources can use this to persist state across restarts.
96 pub state_store: Option<Arc<dyn StateStoreProvider>>,
97
98 /// mpsc sender for fire-and-forget component status updates.
99 ///
100 /// Status changes sent here are applied to the component graph by the
101 /// graph update loop, which emits broadcast events to all subscribers.
102 pub update_tx: ComponentUpdateSender,
103
104 /// Optional identity provider for credential injection.
105 ///
106 /// This is `Some` if the host has configured an identity provider for this component.
107 /// Sources can use this to obtain authentication credentials (passwords, tokens,
108 /// certificates) for connecting to external systems.
109 pub identity_provider: Option<Arc<dyn IdentityProvider>>,
110}
111
112impl SourceRuntimeContext {
113 /// Create a new source runtime context.
114 ///
115 /// This is typically called by `SourceManager` when adding a source to DrasiLib.
116 /// Plugin developers do not need to call this directly.
117 ///
118 /// # Arguments
119 ///
120 /// * `instance_id` - The DrasiLib instance ID
121 /// * `source_id` - The unique identifier for this source
122 /// * `state_store` - Optional persistent state storage
123 /// * `update_tx` - mpsc sender for status updates to the component graph
124 /// * `identity_provider` - Optional identity provider for credential injection
125 pub fn new(
126 instance_id: impl Into<String>,
127 source_id: impl Into<String>,
128 state_store: Option<Arc<dyn StateStoreProvider>>,
129 update_tx: ComponentUpdateSender,
130 identity_provider: Option<Arc<dyn IdentityProvider>>,
131 ) -> Self {
132 Self {
133 instance_id: instance_id.into(),
134 source_id: source_id.into(),
135 state_store,
136 update_tx,
137 identity_provider,
138 }
139 }
140
141 /// Get the DrasiLib instance ID.
142 pub fn instance_id(&self) -> &str {
143 &self.instance_id
144 }
145
146 /// Get the source's unique identifier.
147 pub fn source_id(&self) -> &str {
148 &self.source_id
149 }
150
151 /// Get a reference to the state store if configured.
152 ///
153 /// Returns `Some(&Arc<dyn StateStoreProvider>)` if a state store was configured,
154 /// otherwise `None`.
155 pub fn state_store(&self) -> Option<&Arc<dyn StateStoreProvider>> {
156 self.state_store.as_ref()
157 }
158}
159
160impl std::fmt::Debug for SourceRuntimeContext {
161 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
162 f.debug_struct("SourceRuntimeContext")
163 .field("instance_id", &self.instance_id)
164 .field("source_id", &self.source_id)
165 .field(
166 "state_store",
167 &self.state_store.as_ref().map(|_| "<StateStoreProvider>"),
168 )
169 .field("update_tx", &"<ComponentUpdateSender>")
170 .field(
171 "identity_provider",
172 &self
173 .identity_provider
174 .as_ref()
175 .map(|_| "<IdentityProvider>"),
176 )
177 .finish()
178 }
179}
180
181/// Context provided to Reaction plugins during initialization.
182///
183/// Contains `Arc<T>` instances for all drasi-lib provided services.
184/// DrasiLib constructs this context when a reaction is added via `add_reaction()`.
185///
186/// # Available Services
187///
188/// - `instance_id`: The DrasiLib instance ID (for log routing isolation)
189/// - `reaction_id`: The unique identifier for this reaction instance
190/// - `state_store`: Optional persistent state storage (if configured)
191/// - `update_tx`: mpsc sender for fire-and-forget status updates to the component graph
192/// - `identity_provider`: Optional identity provider for credential injection
193///
194/// # Clone
195///
196/// This struct implements `Clone` and all fields use `Arc` internally,
197/// making cloning cheap (just reference count increments).
198#[derive(Clone)]
199pub struct ReactionRuntimeContext {
200 /// DrasiLib instance ID (for log routing isolation)
201 pub instance_id: String,
202
203 /// Unique identifier for this reaction instance
204 pub reaction_id: String,
205
206 /// Optional persistent state storage.
207 ///
208 /// This is `Some` if a state store provider was configured on DrasiLib,
209 /// otherwise `None`. Reactions can use this to persist state across restarts.
210 pub state_store: Option<Arc<dyn StateStoreProvider>>,
211
212 /// mpsc sender for fire-and-forget component status updates.
213 ///
214 /// Status changes sent here are applied to the component graph by the
215 /// graph update loop, which emits broadcast events to all subscribers.
216 pub update_tx: ComponentUpdateSender,
217
218 /// Optional identity provider for credential injection.
219 ///
220 /// This is `Some` if the host has configured an identity provider for this component.
221 /// Reactions can use this to obtain authentication credentials (passwords, tokens,
222 /// certificates) for connecting to external systems.
223 pub identity_provider: Option<Arc<dyn IdentityProvider>>,
224
225 /// Optional snapshot fetcher for on-demand query snapshot access.
226 ///
227 /// This is `Some` when the host has a `QueryProvider` available. Reactions can
228 /// use this to fetch the current result set of any subscribed query at any time,
229 /// not just during bootstrap.
230 ///
231 /// The fetcher is scoped to the reaction's configured query IDs — attempting to
232 /// fetch a snapshot for an unsubscribed query will return an error.
233 pub snapshot_fetcher: Option<Arc<dyn SnapshotFetcher>>,
234}
235
236impl ReactionRuntimeContext {
237 /// Create a new reaction runtime context.
238 ///
239 /// This is typically called by `ReactionManager` when adding a reaction to DrasiLib.
240 /// Plugin developers do not need to call this directly.
241 ///
242 /// # Arguments
243 ///
244 /// * `instance_id` - The DrasiLib instance ID
245 /// * `reaction_id` - The unique identifier for this reaction
246 /// * `state_store` - Optional persistent state storage
247 /// * `update_tx` - mpsc sender for status updates to the component graph
248 /// * `identity_provider` - Optional identity provider for credential injection
249 pub fn new(
250 instance_id: impl Into<String>,
251 reaction_id: impl Into<String>,
252 state_store: Option<Arc<dyn StateStoreProvider>>,
253 update_tx: ComponentUpdateSender,
254 identity_provider: Option<Arc<dyn IdentityProvider>>,
255 ) -> Self {
256 Self {
257 instance_id: instance_id.into(),
258 reaction_id: reaction_id.into(),
259 state_store,
260 update_tx,
261 identity_provider,
262 snapshot_fetcher: None,
263 }
264 }
265
266 /// Get the DrasiLib instance ID.
267 pub fn instance_id(&self) -> &str {
268 &self.instance_id
269 }
270
271 /// Get the reaction's unique identifier.
272 pub fn reaction_id(&self) -> &str {
273 &self.reaction_id
274 }
275
276 /// Get a reference to the state store if configured.
277 pub fn state_store(&self) -> Option<&Arc<dyn StateStoreProvider>> {
278 self.state_store.as_ref()
279 }
280}
281
282impl std::fmt::Debug for ReactionRuntimeContext {
283 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
284 f.debug_struct("ReactionRuntimeContext")
285 .field("instance_id", &self.instance_id)
286 .field("reaction_id", &self.reaction_id)
287 .field(
288 "state_store",
289 &self.state_store.as_ref().map(|_| "<StateStoreProvider>"),
290 )
291 .field("update_tx", &"<ComponentUpdateSender>")
292 .field(
293 "identity_provider",
294 &self
295 .identity_provider
296 .as_ref()
297 .map(|_| "<IdentityProvider>"),
298 )
299 .field(
300 "snapshot_fetcher",
301 &self.snapshot_fetcher.as_ref().map(|_| "<SnapshotFetcher>"),
302 )
303 .finish()
304 }
305}
306
307/// Context provided to Query components during initialization.
308///
309/// Contains the DrasiLib instance ID and update channel for status reporting.
310/// Constructed by `QueryManager` when a query is added via `add_query()`.
311///
312/// Unlike sources and reactions, queries are internal to drasi-lib (not plugins),
313/// but still follow the same context-based initialization pattern for consistency.
314///
315/// # Clone
316///
317/// This struct implements `Clone` and uses `Arc` internally for the update channel,
318/// making cloning cheap (just reference count increments).
319#[derive(Clone)]
320pub struct QueryRuntimeContext {
321 /// DrasiLib instance ID (for log routing isolation)
322 pub instance_id: String,
323
324 /// Unique identifier for this query instance
325 pub query_id: String,
326
327 /// mpsc sender for fire-and-forget component status updates.
328 ///
329 /// Status changes sent here are applied to the component graph by the
330 /// graph update loop, which emits broadcast events to all subscribers.
331 pub update_tx: ComponentUpdateSender,
332}
333
334impl QueryRuntimeContext {
335 /// Create a new query runtime context.
336 ///
337 /// This is typically called by `QueryManager` when adding a query to DrasiLib.
338 ///
339 /// # Arguments
340 ///
341 /// * `instance_id` - The DrasiLib instance ID
342 /// * `query_id` - The unique identifier for this query
343 /// * `update_tx` - mpsc sender for status updates to the component graph
344 pub fn new(
345 instance_id: impl Into<String>,
346 query_id: impl Into<String>,
347 update_tx: ComponentUpdateSender,
348 ) -> Self {
349 Self {
350 instance_id: instance_id.into(),
351 query_id: query_id.into(),
352 update_tx,
353 }
354 }
355
356 /// Get the DrasiLib instance ID.
357 pub fn instance_id(&self) -> &str {
358 &self.instance_id
359 }
360
361 /// Get the query's unique identifier.
362 pub fn query_id(&self) -> &str {
363 &self.query_id
364 }
365}
366
367impl std::fmt::Debug for QueryRuntimeContext {
368 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
369 f.debug_struct("QueryRuntimeContext")
370 .field("instance_id", &self.instance_id)
371 .field("query_id", &self.query_id)
372 .field("update_tx", &"<ComponentUpdateSender>")
373 .finish()
374 }
375}
376
377#[cfg(test)]
378mod tests {
379 use super::*;
380 use crate::component_graph::ComponentGraph;
381 use crate::state_store::MemoryStateStoreProvider;
382 use std::sync::Arc;
383
384 fn test_update_tx() -> ComponentUpdateSender {
385 let (graph, _rx) = ComponentGraph::new("test-instance");
386 graph.update_sender()
387 }
388
389 #[tokio::test]
390 async fn test_source_runtime_context_creation() {
391 let state_store = Arc::new(MemoryStateStoreProvider::new());
392 let update_tx = test_update_tx();
393
394 let context = SourceRuntimeContext::new(
395 "test-instance",
396 "test-source",
397 Some(state_store),
398 update_tx,
399 None,
400 );
401
402 assert_eq!(context.instance_id(), "test-instance");
403 assert_eq!(context.source_id(), "test-source");
404 assert!(context.state_store().is_some());
405 }
406
407 #[tokio::test]
408 async fn test_source_runtime_context_without_state_store() {
409 let update_tx = test_update_tx();
410
411 let context =
412 SourceRuntimeContext::new("test-instance", "test-source", None, update_tx, None);
413
414 assert_eq!(context.source_id(), "test-source");
415 assert!(context.state_store().is_none());
416 }
417
418 #[tokio::test]
419 async fn test_source_runtime_context_clone() {
420 let state_store = Arc::new(MemoryStateStoreProvider::new());
421 let update_tx = test_update_tx();
422
423 let context = SourceRuntimeContext::new(
424 "test-instance",
425 "test-source",
426 Some(state_store),
427 update_tx,
428 None,
429 );
430
431 let cloned = context.clone();
432 assert_eq!(cloned.source_id(), context.source_id());
433 }
434
435 #[tokio::test]
436 async fn test_reaction_runtime_context_creation() {
437 let state_store = Arc::new(MemoryStateStoreProvider::new());
438 let update_tx = test_update_tx();
439
440 let context = ReactionRuntimeContext::new(
441 "test-instance",
442 "test-reaction",
443 Some(state_store),
444 update_tx,
445 None,
446 );
447
448 assert_eq!(context.instance_id(), "test-instance");
449 assert_eq!(context.reaction_id(), "test-reaction");
450 assert!(context.state_store().is_some());
451 }
452
453 #[tokio::test]
454 async fn test_reaction_runtime_context_without_state_store() {
455 let update_tx = test_update_tx();
456
457 let context =
458 ReactionRuntimeContext::new("test-instance", "test-reaction", None, update_tx, None);
459
460 assert_eq!(context.reaction_id(), "test-reaction");
461 assert!(context.state_store().is_none());
462 }
463
464 #[tokio::test]
465 async fn test_reaction_runtime_context_clone() {
466 let state_store = Arc::new(MemoryStateStoreProvider::new());
467 let update_tx = test_update_tx();
468
469 let context = ReactionRuntimeContext::new(
470 "test-instance",
471 "test-reaction",
472 Some(state_store),
473 update_tx,
474 None,
475 );
476
477 let cloned = context.clone();
478 assert_eq!(cloned.reaction_id(), context.reaction_id());
479 }
480
481 #[test]
482 fn test_source_runtime_context_debug() {
483 let update_tx = test_update_tx();
484 let context = SourceRuntimeContext::new("test-instance", "test", None, update_tx, None);
485 let debug_str = format!("{context:?}");
486 assert!(debug_str.contains("SourceRuntimeContext"));
487 assert!(debug_str.contains("test"));
488 }
489
490 #[test]
491 fn test_reaction_runtime_context_debug() {
492 let update_tx = test_update_tx();
493 let context = ReactionRuntimeContext::new("test-instance", "test", None, update_tx, None);
494 let debug_str = format!("{context:?}");
495 assert!(debug_str.contains("ReactionRuntimeContext"));
496 assert!(debug_str.contains("test"));
497 }
498
499 #[tokio::test]
500 async fn test_query_runtime_context_creation() {
501 let update_tx = test_update_tx();
502 let context = QueryRuntimeContext::new("test-instance", "test-query", update_tx);
503
504 assert_eq!(context.instance_id(), "test-instance");
505 assert_eq!(context.query_id(), "test-query");
506 }
507
508 #[tokio::test]
509 async fn test_query_runtime_context_clone() {
510 let update_tx = test_update_tx();
511 let context = QueryRuntimeContext::new("test-instance", "test-query", update_tx);
512
513 let cloned = context.clone();
514 assert_eq!(cloned.query_id(), context.query_id());
515 assert_eq!(cloned.instance_id(), context.instance_id());
516 }
517
518 #[test]
519 fn test_query_runtime_context_debug() {
520 let update_tx = test_update_tx();
521 let context = QueryRuntimeContext::new("test-instance", "test-query", update_tx);
522 let debug_str = format!("{context:?}");
523 assert!(debug_str.contains("QueryRuntimeContext"));
524 assert!(debug_str.contains("test-query"));
525 }
526}