Skip to main content

drasi_lib/reactions/common/
base.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common reaction functionality.
16//!
17//! This module provides `ReactionBase` which encapsulates common patterns
18//! used across all reaction implementations:
19//! - Query subscription management
20//! - Priority queue handling
21//! - Task lifecycle management
22//! - Component status tracking
23//! - Event reporting
24//!
25//! # Plugin Architecture
26//!
27//! ReactionBase is designed to be used by reaction plugins. Each plugin:
28//! 1. Defines its own typed configuration struct
29//! 2. Creates a ReactionBase with ReactionBaseParams
30//! 3. Implements the Reaction trait delegating to ReactionBase methods
31
32use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36use tracing::Instrument;
37
38use crate::channels::priority_queue::PriorityQueue;
39use crate::channels::{ComponentStatus, QueryResult};
40use crate::component_graph::ComponentStatusHandle;
41use crate::context::ReactionRuntimeContext;
42use crate::identity::IdentityProvider;
43use crate::state_store::StateStoreProvider;
44
/// Construction parameters for a [`ReactionBase`].
///
/// Only the settings that `ReactionBase` itself consumes live here; anything
/// specific to a particular reaction plugin belongs in that plugin's own
/// configuration type.
///
/// # Example
///
/// ```ignore
/// use drasi_lib::reactions::common::base::{ReactionBase, ReactionBaseParams};
///
/// let params = ReactionBaseParams::new("my-reaction", vec!["query1".to_string()])
///     .with_priority_queue_capacity(5000)
///     .with_auto_start(true);
///
/// let base = ReactionBase::new(params);
/// ```
#[derive(Debug, Clone)]
pub struct ReactionBaseParams {
    /// Unique identifier of the reaction.
    pub id: String,
    /// IDs of the queries this reaction subscribes to.
    pub queries: Vec<String>,
    /// Capacity of the priority queue. `None` means "use the default" (10000).
    pub priority_queue_capacity: Option<usize>,
    /// Whether the reaction should start automatically. Defaults to `true`.
    pub auto_start: bool,
}

impl ReactionBaseParams {
    /// Creates parameters from an ID and the subscribed query IDs.
    ///
    /// Every other setting takes its default: no explicit queue capacity and
    /// `auto_start = true` (the same default queries use).
    #[must_use]
    pub fn new(id: impl Into<String>, queries: Vec<String>) -> Self {
        Self {
            id: id.into(),
            queries,
            priority_queue_capacity: None,
            auto_start: true,
        }
    }

    /// Returns the parameters with an explicit priority queue capacity.
    ///
    /// This is a consuming builder step: the updated value is *returned*, so
    /// the result must be used (hence `#[must_use]`).
    #[must_use]
    pub fn with_priority_queue_capacity(self, capacity: usize) -> Self {
        Self {
            priority_queue_capacity: Some(capacity),
            ..self
        }
    }

    /// Returns the parameters with the auto-start flag set to `auto_start`.
    ///
    /// This is a consuming builder step: the updated value is *returned*, so
    /// the result must be used (hence `#[must_use]`).
    #[must_use]
    pub fn with_auto_start(self, auto_start: bool) -> Self {
        Self { auto_start, ..self }
    }
}
96
97/// Base implementation for common reaction functionality
98pub struct ReactionBase {
99    /// Reaction identifier
100    pub id: String,
101    /// List of query IDs to subscribe to
102    pub queries: Vec<String>,
103    /// Whether this reaction should auto-start
104    pub auto_start: bool,
105    /// Component status handle — always available, wired to graph during initialize().
106    status_handle: ComponentStatusHandle,
107    /// Runtime context (set by initialize())
108    context: Arc<RwLock<Option<ReactionRuntimeContext>>>,
109    /// State store provider (extracted from context for convenience)
110    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
111    /// Priority queue for timestamp-ordered result processing
112    pub priority_queue: PriorityQueue<QueryResult>,
113    /// Handles to subscription forwarder tasks
114    pub subscription_tasks: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
115    /// Handle to the main processing task
116    pub processing_task: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
117    /// Sender for shutdown signal to processing task
118    pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
119    /// Optional identity provider for credential management.
120    /// Set either programmatically (via `set_identity_provider`) or automatically
121    /// from the runtime context during `initialize()`.
122    identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
123}
124
125impl ReactionBase {
126    /// Create a new ReactionBase with the given parameters
127    ///
128    /// Dependencies (query subscriber, state store, graph) are not required during
129    /// construction - they will be provided via `initialize()` when the reaction is added to DrasiLib.
130    pub fn new(params: ReactionBaseParams) -> Self {
131        Self {
132            priority_queue: PriorityQueue::new(params.priority_queue_capacity.unwrap_or(10000)),
133            id: params.id.clone(),
134            queries: params.queries,
135            auto_start: params.auto_start,
136            status_handle: ComponentStatusHandle::new(&params.id),
137            context: Arc::new(RwLock::new(None)), // Set by initialize()
138            state_store: Arc::new(RwLock::new(None)), // Extracted from context
139            subscription_tasks: Arc::new(RwLock::new(Vec::new())),
140            processing_task: Arc::new(RwLock::new(None)),
141            shutdown_tx: Arc::new(RwLock::new(None)),
142            identity_provider: Arc::new(RwLock::new(None)),
143        }
144    }
145
146    /// Initialize the reaction with runtime context.
147    ///
148    /// This method is called automatically by DrasiLib's `add_reaction()` method.
149    /// Plugin developers do not need to call this directly.
150    ///
151    /// The context provides access to:
152    /// - `reaction_id`: The reaction's unique identifier
153    /// - `state_store`: Optional persistent state storage
154    /// - `update_tx`: mpsc sender for fire-and-forget status updates to the graph
155    pub async fn initialize(&self, context: ReactionRuntimeContext) {
156        // Store context for later use
157        *self.context.write().await = Some(context.clone());
158
159        // Wire the status handle to the graph update channel
160        self.status_handle.wire(context.update_tx.clone()).await;
161
162        if let Some(state_store) = context.state_store.as_ref() {
163            *self.state_store.write().await = Some(state_store.clone());
164        }
165
166        // Store identity provider from context if not already set programmatically
167        if let Some(ip) = context.identity_provider.as_ref() {
168            let mut guard = self.identity_provider.write().await;
169            if guard.is_none() {
170                *guard = Some(ip.clone());
171            }
172        }
173    }
174
175    /// Get the runtime context if initialized.
176    ///
177    /// Returns `None` if `initialize()` has not been called yet.
178    pub async fn context(&self) -> Option<ReactionRuntimeContext> {
179        self.context.read().await.clone()
180    }
181
182    /// Get the state store if configured.
183    ///
184    /// Returns `None` if no state store was provided in the context.
185    pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
186        self.state_store.read().await.clone()
187    }
188
189    /// Get the identity provider if set.
190    ///
191    /// Returns the identity provider set either programmatically via
192    /// `set_identity_provider()` or from the runtime context during `initialize()`.
193    /// Programmatically-set providers take precedence over context providers.
194    pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
195        self.identity_provider.read().await.clone()
196    }
197
198    /// Set the identity provider programmatically.
199    ///
200    /// This is typically called during reaction construction when the provider
201    /// is available from configuration (e.g., `with_identity_provider()` builder).
202    /// Providers set this way take precedence over context-injected providers.
203    pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
204        *self.identity_provider.write().await = Some(provider);
205    }
206
207    /// Get whether this reaction should auto-start
208    pub fn get_auto_start(&self) -> bool {
209        self.auto_start
210    }
211
212    /// Clone the ReactionBase with shared Arc references
213    ///
214    /// This creates a new ReactionBase that shares the same underlying
215    /// data through Arc references. Useful for passing to spawned tasks.
216    pub fn clone_shared(&self) -> Self {
217        Self {
218            id: self.id.clone(),
219            queries: self.queries.clone(),
220            auto_start: self.auto_start,
221            status_handle: self.status_handle.clone(),
222            context: self.context.clone(),
223            state_store: self.state_store.clone(),
224            priority_queue: self.priority_queue.clone(),
225            subscription_tasks: self.subscription_tasks.clone(),
226            processing_task: self.processing_task.clone(),
227            shutdown_tx: self.shutdown_tx.clone(),
228            identity_provider: self.identity_provider.clone(),
229        }
230    }
231
232    /// Create a shutdown channel and store the sender
233    ///
234    /// Returns the receiver which should be passed to the processing task.
235    /// The sender is stored internally and will be triggered by `stop_common()`.
236    ///
237    /// This should be called before spawning the processing task.
238    pub async fn create_shutdown_channel(&self) -> tokio::sync::oneshot::Receiver<()> {
239        let (tx, rx) = tokio::sync::oneshot::channel();
240        *self.shutdown_tx.write().await = Some(tx);
241        rx
242    }
243
244    /// Get the reaction ID
245    pub fn get_id(&self) -> &str {
246        &self.id
247    }
248
249    /// Get the query IDs
250    pub fn get_queries(&self) -> &[String] {
251        &self.queries
252    }
253
254    /// Get current status.
255    pub async fn get_status(&self) -> ComponentStatus {
256        self.status_handle.get_status().await
257    }
258
259    /// Returns a clonable [`ComponentStatusHandle`] for use in spawned tasks.
260    ///
261    /// The handle can both read and write the component's status and automatically
262    /// notifies the graph on every status change (after `initialize()`).
263    pub fn status_handle(&self) -> ComponentStatusHandle {
264        self.status_handle.clone()
265    }
266
267    /// Set the component's status — updates local state AND notifies the graph.
268    ///
269    /// This is the single canonical way to change a reaction's status.
270    pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
271        self.status_handle.set_status(status, message).await;
272    }
273
274    /// Enqueue a query result for processing.
275    ///
276    /// The host calls this to forward query results to the reaction's priority queue.
277    /// Results are processed in timestamp order by the reaction's processing task.
278    pub async fn enqueue_query_result(&self, result: QueryResult) -> anyhow::Result<()> {
279        self.priority_queue.enqueue_wait(Arc::new(result)).await;
280        Ok(())
281    }
282
283    /// Perform common cleanup operations
284    ///
285    /// This method handles:
286    /// 1. Sending shutdown signal to processing task (for graceful termination)
287    /// 2. Aborting all subscription forwarder tasks
288    /// 3. Waiting for or aborting the processing task
289    /// 4. Draining the priority queue
290    pub async fn stop_common(&self) -> Result<()> {
291        info!("Stopping reaction: {}", self.id);
292
293        // Send shutdown signal to processing task (if it's using tokio::select!)
294        if let Some(tx) = self.shutdown_tx.write().await.take() {
295            let _ = tx.send(());
296        }
297
298        // Abort all subscription forwarder tasks
299        let mut subscription_tasks = self.subscription_tasks.write().await;
300        for task in subscription_tasks.drain(..) {
301            task.abort();
302        }
303        drop(subscription_tasks);
304
305        // Wait for the processing task to complete (with timeout), or abort it
306        let mut processing_task = self.processing_task.write().await;
307        if let Some(mut task) = processing_task.take() {
308            // Give the task a short time to respond to the shutdown signal
309            match tokio::time::timeout(std::time::Duration::from_secs(2), &mut task).await {
310                Ok(Ok(())) => {
311                    debug!("[{}] Processing task completed gracefully", self.id);
312                }
313                Ok(Err(e)) => {
314                    // Task was aborted or panicked
315                    debug!("[{}] Processing task ended: {}", self.id, e);
316                }
317                Err(_) => {
318                    // Timeout - task didn't respond to shutdown signal
319                    warn!(
320                        "[{}] Processing task did not respond to shutdown signal within timeout, aborting",
321                        self.id
322                    );
323                    task.abort();
324                }
325            }
326        }
327        drop(processing_task);
328
329        // Drain the priority queue
330        let drained_events = self.priority_queue.drain().await;
331        if !drained_events.is_empty() {
332            info!(
333                "[{}] Drained {} pending events from priority queue",
334                self.id,
335                drained_events.len()
336            );
337        }
338
339        self.set_status(
340            ComponentStatus::Stopped,
341            Some(format!("Reaction '{}' stopped", self.id)),
342        )
343        .await;
344        info!("Reaction '{}' stopped", self.id);
345
346        Ok(())
347    }
348
349    /// Clear the reaction's state store partition.
350    ///
351    /// This is called during deprovision to remove all persisted state
352    /// associated with this reaction. Reactions that override `deprovision()`
353    /// can call this to clean up their state store.
354    pub async fn deprovision_common(&self) -> Result<()> {
355        info!("Deprovisioning reaction '{}'", self.id);
356        if let Some(store) = self.state_store().await {
357            let count = store.clear_store(&self.id).await.map_err(|e| {
358                anyhow::anyhow!(
359                    "Failed to clear state store for reaction '{}': {}",
360                    self.id,
361                    e
362                )
363            })?;
364            info!(
365                "Cleared {} keys from state store for reaction '{}'",
366                count, self.id
367            );
368        }
369        Ok(())
370    }
371
372    /// Set the processing task handle
373    pub async fn set_processing_task(&self, task: tokio::task::JoinHandle<()>) {
374        *self.processing_task.write().await = Some(task);
375    }
376}
377
#[cfg(test)]
mod tests {
    use super::*;
    use std::sync::atomic::{AtomicBool, Ordering};
    use std::time::Duration;

    /// Builds a `ReactionBase` with the given id and no subscribed queries.
    fn base_without_queries(id: &str) -> ReactionBase {
        ReactionBase::new(ReactionBaseParams::new(id, vec![]))
    }

    #[tokio::test]
    async fn test_reaction_base_creation() {
        let params = ReactionBaseParams::new("test-reaction", vec!["query1".to_string()])
            .with_priority_queue_capacity(5000);

        let base = ReactionBase::new(params);
        assert_eq!(base.id, "test-reaction");
        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
    }

    #[tokio::test]
    async fn test_status_transitions() {
        let (graph, _rx) = crate::component_graph::ComponentGraph::new("test-instance");
        let update_tx = graph.update_sender();
        let graph = Arc::new(RwLock::new(graph));

        let base = base_without_queries("test-reaction");

        // Create context and initialize
        let context =
            ReactionRuntimeContext::new("test-instance", "test-reaction", None, update_tx, None);
        base.initialize(context).await;

        // First transition, made through the wired status handle.
        base.set_status(ComponentStatus::Starting, Some("Starting test".to_string()))
            .await;
        assert_eq!(base.get_status().await, ComponentStatus::Starting);

        // Keep a broadcast subscription alive across the next transition.
        // NOTE(review): the receiver is never read. Asserting on the broadcast
        // event presumably requires the graph's update receiver (`_rx`) to be
        // processed, which this test does not do - confirm before adding one.
        let _event_rx = graph.read().await.subscribe();
        base.set_status(ComponentStatus::Running, Some("Running test".to_string()))
            .await;
        assert_eq!(base.get_status().await, ComponentStatus::Running);
    }

    #[tokio::test]
    async fn test_priority_queue_operations() {
        let params =
            ReactionBaseParams::new("test-reaction", vec![]).with_priority_queue_capacity(10);
        let base = ReactionBase::new(params);

        // Create a test query result
        let query_result = QueryResult::new(
            "test-query".to_string(),
            chrono::Utc::now(),
            vec![],
            Default::default(),
        );

        // Enqueue result
        let enqueued = base.priority_queue.enqueue(Arc::new(query_result)).await;
        assert!(enqueued);

        // Drain queue
        let drained = base.priority_queue.drain().await;
        assert_eq!(drained.len(), 1);
    }

    #[tokio::test]
    async fn test_event_without_initialization() {
        // set_status must work even though initialize() was never called.
        let base = base_without_queries("test-reaction");

        base.set_status(ComponentStatus::Starting, None).await;

        // The local status is updated even without a wired graph channel.
        assert_eq!(base.get_status().await, ComponentStatus::Starting);
    }

    // =============================================================================
    // Shutdown Channel Tests
    // =============================================================================

    #[tokio::test]
    async fn test_create_shutdown_channel() {
        let base = base_without_queries("test-reaction");

        // Initially no shutdown_tx
        assert!(base.shutdown_tx.read().await.is_none());

        // Create channel
        let rx = base.create_shutdown_channel().await;

        // Verify tx is stored
        assert!(base.shutdown_tx.read().await.is_some());

        // Verify receiver is valid (dropping it should not panic)
        drop(rx);
    }

    #[tokio::test]
    async fn test_shutdown_channel_signal() {
        let base = base_without_queries("test-reaction");

        let mut rx = base.create_shutdown_channel().await;

        // Send signal through the stored sender
        let tx = base
            .shutdown_tx
            .write()
            .await
            .take()
            .expect("create_shutdown_channel should store the sender");
        tx.send(()).expect("receiver is still alive");

        // Verify signal received
        assert!(rx.try_recv().is_ok());
    }

    #[tokio::test]
    async fn test_shutdown_channel_replaced_on_second_create() {
        let base = base_without_queries("test-reaction");

        // Create first channel
        let mut rx1 = base.create_shutdown_channel().await;

        // Create second channel (should replace the first)
        let mut rx2 = base.create_shutdown_channel().await;

        // Send signal - should go to second channel
        let tx = base
            .shutdown_tx
            .write()
            .await
            .take()
            .expect("second sender should be stored");
        tx.send(()).expect("second receiver is still alive");

        // Second receiver should get the signal
        assert!(rx2.try_recv().is_ok());

        // The first receiver never gets a value: its sender was replaced.
        assert!(rx1.try_recv().is_err());
    }

    #[tokio::test]
    async fn test_stop_common_sends_shutdown_signal() {
        let base = base_without_queries("test-reaction");

        let rx = base.create_shutdown_channel().await;

        // Spawn a task that waits for shutdown
        let shutdown_received = Arc::new(AtomicBool::new(false));
        let shutdown_flag = Arc::clone(&shutdown_received);

        let task = tokio::spawn(async move {
            // Only an actual signal counts; a dropped sender resolves to Err.
            if rx.await.is_ok() {
                shutdown_flag.store(true, Ordering::SeqCst);
            }
        });

        base.set_processing_task(task).await;

        // Call stop_common - should send shutdown signal and await the task
        let result = base.stop_common().await;
        assert!(result.is_ok());

        // stop_common awaits the processing task, so the flag should already be set
        assert!(
            shutdown_received.load(Ordering::SeqCst),
            "Processing task should have received shutdown signal"
        );
    }

    #[tokio::test]
    async fn test_graceful_shutdown_timing() {
        let base = base_without_queries("test-reaction");

        let rx = base.create_shutdown_channel().await;

        // Spawn task that uses select! pattern like real reactions
        let task = tokio::spawn(async move {
            let mut shutdown_rx = rx;
            loop {
                tokio::select! {
                    biased;
                    _ = &mut shutdown_rx => {
                        break;
                    }
                    _ = tokio::time::sleep(Duration::from_secs(10)) => {
                        // Simulates waiting on priority_queue.dequeue()
                    }
                }
            }
        });

        base.set_processing_task(task).await;

        // Measure shutdown time
        let start = std::time::Instant::now();
        let result = base.stop_common().await;
        let elapsed = start.elapsed();
        assert!(result.is_ok());

        // Should complete quickly (< 500ms), not hit 2s timeout
        assert!(
            elapsed < Duration::from_millis(500),
            "Shutdown took {elapsed:?}, expected < 500ms. Task may not be responding to shutdown signal."
        );
    }

    #[tokio::test]
    async fn test_stop_common_without_shutdown_channel() {
        // stop_common must work even if no shutdown channel was created
        let base = base_without_queries("test-reaction");

        // Start from a non-Stopped status so the final assertion is meaningful.
        base.set_status(ComponentStatus::Running, None).await;

        // Don't create shutdown channel - just spawn a short-lived task
        let task = tokio::spawn(async {
            tokio::time::sleep(Duration::from_millis(10)).await;
        });

        base.set_processing_task(task).await;

        // stop_common should still work
        let result = base.stop_common().await;
        assert!(result.is_ok());

        // stop_common consumes the task handle and ends in the Stopped status.
        assert!(base.processing_task.read().await.is_none());
        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
    }

    // =============================================================================
    // Accessor Tests
    // =============================================================================

    #[tokio::test]
    async fn test_get_id() {
        let base = base_without_queries("my-reaction-42");
        assert_eq!(base.get_id(), "my-reaction-42");
    }

    #[tokio::test]
    async fn test_get_queries() {
        let queries = vec![
            "query-a".to_string(),
            "query-b".to_string(),
            "query-c".to_string(),
        ];
        let params = ReactionBaseParams::new("r1", queries.clone());
        let base = ReactionBase::new(params);
        assert_eq!(base.get_queries(), &queries[..]);
    }

    #[tokio::test]
    async fn test_get_queries_empty() {
        let base = base_without_queries("r1");
        assert!(base.get_queries().is_empty());
    }

    #[tokio::test]
    async fn test_get_auto_start_default_true() {
        let base = base_without_queries("r1");
        assert!(base.get_auto_start());
    }

    #[tokio::test]
    async fn test_get_auto_start_override_false() {
        let params = ReactionBaseParams::new("r1", vec![]).with_auto_start(false);
        let base = ReactionBase::new(params);
        assert!(!base.get_auto_start());
    }

    // =============================================================================
    // Context / State Store / Identity Provider Tests
    // =============================================================================

    #[tokio::test]
    async fn test_context_none_before_initialize() {
        let base = base_without_queries("r1");
        assert!(base.context().await.is_none());
    }

    #[tokio::test]
    async fn test_context_some_after_initialize() {
        let (graph, _rx) = crate::component_graph::ComponentGraph::new("inst");
        let update_tx = graph.update_sender();
        let context = ReactionRuntimeContext::new("inst", "r1", None, update_tx, None);

        let base = base_without_queries("r1");
        base.initialize(context).await;

        let ctx = base.context().await;
        assert!(ctx.is_some());
        assert_eq!(ctx.unwrap().reaction_id, "r1");
    }

    #[tokio::test]
    async fn test_state_store_none_when_not_configured() {
        let base = base_without_queries("r1");
        assert!(base.state_store().await.is_none());
    }

    #[tokio::test]
    async fn test_state_store_none_after_initialize_without_store() {
        let (graph, _rx) = crate::component_graph::ComponentGraph::new("inst");
        let update_tx = graph.update_sender();
        let context = ReactionRuntimeContext::new("inst", "r1", None, update_tx, None);

        let base = base_without_queries("r1");
        base.initialize(context).await;

        assert!(base.state_store().await.is_none());
    }

    #[tokio::test]
    async fn test_identity_provider_none_by_default() {
        let base = base_without_queries("r1");
        assert!(base.identity_provider().await.is_none());
    }

    // =============================================================================
    // Status Handle Tests
    // =============================================================================

    #[tokio::test]
    async fn test_status_handle_returns_handle() {
        let base = base_without_queries("r1");

        let handle = base.status_handle();
        // The handle should share the same status as the base
        assert_eq!(handle.get_status().await, ComponentStatus::Stopped);

        // Mutating via handle should be visible via base
        handle.set_status(ComponentStatus::Running, None).await;
        assert_eq!(base.get_status().await, ComponentStatus::Running);
    }

    // =============================================================================
    // Deprovision Tests
    // =============================================================================

    #[tokio::test]
    async fn test_deprovision_common_noop_without_state_store() {
        let base = base_without_queries("r1");
        // Should succeed without panicking when no state store is configured
        let result = base.deprovision_common().await;
        assert!(result.is_ok());
    }

    // =============================================================================
    // Processing Task Tests
    // =============================================================================

    #[tokio::test]
    async fn test_set_processing_task_stores_handle() {
        let base = base_without_queries("r1");

        // Initially no processing task
        assert!(base.processing_task.read().await.is_none());

        let task = tokio::spawn(async {
            tokio::time::sleep(Duration::from_secs(60)).await;
        });

        base.set_processing_task(task).await;

        // Now it should be stored
        assert!(base.processing_task.read().await.is_some());

        // Clean up: abort the long-running task
        let task = base.processing_task.write().await.take();
        if let Some(t) = task {
            t.abort();
        }
    }
}
758}