Skip to main content

drasi_lib/reactions/common/
base.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common reaction functionality.
16//!
17//! This module provides `ReactionBase` which encapsulates common patterns
18//! used across all reaction implementations:
19//! - Query subscription management
20//! - Priority queue handling
21//! - Task lifecycle management
22//! - Component status tracking
23//! - Event reporting
24//!
25//! # Plugin Architecture
26//!
27//! ReactionBase is designed to be used by reaction plugins. Each plugin:
28//! 1. Defines its own typed configuration struct
29//! 2. Creates a ReactionBase with ReactionBaseParams
30//! 3. Implements the Reaction trait delegating to ReactionBase methods
31
32use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36use tracing::Instrument;
37
38use crate::channels::priority_queue::PriorityQueue;
39use crate::channels::{ComponentStatus, QueryResult};
40use crate::component_graph::ComponentStatusHandle;
41use crate::context::ReactionRuntimeContext;
42use crate::identity::IdentityProvider;
43use crate::state_store::StateStoreProvider;
44
/// Construction parameters consumed by `ReactionBase::new`.
///
/// Only the settings that `ReactionBase` itself needs are kept here; anything
/// specific to an individual reaction plugin belongs in that plugin's own
/// configuration type.
///
/// # Example
///
/// ```ignore
/// use drasi_lib::reactions::common::base::{ReactionBase, ReactionBaseParams};
///
/// let params = ReactionBaseParams::new("my-reaction", vec!["query1".to_string()])
///     .with_priority_queue_capacity(5000)
///     .with_auto_start(true);
///
/// let base = ReactionBase::new(params);
/// ```
#[derive(Debug, Clone)]
pub struct ReactionBaseParams {
    /// Identifier of the reaction.
    pub id: String,
    /// IDs of the queries the reaction subscribes to.
    pub queries: Vec<String>,
    /// Capacity of the priority queue; `None` lets `ReactionBase::new` fall
    /// back to its default of 10000.
    pub priority_queue_capacity: Option<usize>,
    /// Whether the reaction should auto-start (`new` sets this to `true`).
    pub auto_start: bool,
}

impl ReactionBaseParams {
    /// Build params from an ID and a query list.
    ///
    /// The queue capacity is left unset and `auto_start` is enabled.
    pub fn new(id: impl Into<String>, queries: Vec<String>) -> Self {
        let id: String = id.into();
        Self {
            id,
            queries,
            priority_queue_capacity: None,
            // Same default as queries: start automatically.
            auto_start: true,
        }
    }

    /// Builder step: use an explicit priority queue capacity.
    pub fn with_priority_queue_capacity(self, capacity: usize) -> Self {
        Self {
            priority_queue_capacity: Some(capacity),
            ..self
        }
    }

    /// Builder step: choose whether the reaction auto-starts.
    pub fn with_auto_start(self, auto_start: bool) -> Self {
        Self { auto_start, ..self }
    }
}
96
/// Base implementation for common reaction functionality.
///
/// Holds the per-reaction state that reaction implementations delegate to:
/// identity and query list, a status handle, the runtime context and the
/// providers extracted from it, the result priority queue, and the task
/// handles / shutdown sender used by `stop_common()`.
pub struct ReactionBase {
    /// Reaction identifier
    pub id: String,
    /// List of query IDs to subscribe to
    pub queries: Vec<String>,
    /// Whether this reaction should auto-start
    pub auto_start: bool,
    /// Component status handle — always available, wired to graph during initialize().
    status_handle: ComponentStatusHandle,
    /// Runtime context (`None` until `initialize()` stores it)
    context: Arc<RwLock<Option<ReactionRuntimeContext>>>,
    /// State store provider (copied out of the context by `initialize()` for convenience)
    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
    /// Priority queue for timestamp-ordered result processing
    pub priority_queue: PriorityQueue<QueryResult>,
    /// Handles to subscription forwarder tasks (aborted by `stop_common()`)
    pub subscription_tasks: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
    /// Handle to the main processing task (awaited or aborted by `stop_common()`)
    pub processing_task: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Sender for shutdown signal to processing task
    /// (stored by `create_shutdown_channel()`, consumed by `stop_common()`)
    pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
    /// Optional identity provider for credential management.
    /// Set either programmatically (via `set_identity_provider`) or automatically
    /// from the runtime context during `initialize()`.
    identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
    /// Original raw config JSON from the descriptor, preserving ConfigValue
    /// envelopes (secrets, env vars) for lossless persistence roundtrips.
    /// Not behind an `Arc`: `clone_shared()` copies the value.
    raw_config: Option<serde_json::Value>,
}
127
128impl ReactionBase {
129    /// Create a new ReactionBase with the given parameters
130    ///
131    /// Dependencies (query subscriber, state store, graph) are not required during
132    /// construction - they will be provided via `initialize()` when the reaction is added to DrasiLib.
133    pub fn new(params: ReactionBaseParams) -> Self {
134        Self {
135            priority_queue: PriorityQueue::new(params.priority_queue_capacity.unwrap_or(10000)),
136            id: params.id.clone(),
137            queries: params.queries,
138            auto_start: params.auto_start,
139            status_handle: ComponentStatusHandle::new(&params.id),
140            context: Arc::new(RwLock::new(None)), // Set by initialize()
141            state_store: Arc::new(RwLock::new(None)), // Extracted from context
142            subscription_tasks: Arc::new(RwLock::new(Vec::new())),
143            processing_task: Arc::new(RwLock::new(None)),
144            shutdown_tx: Arc::new(RwLock::new(None)),
145            identity_provider: Arc::new(RwLock::new(None)),
146            raw_config: None,
147        }
148    }
149
150    /// Initialize the reaction with runtime context.
151    ///
152    /// This method is called automatically by DrasiLib's `add_reaction()` method.
153    /// Plugin developers do not need to call this directly.
154    ///
155    /// The context provides access to:
156    /// - `reaction_id`: The reaction's unique identifier
157    /// - `state_store`: Optional persistent state storage
158    /// - `update_tx`: mpsc sender for fire-and-forget status updates to the graph
159    pub async fn initialize(&self, context: ReactionRuntimeContext) {
160        // Store context for later use
161        *self.context.write().await = Some(context.clone());
162
163        // Wire the status handle to the graph update channel
164        self.status_handle.wire(context.update_tx.clone()).await;
165
166        if let Some(state_store) = context.state_store.as_ref() {
167            *self.state_store.write().await = Some(state_store.clone());
168        }
169
170        // Store identity provider from context if not already set programmatically
171        if let Some(ip) = context.identity_provider.as_ref() {
172            let mut guard = self.identity_provider.write().await;
173            if guard.is_none() {
174                *guard = Some(ip.clone());
175            }
176        }
177    }
178
179    /// Get the runtime context if initialized.
180    ///
181    /// Returns `None` if `initialize()` has not been called yet.
182    pub async fn context(&self) -> Option<ReactionRuntimeContext> {
183        self.context.read().await.clone()
184    }
185
186    /// Get the state store if configured.
187    ///
188    /// Returns `None` if no state store was provided in the context.
189    pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
190        self.state_store.read().await.clone()
191    }
192
193    /// Get the identity provider if set.
194    ///
195    /// Returns the identity provider set either programmatically via
196    /// `set_identity_provider()` or from the runtime context during `initialize()`.
197    /// Programmatically-set providers take precedence over context providers.
198    pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
199        self.identity_provider.read().await.clone()
200    }
201
202    /// Set the identity provider programmatically.
203    ///
204    /// This is typically called during reaction construction when the provider
205    /// is available from configuration (e.g., `with_identity_provider()` builder).
206    /// Providers set this way take precedence over context-injected providers.
207    pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
208        *self.identity_provider.write().await = Some(provider);
209    }
210
    /// Report whether this reaction is configured to auto-start.
    ///
    /// Returns the current value of the `auto_start` field.
    pub fn get_auto_start(&self) -> bool {
        self.auto_start
    }
215
    /// Set the original raw config JSON for lossless persistence roundtrips.
    ///
    /// Replaces any raw config stored earlier. Once set, an object-valued raw
    /// config is what `properties_or_serialize()` returns.
    pub fn set_raw_config(&mut self, config: serde_json::Value) {
        self.raw_config = Some(config);
    }
220
    /// Get the original raw config JSON, if set by a descriptor.
    ///
    /// Returns `None` when `set_raw_config()` has never been called.
    pub fn raw_config(&self) -> Option<&serde_json::Value> {
        self.raw_config.as_ref()
    }
225
226    /// Build the properties map for this reaction.
227    ///
228    /// If `raw_config` was set (descriptor path), returns its top-level keys.
229    /// Otherwise, serializes `fallback_dto` (the DTO reconstructed from typed
230    /// config) to produce camelCase output.
231    ///
232    /// This eliminates the duplicated if-let + serialize pattern from plugins.
233    pub fn properties_or_serialize<D: serde::Serialize>(
234        &self,
235        fallback_dto: &D,
236    ) -> std::collections::HashMap<String, serde_json::Value> {
237        if let Some(serde_json::Value::Object(map)) = self.raw_config.as_ref() {
238            return map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
239        }
240
241        match serde_json::to_value(fallback_dto) {
242            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
243            _ => std::collections::HashMap::new(),
244        }
245    }
246
247    /// Clone the ReactionBase with shared Arc references
248    ///
249    /// This creates a new ReactionBase that shares the same underlying
250    /// data through Arc references. Useful for passing to spawned tasks.
251    pub fn clone_shared(&self) -> Self {
252        Self {
253            id: self.id.clone(),
254            queries: self.queries.clone(),
255            auto_start: self.auto_start,
256            status_handle: self.status_handle.clone(),
257            context: self.context.clone(),
258            state_store: self.state_store.clone(),
259            priority_queue: self.priority_queue.clone(),
260            subscription_tasks: self.subscription_tasks.clone(),
261            processing_task: self.processing_task.clone(),
262            shutdown_tx: self.shutdown_tx.clone(),
263            identity_provider: self.identity_provider.clone(),
264            raw_config: self.raw_config.clone(),
265        }
266    }
267
268    /// Create a shutdown channel and store the sender
269    ///
270    /// Returns the receiver which should be passed to the processing task.
271    /// The sender is stored internally and will be triggered by `stop_common()`.
272    ///
273    /// This should be called before spawning the processing task.
274    pub async fn create_shutdown_channel(&self) -> tokio::sync::oneshot::Receiver<()> {
275        let (tx, rx) = tokio::sync::oneshot::channel();
276        *self.shutdown_tx.write().await = Some(tx);
277        rx
278    }
279
280    /// Get the reaction ID
281    pub fn get_id(&self) -> &str {
282        &self.id
283    }
284
285    /// Get the query IDs
286    pub fn get_queries(&self) -> &[String] {
287        &self.queries
288    }
289
    /// Get current status.
    ///
    /// Reads the status through the component status handle.
    pub async fn get_status(&self) -> ComponentStatus {
        self.status_handle.get_status().await
    }
294
    /// Returns a clone of this reaction's [`ComponentStatusHandle`] for use in spawned tasks.
    ///
    /// The handle can both read and write the component's status and automatically
    /// notifies the graph on every status change (after `initialize()`).
    /// Status changes made through the returned handle are visible via `get_status()`.
    pub fn status_handle(&self) -> ComponentStatusHandle {
        self.status_handle.clone()
    }
302
    /// Set the component's status — updates local state AND notifies the graph.
    ///
    /// This is the single canonical way to change a reaction's status.
    /// `message` is an optional human-readable note passed along with the change.
    /// Delegates to the status handle, so it may be called before `initialize()`.
    pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
        self.status_handle.set_status(status, message).await;
    }
309
310    /// Enqueue a query result for processing.
311    ///
312    /// The host calls this to forward query results to the reaction's priority queue.
313    /// Results are processed in timestamp order by the reaction's processing task.
314    pub async fn enqueue_query_result(&self, result: QueryResult) -> anyhow::Result<()> {
315        self.priority_queue.enqueue_wait(Arc::new(result)).await;
316        Ok(())
317    }
318
319    /// Perform common cleanup operations
320    ///
321    /// This method handles:
322    /// 1. Sending shutdown signal to processing task (for graceful termination)
323    /// 2. Aborting all subscription forwarder tasks
324    /// 3. Waiting for or aborting the processing task
325    /// 4. Draining the priority queue
326    pub async fn stop_common(&self) -> Result<()> {
327        info!("Stopping reaction: {}", self.id);
328
329        // Send shutdown signal to processing task (if it's using tokio::select!)
330        if let Some(tx) = self.shutdown_tx.write().await.take() {
331            let _ = tx.send(());
332        }
333
334        // Abort all subscription forwarder tasks
335        let mut subscription_tasks = self.subscription_tasks.write().await;
336        for task in subscription_tasks.drain(..) {
337            task.abort();
338        }
339        drop(subscription_tasks);
340
341        // Wait for the processing task to complete (with timeout), or abort it
342        let mut processing_task = self.processing_task.write().await;
343        if let Some(mut task) = processing_task.take() {
344            // Give the task a short time to respond to the shutdown signal
345            match tokio::time::timeout(std::time::Duration::from_secs(2), &mut task).await {
346                Ok(Ok(())) => {
347                    debug!("[{}] Processing task completed gracefully", self.id);
348                }
349                Ok(Err(e)) => {
350                    // Task was aborted or panicked
351                    debug!("[{}] Processing task ended: {}", self.id, e);
352                }
353                Err(_) => {
354                    // Timeout - task didn't respond to shutdown signal
355                    warn!(
356                        "[{}] Processing task did not respond to shutdown signal within timeout, aborting",
357                        self.id
358                    );
359                    task.abort();
360                }
361            }
362        }
363        drop(processing_task);
364
365        // Drain the priority queue
366        let drained_events = self.priority_queue.drain().await;
367        if !drained_events.is_empty() {
368            info!(
369                "[{}] Drained {} pending events from priority queue",
370                self.id,
371                drained_events.len()
372            );
373        }
374
375        self.set_status(
376            ComponentStatus::Stopped,
377            Some(format!("Reaction '{}' stopped", self.id)),
378        )
379        .await;
380        info!("Reaction '{}' stopped", self.id);
381
382        Ok(())
383    }
384
385    /// Clear the reaction's state store partition.
386    ///
387    /// This is called during deprovision to remove all persisted state
388    /// associated with this reaction. Reactions that override `deprovision()`
389    /// can call this to clean up their state store.
390    pub async fn deprovision_common(&self) -> Result<()> {
391        info!("Deprovisioning reaction '{}'", self.id);
392        if let Some(store) = self.state_store().await {
393            let count = store.clear_store(&self.id).await.map_err(|e| {
394                anyhow::anyhow!(
395                    "Failed to clear state store for reaction '{}': {}",
396                    self.id,
397                    e
398                )
399            })?;
400            info!(
401                "Cleared {} keys from state store for reaction '{}'",
402                count, self.id
403            );
404        }
405        Ok(())
406    }
407
408    /// Set the processing task handle
409    pub async fn set_processing_task(&self, task: tokio::task::JoinHandle<()>) {
410        *self.processing_task.write().await = Some(task);
411    }
412}
413
414#[cfg(test)]
415mod tests {
416    use super::*;
417    use std::sync::atomic::{AtomicBool, Ordering};
418    use std::time::Duration;
419    use tokio::sync::mpsc;
420
421    #[tokio::test]
422    async fn test_reaction_base_creation() {
423        let params = ReactionBaseParams::new("test-reaction", vec!["query1".to_string()])
424            .with_priority_queue_capacity(5000);
425
426        let base = ReactionBase::new(params);
427        assert_eq!(base.id, "test-reaction");
428        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
429    }
430
431    #[tokio::test]
432    async fn test_status_transitions() {
433        use crate::context::ReactionRuntimeContext;
434
435        let (graph, _rx) = crate::component_graph::ComponentGraph::new("test-instance");
436        let update_tx = graph.update_sender();
437        let graph = Arc::new(RwLock::new(graph));
438        let params = ReactionBaseParams::new("test-reaction", vec![]);
439
440        let base = ReactionBase::new(params);
441
442        // Create context and initialize
443        let context =
444            ReactionRuntimeContext::new("test-instance", "test-reaction", None, update_tx, None);
445        base.initialize(context).await;
446
447        // Test status transition
448        base.set_status(ComponentStatus::Starting, Some("Starting test".to_string()))
449            .await;
450
451        assert_eq!(base.get_status().await, ComponentStatus::Starting);
452
453        // Check event was sent via graph broadcast
454        let mut event_rx = graph.read().await.subscribe();
455        // The status was already set; emit another event to verify the graph path works
456        base.set_status(ComponentStatus::Running, Some("Running test".to_string()))
457            .await;
458
459        assert_eq!(base.get_status().await, ComponentStatus::Running);
460    }
461
462    #[tokio::test]
463    async fn test_priority_queue_operations() {
464        let params =
465            ReactionBaseParams::new("test-reaction", vec![]).with_priority_queue_capacity(10);
466
467        let base = ReactionBase::new(params);
468
469        // Create a test query result
470        let query_result = QueryResult::new(
471            "test-query".to_string(),
472            0,
473            chrono::Utc::now(),
474            vec![],
475            Default::default(),
476        );
477
478        // Enqueue result
479        let enqueued = base.priority_queue.enqueue(Arc::new(query_result)).await;
480        assert!(enqueued);
481
482        // Drain queue
483        let drained = base.priority_queue.drain().await;
484        assert_eq!(drained.len(), 1);
485    }
486
487    #[tokio::test]
488    async fn test_event_without_initialization() {
489        // Test that set_status works even without context initialization
490        let params = ReactionBaseParams::new("test-reaction", vec![]);
491
492        let base = ReactionBase::new(params);
493
494        // This should succeed without panicking (silently updates local only when handle is None)
495        base.set_status(ComponentStatus::Starting, None).await;
496    }
497
498    // =============================================================================
499    // Shutdown Channel Tests
500    // =============================================================================
501
502    #[tokio::test]
503    async fn test_create_shutdown_channel() {
504        let params = ReactionBaseParams::new("test-reaction", vec![]);
505        let base = ReactionBase::new(params);
506
507        // Initially no shutdown_tx
508        assert!(base.shutdown_tx.read().await.is_none());
509
510        // Create channel
511        let rx = base.create_shutdown_channel().await;
512
513        // Verify tx is stored
514        assert!(base.shutdown_tx.read().await.is_some());
515
516        // Verify receiver is valid (dropping it should not panic)
517        drop(rx);
518    }
519
520    #[tokio::test]
521    async fn test_shutdown_channel_signal() {
522        let params = ReactionBaseParams::new("test-reaction", vec![]);
523        let base = ReactionBase::new(params);
524
525        let mut rx = base.create_shutdown_channel().await;
526
527        // Send signal
528        if let Some(tx) = base.shutdown_tx.write().await.take() {
529            tx.send(()).unwrap();
530        }
531
532        // Verify signal received
533        let result = rx.try_recv();
534        assert!(result.is_ok());
535    }
536
537    #[tokio::test]
538    async fn test_shutdown_channel_replaced_on_second_create() {
539        let params = ReactionBaseParams::new("test-reaction", vec![]);
540        let base = ReactionBase::new(params);
541
542        // Create first channel
543        let _rx1 = base.create_shutdown_channel().await;
544
545        // Create second channel (should replace the first)
546        let mut rx2 = base.create_shutdown_channel().await;
547
548        // Send signal - should go to second channel
549        if let Some(tx) = base.shutdown_tx.write().await.take() {
550            tx.send(()).unwrap();
551        }
552
553        // Second receiver should get the signal
554        let result = rx2.try_recv();
555        assert!(result.is_ok());
556    }
557
558    #[tokio::test]
559    async fn test_stop_common_sends_shutdown_signal() {
560        let params = ReactionBaseParams::new("test-reaction", vec![]);
561        let base = ReactionBase::new(params);
562
563        let mut rx = base.create_shutdown_channel().await;
564
565        // Spawn a task that waits for shutdown
566        let shutdown_received = Arc::new(AtomicBool::new(false));
567        let shutdown_flag = shutdown_received.clone();
568
569        let task = tokio::spawn(async move {
570            tokio::select! {
571                _ = &mut rx => {
572                    shutdown_flag.store(true, Ordering::SeqCst);
573                }
574            }
575        });
576
577        base.set_processing_task(task).await;
578
579        // Call stop_common - should send shutdown signal and await the task
580        let _ = base.stop_common().await;
581
582        // stop_common awaits the processing task, so the flag should already be set
583        assert!(
584            shutdown_received.load(Ordering::SeqCst),
585            "Processing task should have received shutdown signal"
586        );
587    }
588
589    #[tokio::test]
590    async fn test_graceful_shutdown_timing() {
591        let params = ReactionBaseParams::new("test-reaction", vec![]);
592        let base = ReactionBase::new(params);
593
594        let rx = base.create_shutdown_channel().await;
595
596        // Spawn task that uses select! pattern like real reactions
597        let task = tokio::spawn(async move {
598            let mut shutdown_rx = rx;
599            loop {
600                tokio::select! {
601                    biased;
602                    _ = &mut shutdown_rx => {
603                        break;
604                    }
605                    _ = tokio::time::sleep(Duration::from_secs(10)) => {
606                        // Simulates waiting on priority_queue.dequeue()
607                    }
608                }
609            }
610        });
611
612        base.set_processing_task(task).await;
613
614        // Measure shutdown time
615        let start = std::time::Instant::now();
616        let _ = base.stop_common().await;
617        let elapsed = start.elapsed();
618
619        // Should complete quickly (< 500ms), not hit 2s timeout
620        assert!(
621            elapsed < Duration::from_millis(500),
622            "Shutdown took {elapsed:?}, expected < 500ms. Task may not be responding to shutdown signal."
623        );
624    }
625
626    #[tokio::test]
627    async fn test_stop_common_without_shutdown_channel() {
628        // Test that stop_common works even if no shutdown channel was created
629        let params = ReactionBaseParams::new("test-reaction", vec![]);
630        let base = ReactionBase::new(params);
631
632        // Don't create shutdown channel - just spawn a short-lived task
633        let task = tokio::spawn(async {
634            tokio::time::sleep(Duration::from_millis(10)).await;
635        });
636
637        base.set_processing_task(task).await;
638
639        // stop_common should still work
640        let result = base.stop_common().await;
641        assert!(result.is_ok());
642    }
643
644    // =============================================================================
645    // Accessor Tests
646    // =============================================================================
647
648    #[tokio::test]
649    async fn test_get_id() {
650        let params = ReactionBaseParams::new("my-reaction-42", vec![]);
651        let base = ReactionBase::new(params);
652        assert_eq!(base.get_id(), "my-reaction-42");
653    }
654
655    #[tokio::test]
656    async fn test_get_queries() {
657        let queries = vec!["query-a".to_string(), "query-b".to_string(), "query-c".to_string()];
658        let params = ReactionBaseParams::new("r1", queries.clone());
659        let base = ReactionBase::new(params);
660        assert_eq!(base.get_queries(), &queries[..]);
661    }
662
663    #[tokio::test]
664    async fn test_get_queries_empty() {
665        let params = ReactionBaseParams::new("r1", vec![]);
666        let base = ReactionBase::new(params);
667        assert!(base.get_queries().is_empty());
668    }
669
670    #[tokio::test]
671    async fn test_get_auto_start_default_true() {
672        let params = ReactionBaseParams::new("r1", vec![]);
673        let base = ReactionBase::new(params);
674        assert!(base.get_auto_start());
675    }
676
677    #[tokio::test]
678    async fn test_get_auto_start_override_false() {
679        let params = ReactionBaseParams::new("r1", vec![]).with_auto_start(false);
680        let base = ReactionBase::new(params);
681        assert!(!base.get_auto_start());
682    }
683
684    // =============================================================================
685    // Context / State Store / Identity Provider Tests
686    // =============================================================================
687
688    #[tokio::test]
689    async fn test_context_none_before_initialize() {
690        let params = ReactionBaseParams::new("r1", vec![]);
691        let base = ReactionBase::new(params);
692        assert!(base.context().await.is_none());
693    }
694
695    #[tokio::test]
696    async fn test_context_some_after_initialize() {
697        let (graph, _rx) = crate::component_graph::ComponentGraph::new("inst");
698        let update_tx = graph.update_sender();
699        let context = ReactionRuntimeContext::new("inst", "r1", None, update_tx, None);
700
701        let params = ReactionBaseParams::new("r1", vec![]);
702        let base = ReactionBase::new(params);
703        base.initialize(context).await;
704
705        let ctx = base.context().await;
706        assert!(ctx.is_some());
707        assert_eq!(ctx.unwrap().reaction_id, "r1");
708    }
709
710    #[tokio::test]
711    async fn test_state_store_none_when_not_configured() {
712        let params = ReactionBaseParams::new("r1", vec![]);
713        let base = ReactionBase::new(params);
714        assert!(base.state_store().await.is_none());
715    }
716
717    #[tokio::test]
718    async fn test_state_store_none_after_initialize_without_store() {
719        let (graph, _rx) = crate::component_graph::ComponentGraph::new("inst");
720        let update_tx = graph.update_sender();
721        let context = ReactionRuntimeContext::new("inst", "r1", None, update_tx, None);
722
723        let params = ReactionBaseParams::new("r1", vec![]);
724        let base = ReactionBase::new(params);
725        base.initialize(context).await;
726
727        assert!(base.state_store().await.is_none());
728    }
729
730    #[tokio::test]
731    async fn test_identity_provider_none_by_default() {
732        let params = ReactionBaseParams::new("r1", vec![]);
733        let base = ReactionBase::new(params);
734        assert!(base.identity_provider().await.is_none());
735    }
736
737    // =============================================================================
738    // Status Handle Tests
739    // =============================================================================
740
741    #[tokio::test]
742    async fn test_status_handle_returns_handle() {
743        let params = ReactionBaseParams::new("r1", vec![]);
744        let base = ReactionBase::new(params);
745
746        let handle = base.status_handle();
747        // The handle should share the same status as the base
748        assert_eq!(handle.get_status().await, ComponentStatus::Stopped);
749
750        // Mutating via handle should be visible via base
751        handle.set_status(ComponentStatus::Running, None).await;
752        assert_eq!(base.get_status().await, ComponentStatus::Running);
753    }
754
755    // =============================================================================
756    // Deprovision Tests
757    // =============================================================================
758
759    #[tokio::test]
760    async fn test_deprovision_common_noop_without_state_store() {
761        let params = ReactionBaseParams::new("r1", vec![]);
762        let base = ReactionBase::new(params);
763        // Should succeed without panicking when no state store is configured
764        let result = base.deprovision_common().await;
765        assert!(result.is_ok());
766    }
767
768    // =============================================================================
769    // Processing Task Tests
770    // =============================================================================
771
772    #[tokio::test]
773    async fn test_set_processing_task_stores_handle() {
774        let params = ReactionBaseParams::new("r1", vec![]);
775        let base = ReactionBase::new(params);
776
777        // Initially no processing task
778        assert!(base.processing_task.read().await.is_none());
779
780        let task = tokio::spawn(async {
781            tokio::time::sleep(Duration::from_secs(60)).await;
782        });
783
784        base.set_processing_task(task).await;
785
786        // Now it should be stored
787        assert!(base.processing_task.read().await.is_some());
788
789        // Clean up: abort the long-running task
790        let task = base.processing_task.write().await.take();
791        if let Some(t) = task {
792            t.abort();
793        }
794    }
795}