Skip to main content

drasi_lib/reactions/common/
base.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common reaction functionality.
16//!
17//! This module provides `ReactionBase` which encapsulates common patterns
18//! used across all reaction implementations:
19//! - Query subscription management
20//! - Priority queue handling
21//! - Task lifecycle management
22//! - Component status tracking
23//! - Event reporting
24//!
25//! # Plugin Architecture
26//!
27//! ReactionBase is designed to be used by reaction plugins. Each plugin:
28//! 1. Defines its own typed configuration struct
29//! 2. Creates a ReactionBase with ReactionBaseParams
30//! 3. Implements the Reaction trait delegating to ReactionBase methods
31
32use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::sync::Arc;
35use tokio::sync::RwLock;
36use tracing::Instrument;
37
38use crate::channels::priority_queue::PriorityQueue;
39use crate::channels::{ComponentStatus, QueryResult};
40use crate::component_graph::ComponentStatusHandle;
41use crate::context::ReactionRuntimeContext;
42use crate::identity::IdentityProvider;
43use crate::reactions::checkpoint::ReactionCheckpoint;
44use crate::recovery::ReactionRecoveryPolicy;
45use crate::state_store::StateStoreProvider;
46
/// Parameters for creating a ReactionBase instance.
///
/// This struct contains only the information that ReactionBase needs to function.
/// Plugin-specific configuration should remain in the plugin crate.
///
/// Values are normally assembled with [`ReactionBaseParams::new`] followed by
/// the `with_*` builder methods, then handed to `ReactionBase::new`.
///
/// # Example
///
/// ```ignore
/// use drasi_lib::reactions::common::base::{ReactionBase, ReactionBaseParams};
///
/// let params = ReactionBaseParams::new("my-reaction", vec!["query1".to_string()])
///     .with_priority_queue_capacity(5000)
///     .with_auto_start(true);
///
/// let base = ReactionBase::new(params);
/// ```
#[derive(Debug, Clone)]
pub struct ReactionBaseParams {
    /// Unique identifier for the reaction
    pub id: String,
    /// List of query IDs this reaction subscribes to
    pub queries: Vec<String>,
    /// Priority queue capacity. `None` means "not specified"; `ReactionBase::new`
    /// then falls back to a capacity of 10000.
    pub priority_queue_capacity: Option<usize>,
    /// Whether this reaction should auto-start. `ReactionBaseParams::new` sets
    /// this to `true`.
    pub auto_start: bool,
    /// Optional recovery policy override (takes precedence over the trait default).
    /// `None` means no override was requested.
    pub recovery_policy: Option<ReactionRecoveryPolicy>,
}
76
77impl ReactionBaseParams {
78    /// Create new params with ID and queries, using defaults for everything else
79    pub fn new(id: impl Into<String>, queries: Vec<String>) -> Self {
80        Self {
81            id: id.into(),
82            queries,
83            priority_queue_capacity: None,
84            auto_start: true, // Default to true like queries
85            recovery_policy: None,
86        }
87    }
88
89    /// Set the priority queue capacity
90    pub fn with_priority_queue_capacity(mut self, capacity: usize) -> Self {
91        self.priority_queue_capacity = Some(capacity);
92        self
93    }
94
95    /// Set whether this reaction should auto-start
96    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
97        self.auto_start = auto_start;
98        self
99    }
100
101    /// Set the recovery policy override for this reaction instance.
102    pub fn with_recovery_policy(mut self, policy: ReactionRecoveryPolicy) -> Self {
103        self.recovery_policy = Some(policy);
104        self
105    }
106}
107
/// Base implementation for common reaction functionality
///
/// A `ReactionBase` is built from [`ReactionBaseParams`] via `ReactionBase::new`
/// and later completed by `initialize()`, which fills in the runtime context,
/// the state store and (if not already set) the identity provider.
///
/// Most mutable state sits behind `Arc<RwLock<..>>` so that `clone_shared()`
/// can hand out additional instances that reference the same state.
pub struct ReactionBase {
    /// Reaction identifier
    pub id: String,
    /// List of query IDs to subscribe to
    pub queries: Vec<String>,
    /// Whether this reaction should auto-start
    pub auto_start: bool,
    /// Optional recovery policy override
    pub recovery_policy: Option<ReactionRecoveryPolicy>,
    /// Component status handle — always available, wired to graph during initialize().
    status_handle: ComponentStatusHandle,
    /// Runtime context (set by initialize()); `None` until then.
    context: Arc<RwLock<Option<ReactionRuntimeContext>>>,
    /// State store provider (extracted from context for convenience).
    /// Stays `None` when the context carries no state store.
    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
    /// Priority queue for timestamp-ordered result processing
    pub priority_queue: PriorityQueue<QueryResult>,
    /// Handles to subscription forwarder tasks; aborted by `stop_common()`.
    pub subscription_tasks: Arc<RwLock<Vec<tokio::task::JoinHandle<()>>>>,
    /// Handle to the main processing task; awaited (or aborted) by `stop_common()`.
    pub processing_task: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Sender for shutdown signal to processing task.
    /// Populated by `create_shutdown_channel()` and consumed by `stop_common()`.
    pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
    /// Optional identity provider for credential management.
    /// Set either programmatically (via `set_identity_provider`) or automatically
    /// from the runtime context during `initialize()`.
    identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
    /// Original raw config JSON from the descriptor, preserving ConfigValue
    /// envelopes (secrets, env vars) for lossless persistence roundtrips.
    /// Unlike the `Arc` fields, this is copied by value in `clone_shared()`.
    raw_config: Option<serde_json::Value>,
}
140
141impl ReactionBase {
142    /// Create a new ReactionBase with the given parameters
143    ///
144    /// Dependencies (query subscriber, state store, graph) are not required during
145    /// construction - they will be provided via `initialize()` when the reaction is added to DrasiLib.
146    pub fn new(params: ReactionBaseParams) -> Self {
147        Self {
148            priority_queue: PriorityQueue::new(params.priority_queue_capacity.unwrap_or(10000)),
149            id: params.id.clone(),
150            queries: params.queries,
151            auto_start: params.auto_start,
152            recovery_policy: params.recovery_policy,
153            status_handle: ComponentStatusHandle::new(&params.id),
154            context: Arc::new(RwLock::new(None)), // Set by initialize()
155            state_store: Arc::new(RwLock::new(None)), // Extracted from context
156            subscription_tasks: Arc::new(RwLock::new(Vec::new())),
157            processing_task: Arc::new(RwLock::new(None)),
158            shutdown_tx: Arc::new(RwLock::new(None)),
159            identity_provider: Arc::new(RwLock::new(None)),
160            raw_config: None,
161        }
162    }
163
164    /// Initialize the reaction with runtime context.
165    ///
166    /// This method is called automatically by DrasiLib's `add_reaction()` method.
167    /// Plugin developers do not need to call this directly.
168    ///
169    /// The context provides access to:
170    /// - `reaction_id`: The reaction's unique identifier
171    /// - `state_store`: Optional persistent state storage
172    /// - `update_tx`: mpsc sender for fire-and-forget status updates to the graph
173    pub async fn initialize(&self, context: ReactionRuntimeContext) {
174        // Store context for later use
175        *self.context.write().await = Some(context.clone());
176
177        // Wire the status handle to the graph update channel
178        self.status_handle.wire(context.update_tx.clone()).await;
179
180        if let Some(state_store) = context.state_store.as_ref() {
181            *self.state_store.write().await = Some(state_store.clone());
182        }
183
184        // Store identity provider from context if not already set programmatically
185        if let Some(ip) = context.identity_provider.as_ref() {
186            let mut guard = self.identity_provider.write().await;
187            if guard.is_none() {
188                *guard = Some(ip.clone());
189            }
190        }
191    }
192
193    /// Get the runtime context if initialized.
194    ///
195    /// Returns `None` if `initialize()` has not been called yet.
196    pub async fn context(&self) -> Option<ReactionRuntimeContext> {
197        self.context.read().await.clone()
198    }
199
200    /// Get the state store if configured.
201    ///
202    /// Returns `None` if no state store was provided in the context.
203    pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
204        self.state_store.read().await.clone()
205    }
206
207    /// Get the identity provider if set.
208    ///
209    /// Returns the identity provider set either programmatically via
210    /// `set_identity_provider()` or from the runtime context during `initialize()`.
211    /// Programmatically-set providers take precedence over context providers.
212    pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
213        self.identity_provider.read().await.clone()
214    }
215
216    /// Set the identity provider programmatically.
217    ///
218    /// This is typically called during reaction construction when the provider
219    /// is available from configuration (e.g., `with_identity_provider()` builder).
220    /// Providers set this way take precedence over context-injected providers.
221    pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
222        *self.identity_provider.write().await = Some(provider);
223    }
224
    /// Get whether this reaction should auto-start.
    ///
    /// Returns the `auto_start` flag that was supplied in the construction params.
    pub fn get_auto_start(&self) -> bool {
        self.auto_start
    }
229
    /// Set the original raw config JSON for lossless persistence roundtrips.
    ///
    /// Replaces any previously stored value. `clone_shared()` copies this value
    /// rather than sharing it, so instances cloned earlier keep their own copy.
    pub fn set_raw_config(&mut self, config: serde_json::Value) {
        self.raw_config = Some(config);
    }
234
    /// Get the original raw config JSON, if set by a descriptor.
    ///
    /// Returns a borrow of the stored value, or `None` when `set_raw_config()`
    /// has not been called on this instance.
    pub fn raw_config(&self) -> Option<&serde_json::Value> {
        self.raw_config.as_ref()
    }
239
240    /// Build the properties map for this reaction.
241    ///
242    /// If `raw_config` was set (descriptor path), returns its top-level keys.
243    /// Otherwise, serializes `fallback_dto` (the DTO reconstructed from typed
244    /// config) to produce camelCase output.
245    ///
246    /// This eliminates the duplicated if-let + serialize pattern from plugins.
247    pub fn properties_or_serialize<D: serde::Serialize>(
248        &self,
249        fallback_dto: &D,
250    ) -> std::collections::HashMap<String, serde_json::Value> {
251        if let Some(serde_json::Value::Object(map)) = self.raw_config.as_ref() {
252            return map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
253        }
254
255        match serde_json::to_value(fallback_dto) {
256            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
257            _ => std::collections::HashMap::new(),
258        }
259    }
260
261    /// Clone the ReactionBase with shared Arc references
262    ///
263    /// This creates a new ReactionBase that shares the same underlying
264    /// data through Arc references. Useful for passing to spawned tasks.
265    pub fn clone_shared(&self) -> Self {
266        Self {
267            id: self.id.clone(),
268            queries: self.queries.clone(),
269            auto_start: self.auto_start,
270            recovery_policy: self.recovery_policy,
271            status_handle: self.status_handle.clone(),
272            context: self.context.clone(),
273            state_store: self.state_store.clone(),
274            priority_queue: self.priority_queue.clone(),
275            subscription_tasks: self.subscription_tasks.clone(),
276            processing_task: self.processing_task.clone(),
277            shutdown_tx: self.shutdown_tx.clone(),
278            identity_provider: self.identity_provider.clone(),
279            raw_config: self.raw_config.clone(),
280        }
281    }
282
283    /// Create a shutdown channel and store the sender
284    ///
285    /// Returns the receiver which should be passed to the processing task.
286    /// The sender is stored internally and will be triggered by `stop_common()`.
287    ///
288    /// This should be called before spawning the processing task.
289    pub async fn create_shutdown_channel(&self) -> tokio::sync::oneshot::Receiver<()> {
290        let (tx, rx) = tokio::sync::oneshot::channel();
291        *self.shutdown_tx.write().await = Some(tx);
292        rx
293    }
294
295    /// Get the reaction ID
296    pub fn get_id(&self) -> &str {
297        &self.id
298    }
299
300    /// Get the query IDs
301    pub fn get_queries(&self) -> &[String] {
302        &self.queries
303    }
304
    /// Get current status.
    ///
    /// Reads the status from this reaction's status handle.
    pub async fn get_status(&self) -> ComponentStatus {
        self.status_handle.get_status().await
    }
309
    /// Returns a cloneable [`ComponentStatusHandle`] for use in spawned tasks.
    ///
    /// The handle can both read and write the component's status and automatically
    /// notifies the graph on every status change (after `initialize()`).
    ///
    /// The returned value is a clone of the handle held by this `ReactionBase`.
    pub fn status_handle(&self) -> ComponentStatusHandle {
        self.status_handle.clone()
    }
317
    /// Set the component's status — updates local state AND notifies the graph.
    ///
    /// This is the single canonical way to change a reaction's status.
    /// The call is forwarded to the status handle together with the optional
    /// human-readable `message`.
    pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
        self.status_handle.set_status(status, message).await;
    }
324
325    /// Enqueue a query result for processing.
326    ///
327    /// The host calls this to forward query results to the reaction's priority queue.
328    /// Results are processed in timestamp order by the reaction's processing task.
329    pub async fn enqueue_query_result(&self, result: QueryResult) -> anyhow::Result<()> {
330        self.priority_queue.enqueue_wait(Arc::new(result)).await;
331        Ok(())
332    }
333
334    // ========================================================================
335    // Checkpoint helpers
336    // ========================================================================
337
338    /// Read the persisted checkpoint for a single query subscription.
339    ///
340    /// Returns `Ok(None)` if no checkpoint exists (fresh start).
341    /// Errors propagate from the state store or deserialization.
342    pub async fn read_checkpoint(&self, query_id: &str) -> Result<Option<ReactionCheckpoint>> {
343        let store = self.state_store.read().await;
344        match store.as_ref() {
345            Some(s) => {
346                crate::reactions::checkpoint::read_checkpoint(s.as_ref(), &self.id, query_id).await
347            }
348            None => Ok(None),
349        }
350    }
351
352    /// Read all persisted checkpoints for every query this reaction subscribes to.
353    ///
354    /// Returns a map from query ID to checkpoint. Missing checkpoints (fresh
355    /// subscriptions) are simply omitted from the result.
356    pub async fn read_all_checkpoints(
357        &self,
358    ) -> Result<std::collections::HashMap<String, ReactionCheckpoint>> {
359        let store = self.state_store.read().await;
360        match store.as_ref() {
361            Some(s) => {
362                crate::reactions::checkpoint::read_checkpoints_batch(
363                    s.as_ref(),
364                    &self.id,
365                    &self.queries,
366                )
367                .await
368            }
369            None => Ok(std::collections::HashMap::new()),
370        }
371    }
372
373    /// Persist a checkpoint for a single query subscription.
374    ///
375    /// This atomically writes the checkpoint to the state store. It should be
376    /// called after a batch of query results has been successfully processed.
377    pub async fn write_checkpoint(
378        &self,
379        query_id: &str,
380        checkpoint: &ReactionCheckpoint,
381    ) -> Result<()> {
382        let store = self.state_store.read().await;
383        let store = store.as_ref().ok_or_else(|| {
384            anyhow::anyhow!("No state store configured — cannot write checkpoint")
385        })?;
386        crate::reactions::checkpoint::write_checkpoint(
387            store.as_ref(),
388            &self.id,
389            query_id,
390            checkpoint,
391        )
392        .await
393    }
394
395    /// Perform common cleanup operations
396    ///
397    /// This method handles:
398    /// 1. Sending shutdown signal to processing task (for graceful termination)
399    /// 2. Aborting all subscription forwarder tasks
400    /// 3. Waiting for or aborting the processing task
401    /// 4. Draining the priority queue
402    pub async fn stop_common(&self) -> Result<()> {
403        info!("Stopping reaction: {}", self.id);
404
405        // Send shutdown signal to processing task (if it's using tokio::select!)
406        if let Some(tx) = self.shutdown_tx.write().await.take() {
407            let _ = tx.send(());
408        }
409
410        // Abort all subscription forwarder tasks
411        let mut subscription_tasks = self.subscription_tasks.write().await;
412        for task in subscription_tasks.drain(..) {
413            task.abort();
414        }
415        drop(subscription_tasks);
416
417        // Wait for the processing task to complete (with timeout), or abort it
418        let mut processing_task = self.processing_task.write().await;
419        if let Some(mut task) = processing_task.take() {
420            // Give the task a short time to respond to the shutdown signal
421            match tokio::time::timeout(std::time::Duration::from_secs(2), &mut task).await {
422                Ok(Ok(())) => {
423                    debug!("[{}] Processing task completed gracefully", self.id);
424                }
425                Ok(Err(e)) => {
426                    // Task was aborted or panicked
427                    debug!("[{}] Processing task ended: {}", self.id, e);
428                }
429                Err(_) => {
430                    // Timeout - task didn't respond to shutdown signal
431                    warn!(
432                        "[{}] Processing task did not respond to shutdown signal within timeout, aborting",
433                        self.id
434                    );
435                    task.abort();
436                }
437            }
438        }
439        drop(processing_task);
440
441        // Drain the priority queue
442        let drained_events = self.priority_queue.drain().await;
443        if !drained_events.is_empty() {
444            info!(
445                "[{}] Drained {} pending events from priority queue",
446                self.id,
447                drained_events.len()
448            );
449        }
450
451        self.set_status(
452            ComponentStatus::Stopped,
453            Some(format!("Reaction '{}' stopped", self.id)),
454        )
455        .await;
456        info!("Reaction '{}' stopped", self.id);
457
458        Ok(())
459    }
460
461    /// Clear the reaction's state store partition.
462    ///
463    /// This is called during deprovision to remove all persisted state
464    /// associated with this reaction. Reactions that override `deprovision()`
465    /// can call this to clean up their state store.
466    pub async fn deprovision_common(&self) -> Result<()> {
467        info!("Deprovisioning reaction '{}'", self.id);
468        if let Some(store) = self.state_store().await {
469            let count = store.clear_store(&self.id).await.map_err(|e| {
470                anyhow::anyhow!(
471                    "Failed to clear state store for reaction '{}': {}",
472                    self.id,
473                    e
474                )
475            })?;
476            info!(
477                "Cleared {} keys from state store for reaction '{}'",
478                count, self.id
479            );
480        }
481        Ok(())
482    }
483
484    /// Set the processing task handle
485    pub async fn set_processing_task(&self, task: tokio::task::JoinHandle<()>) {
486        *self.processing_task.write().await = Some(task);
487    }
488
489    /// Run a standard dequeue → dedup → handler → checkpoint loop.
490    ///
491    /// This is an optional convenience for reactions that follow the common
492    /// pattern.  Reactions needing custom scheduling, batching, or
493    /// multi-query ordering should implement their own loop.
494    ///
495    /// The loop:
496    /// 1. Dequeues from the priority queue (blocks until available).
497    /// 2. Checks the event's sequence against the persisted checkpoint —
498    ///    events at or before the checkpoint are silently skipped (dedup).
499    /// 3. Calls `handler` with the event.
500    /// 4. On success, writes a new checkpoint with the event's sequence,
501    ///    preserving the `config_hash` from the initial checkpoint map
502    ///    (or 0 if no prior checkpoint exists for that query).
503    /// 5. Breaks when `shutdown_rx` fires.
504    ///
505    /// # Arguments
506    /// * `shutdown_rx` — receiver created via [`create_shutdown_channel`].
507    /// * `initial_checkpoints` — pre-loaded checkpoint map (from bootstrap
508    ///   orchestration). The loop uses these for dedup and preserves each
509    ///   query's `config_hash` when advancing the sequence.
510    /// * `handler` — async function receiving a [`QueryResult`].  Return
511    ///   `Ok(())` to advance the checkpoint, or `Err` to leave it unchanged
512    ///   (the event will NOT be retried automatically).
513    pub async fn run_standard_loop<F, Fut>(
514        &self,
515        mut shutdown_rx: tokio::sync::oneshot::Receiver<()>,
516        initial_checkpoints: std::collections::HashMap<String, ReactionCheckpoint>,
517        handler: F,
518    ) -> Result<()>
519    where
520        F: Fn(Arc<crate::channels::QueryResult>) -> Fut + Send + Sync,
521        Fut: std::future::Future<Output = Result<()>> + Send,
522    {
523        let mut checkpoints = initial_checkpoints;
524
525        loop {
526            let event = tokio::select! {
527                biased;
528                _ = &mut shutdown_rx => {
529                    break;
530                }
531                event = self.priority_queue.dequeue() => event,
532            };
533
534            let query_id = &event.query_id;
535            let seq = event.sequence;
536
537            // Dedup: skip events at or before the checkpoint.
538            if let Some(cp) = checkpoints.get(query_id) {
539                if seq <= cp.sequence {
540                    debug!(
541                        "[{}] Skipping already-processed event: query={}, seq={} (checkpoint={})",
542                        self.id, query_id, seq, cp.sequence
543                    );
544                    continue;
545                }
546            }
547
548            // Invoke the user-provided handler.
549            if let Err(e) = handler(Arc::clone(&event)).await {
550                error!(
551                    "[{}] Handler error for query={}, seq={}: {:#}",
552                    self.id, query_id, seq, e
553                );
554                // Don't advance the checkpoint — the event was not
555                // successfully processed.
556                continue;
557            }
558
559            // Advance the checkpoint, preserving the config_hash from bootstrap.
560            let config_hash = checkpoints
561                .get(query_id)
562                .map(|cp| cp.config_hash)
563                .unwrap_or(0);
564            let cp = ReactionCheckpoint {
565                sequence: seq,
566                config_hash,
567            };
568            self.write_checkpoint(query_id, &cp).await?;
569            checkpoints.insert(query_id.clone(), cp);
570        }
571
572        Ok(())
573    }
574}
575
576#[cfg(test)]
577mod tests {
578    use super::*;
579    use std::sync::atomic::{AtomicBool, Ordering};
580    use std::time::Duration;
581    use tokio::sync::mpsc;
582
583    #[tokio::test]
584    async fn test_reaction_base_creation() {
585        let params = ReactionBaseParams::new("test-reaction", vec!["query1".to_string()])
586            .with_priority_queue_capacity(5000);
587
588        let base = ReactionBase::new(params);
589        assert_eq!(base.id, "test-reaction");
590        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
591    }
592
593    #[tokio::test]
594    async fn test_status_transitions() {
595        use crate::context::ReactionRuntimeContext;
596
597        let (graph, _rx) = crate::component_graph::ComponentGraph::new("test-instance");
598        let update_tx = graph.update_sender();
599        let graph = Arc::new(RwLock::new(graph));
600        let params = ReactionBaseParams::new("test-reaction", vec![]);
601
602        let base = ReactionBase::new(params);
603
604        // Create context and initialize
605        let context =
606            ReactionRuntimeContext::new("test-instance", "test-reaction", None, update_tx, None);
607        base.initialize(context).await;
608
609        // Test status transition
610        base.set_status(ComponentStatus::Starting, Some("Starting test".to_string()))
611            .await;
612
613        assert_eq!(base.get_status().await, ComponentStatus::Starting);
614
615        // Check event was sent via graph broadcast
616        let mut event_rx = graph.read().await.subscribe();
617        // The status was already set; emit another event to verify the graph path works
618        base.set_status(ComponentStatus::Running, Some("Running test".to_string()))
619            .await;
620
621        assert_eq!(base.get_status().await, ComponentStatus::Running);
622    }
623
624    #[tokio::test]
625    async fn test_priority_queue_operations() {
626        let params =
627            ReactionBaseParams::new("test-reaction", vec![]).with_priority_queue_capacity(10);
628
629        let base = ReactionBase::new(params);
630
631        // Create a test query result
632        let query_result = QueryResult::new(
633            "test-query".to_string(),
634            0,
635            chrono::Utc::now(),
636            vec![],
637            Default::default(),
638        );
639
640        // Enqueue result
641        let enqueued = base.priority_queue.enqueue(Arc::new(query_result)).await;
642        assert!(enqueued);
643
644        // Drain queue
645        let drained = base.priority_queue.drain().await;
646        assert_eq!(drained.len(), 1);
647    }
648
649    #[tokio::test]
650    async fn test_event_without_initialization() {
651        // Test that set_status works even without context initialization
652        let params = ReactionBaseParams::new("test-reaction", vec![]);
653
654        let base = ReactionBase::new(params);
655
656        // This should succeed without panicking (silently updates local only when handle is None)
657        base.set_status(ComponentStatus::Starting, None).await;
658    }
659
660    // =============================================================================
661    // Shutdown Channel Tests
662    // =============================================================================
663
664    #[tokio::test]
665    async fn test_create_shutdown_channel() {
666        let params = ReactionBaseParams::new("test-reaction", vec![]);
667        let base = ReactionBase::new(params);
668
669        // Initially no shutdown_tx
670        assert!(base.shutdown_tx.read().await.is_none());
671
672        // Create channel
673        let rx = base.create_shutdown_channel().await;
674
675        // Verify tx is stored
676        assert!(base.shutdown_tx.read().await.is_some());
677
678        // Verify receiver is valid (dropping it should not panic)
679        drop(rx);
680    }
681
682    #[tokio::test]
683    async fn test_shutdown_channel_signal() {
684        let params = ReactionBaseParams::new("test-reaction", vec![]);
685        let base = ReactionBase::new(params);
686
687        let mut rx = base.create_shutdown_channel().await;
688
689        // Send signal
690        if let Some(tx) = base.shutdown_tx.write().await.take() {
691            tx.send(()).unwrap();
692        }
693
694        // Verify signal received
695        let result = rx.try_recv();
696        assert!(result.is_ok());
697    }
698
699    #[tokio::test]
700    async fn test_shutdown_channel_replaced_on_second_create() {
701        let params = ReactionBaseParams::new("test-reaction", vec![]);
702        let base = ReactionBase::new(params);
703
704        // Create first channel
705        let _rx1 = base.create_shutdown_channel().await;
706
707        // Create second channel (should replace the first)
708        let mut rx2 = base.create_shutdown_channel().await;
709
710        // Send signal - should go to second channel
711        if let Some(tx) = base.shutdown_tx.write().await.take() {
712            tx.send(()).unwrap();
713        }
714
715        // Second receiver should get the signal
716        let result = rx2.try_recv();
717        assert!(result.is_ok());
718    }
719
720    #[tokio::test]
721    async fn test_stop_common_sends_shutdown_signal() {
722        let params = ReactionBaseParams::new("test-reaction", vec![]);
723        let base = ReactionBase::new(params);
724
725        let mut rx = base.create_shutdown_channel().await;
726
727        // Spawn a task that waits for shutdown
728        let shutdown_received = Arc::new(AtomicBool::new(false));
729        let shutdown_flag = shutdown_received.clone();
730
731        let task = tokio::spawn(async move {
732            tokio::select! {
733                _ = &mut rx => {
734                    shutdown_flag.store(true, Ordering::SeqCst);
735                }
736            }
737        });
738
739        base.set_processing_task(task).await;
740
741        // Call stop_common - should send shutdown signal and await the task
742        let _ = base.stop_common().await;
743
744        // stop_common awaits the processing task, so the flag should already be set
745        assert!(
746            shutdown_received.load(Ordering::SeqCst),
747            "Processing task should have received shutdown signal"
748        );
749    }
750
751    #[tokio::test]
752    async fn test_graceful_shutdown_timing() {
753        let params = ReactionBaseParams::new("test-reaction", vec![]);
754        let base = ReactionBase::new(params);
755
756        let rx = base.create_shutdown_channel().await;
757
758        // Spawn task that uses select! pattern like real reactions
759        let task = tokio::spawn(async move {
760            let mut shutdown_rx = rx;
761            loop {
762                tokio::select! {
763                    biased;
764                    _ = &mut shutdown_rx => {
765                        break;
766                    }
767                    _ = tokio::time::sleep(Duration::from_secs(10)) => {
768                        // Simulates waiting on priority_queue.dequeue()
769                    }
770                }
771            }
772        });
773
774        base.set_processing_task(task).await;
775
776        // Measure shutdown time
777        let start = std::time::Instant::now();
778        let _ = base.stop_common().await;
779        let elapsed = start.elapsed();
780
781        // Should complete quickly (< 500ms), not hit 2s timeout
782        assert!(
783            elapsed < Duration::from_millis(500),
784            "Shutdown took {elapsed:?}, expected < 500ms. Task may not be responding to shutdown signal."
785        );
786    }
787
788    #[tokio::test]
789    async fn test_stop_common_without_shutdown_channel() {
790        // Test that stop_common works even if no shutdown channel was created
791        let params = ReactionBaseParams::new("test-reaction", vec![]);
792        let base = ReactionBase::new(params);
793
794        // Don't create shutdown channel - just spawn a short-lived task
795        let task = tokio::spawn(async {
796            tokio::time::sleep(Duration::from_millis(10)).await;
797        });
798
799        base.set_processing_task(task).await;
800
801        // stop_common should still work
802        let result = base.stop_common().await;
803        assert!(result.is_ok());
804    }
805
806    // =============================================================================
807    // Accessor Tests
808    // =============================================================================
809
810    #[tokio::test]
811    async fn test_get_id() {
812        let params = ReactionBaseParams::new("my-reaction-42", vec![]);
813        let base = ReactionBase::new(params);
814        assert_eq!(base.get_id(), "my-reaction-42");
815    }
816
817    #[tokio::test]
818    async fn test_get_queries() {
819        let queries = vec!["query-a".to_string(), "query-b".to_string(), "query-c".to_string()];
820        let params = ReactionBaseParams::new("r1", queries.clone());
821        let base = ReactionBase::new(params);
822        assert_eq!(base.get_queries(), &queries[..]);
823    }
824
825    #[tokio::test]
826    async fn test_get_queries_empty() {
827        let params = ReactionBaseParams::new("r1", vec![]);
828        let base = ReactionBase::new(params);
829        assert!(base.get_queries().is_empty());
830    }
831
832    #[tokio::test]
833    async fn test_get_auto_start_default_true() {
834        let params = ReactionBaseParams::new("r1", vec![]);
835        let base = ReactionBase::new(params);
836        assert!(base.get_auto_start());
837    }
838
839    #[tokio::test]
840    async fn test_get_auto_start_override_false() {
841        let params = ReactionBaseParams::new("r1", vec![]).with_auto_start(false);
842        let base = ReactionBase::new(params);
843        assert!(!base.get_auto_start());
844    }
845
846    // =============================================================================
847    // Context / State Store / Identity Provider Tests
848    // =============================================================================
849
850    #[tokio::test]
851    async fn test_context_none_before_initialize() {
852        let params = ReactionBaseParams::new("r1", vec![]);
853        let base = ReactionBase::new(params);
854        assert!(base.context().await.is_none());
855    }
856
857    #[tokio::test]
858    async fn test_context_some_after_initialize() {
859        let (graph, _rx) = crate::component_graph::ComponentGraph::new("inst");
860        let update_tx = graph.update_sender();
861        let context = ReactionRuntimeContext::new("inst", "r1", None, update_tx, None);
862
863        let params = ReactionBaseParams::new("r1", vec![]);
864        let base = ReactionBase::new(params);
865        base.initialize(context).await;
866
867        let ctx = base.context().await;
868        assert!(ctx.is_some());
869        assert_eq!(ctx.unwrap().reaction_id, "r1");
870    }
871
872    #[tokio::test]
873    async fn test_state_store_none_when_not_configured() {
874        let params = ReactionBaseParams::new("r1", vec![]);
875        let base = ReactionBase::new(params);
876        assert!(base.state_store().await.is_none());
877    }
878
879    #[tokio::test]
880    async fn test_state_store_none_after_initialize_without_store() {
881        let (graph, _rx) = crate::component_graph::ComponentGraph::new("inst");
882        let update_tx = graph.update_sender();
883        let context = ReactionRuntimeContext::new("inst", "r1", None, update_tx, None);
884
885        let params = ReactionBaseParams::new("r1", vec![]);
886        let base = ReactionBase::new(params);
887        base.initialize(context).await;
888
889        assert!(base.state_store().await.is_none());
890    }
891
892    #[tokio::test]
893    async fn test_identity_provider_none_by_default() {
894        let params = ReactionBaseParams::new("r1", vec![]);
895        let base = ReactionBase::new(params);
896        assert!(base.identity_provider().await.is_none());
897    }
898
899    // =============================================================================
900    // Status Handle Tests
901    // =============================================================================
902
903    #[tokio::test]
904    async fn test_status_handle_returns_handle() {
905        let params = ReactionBaseParams::new("r1", vec![]);
906        let base = ReactionBase::new(params);
907
908        let handle = base.status_handle();
909        // The handle should share the same status as the base
910        assert_eq!(handle.get_status().await, ComponentStatus::Stopped);
911
912        // Mutating via handle should be visible via base
913        handle.set_status(ComponentStatus::Running, None).await;
914        assert_eq!(base.get_status().await, ComponentStatus::Running);
915    }
916
917    // =============================================================================
918    // Deprovision Tests
919    // =============================================================================
920
921    #[tokio::test]
922    async fn test_deprovision_common_noop_without_state_store() {
923        let params = ReactionBaseParams::new("r1", vec![]);
924        let base = ReactionBase::new(params);
925        // Should succeed without panicking when no state store is configured
926        let result = base.deprovision_common().await;
927        assert!(result.is_ok());
928    }
929
930    // =============================================================================
931    // Processing Task Tests
932    // =============================================================================
933
934    #[tokio::test]
935    async fn test_set_processing_task_stores_handle() {
936        let params = ReactionBaseParams::new("r1", vec![]);
937        let base = ReactionBase::new(params);
938
939        // Initially no processing task
940        assert!(base.processing_task.read().await.is_none());
941
942        let task = tokio::spawn(async {
943            tokio::time::sleep(Duration::from_secs(60)).await;
944        });
945
946        base.set_processing_task(task).await;
947
948        // Now it should be stored
949        assert!(base.processing_task.read().await.is_some());
950
951        // Clean up: abort the long-running task
952        let task = base.processing_task.write().await.take();
953        if let Some(t) = task {
954            t.abort();
955        }
956    }
957
958    #[tokio::test]
959    async fn test_checkpoint_read_write_round_trip() {
960        let (graph, _rx) = crate::component_graph::ComponentGraph::new("test-instance");
961        let update_tx = graph.update_sender();
962
963        let params =
964            ReactionBaseParams::new("ckpt-reaction", vec!["q1".to_string(), "q2".to_string()]);
965        let base = ReactionBase::new(params);
966
967        // Wire up an in-memory state store via context
968        let store: Arc<dyn StateStoreProvider> =
969            Arc::new(crate::state_store::MemoryStateStoreProvider::new());
970        let context = crate::context::ReactionRuntimeContext::new(
971            "test-instance",
972            "ckpt-reaction",
973            Some(store),
974            update_tx,
975            None,
976        );
977        base.initialize(context).await;
978
979        // Initially no checkpoints
980        assert!(base.read_checkpoint("q1").await.unwrap().is_none());
981        assert!(base.read_all_checkpoints().await.unwrap().is_empty());
982
983        // Write a checkpoint for q1
984        let cp1 = ReactionCheckpoint {
985            sequence: 10,
986            config_hash: 42,
987        };
988        base.write_checkpoint("q1", &cp1).await.unwrap();
989
990        // Read it back
991        let read = base.read_checkpoint("q1").await.unwrap().unwrap();
992        assert_eq!(read, cp1);
993
994        // q2 still absent
995        assert!(base.read_checkpoint("q2").await.unwrap().is_none());
996
997        // Write q2 and check read_all_checkpoints
998        let cp2 = ReactionCheckpoint {
999            sequence: 20,
1000            config_hash: 99,
1001        };
1002        base.write_checkpoint("q2", &cp2).await.unwrap();
1003
1004        let all = base.read_all_checkpoints().await.unwrap();
1005        assert_eq!(all.len(), 2);
1006        assert_eq!(all["q1"], cp1);
1007        assert_eq!(all["q2"], cp2);
1008    }
1009
1010    #[tokio::test]
1011    async fn test_checkpoint_no_state_store() {
1012        // Without a state store, reads return None and writes fail
1013        let params = ReactionBaseParams::new("no-store", vec!["q1".to_string()]);
1014        let base = ReactionBase::new(params);
1015
1016        // read_checkpoint returns None without error
1017        assert!(base.read_checkpoint("q1").await.unwrap().is_none());
1018
1019        // read_all_checkpoints returns empty
1020        assert!(base.read_all_checkpoints().await.unwrap().is_empty());
1021
1022        // write_checkpoint should error
1023        let cp = ReactionCheckpoint {
1024            sequence: 1,
1025            config_hash: 0,
1026        };
1027        assert!(base.write_checkpoint("q1", &cp).await.is_err());
1028    }
1029
1030    #[tokio::test]
1031    async fn test_run_standard_loop_dedup_and_checkpoint() {
1032        use std::sync::atomic::{AtomicU64, Ordering};
1033
1034        let (graph, _rx) = crate::component_graph::ComponentGraph::new("test-instance");
1035        let update_tx = graph.update_sender();
1036
1037        let params = ReactionBaseParams::new("loop-reaction", vec!["q1".to_string()]);
1038        let base = ReactionBase::new(params);
1039
1040        let store: Arc<dyn StateStoreProvider> =
1041            Arc::new(crate::state_store::MemoryStateStoreProvider::new());
1042        let context = crate::context::ReactionRuntimeContext::new(
1043            "test-instance",
1044            "loop-reaction",
1045            Some(store),
1046            update_tx,
1047            None,
1048        );
1049        base.initialize(context).await;
1050
1051        // Initial checkpoints: seq=5 with config_hash=42
1052        let initial_checkpoints = {
1053            let mut m = std::collections::HashMap::new();
1054            m.insert(
1055                "q1".to_string(),
1056                ReactionCheckpoint {
1057                    sequence: 5,
1058                    config_hash: 42,
1059                },
1060            );
1061            m
1062        };
1063
1064        // Enqueue events: seq 3 (dup), seq 5 (dup), seq 6, seq 7
1065        for seq in [3u64, 5, 6, 7] {
1066            let result = crate::channels::QueryResult {
1067                query_id: "q1".to_string(),
1068                sequence: seq,
1069                timestamp: chrono::Utc::now(),
1070                results: vec![],
1071                metadata: Default::default(),
1072                profiling: None,
1073            };
1074            base.enqueue_query_result(result).await.unwrap();
1075        }
1076
1077        // Track which sequences the handler actually processes
1078        let processed = Arc::new(tokio::sync::Mutex::new(Vec::<u64>::new()));
1079        let processed_clone = processed.clone();
1080        let handler_count = Arc::new(AtomicU64::new(0));
1081        let handler_count_clone = handler_count.clone();
1082
1083        let shutdown_rx = base.create_shutdown_channel().await;
1084        let base_clone = base.clone_shared();
1085
1086        let loop_handle = tokio::spawn(async move {
1087            base_clone
1088                .run_standard_loop(shutdown_rx, initial_checkpoints, |event| {
1089                    let processed = processed_clone.clone();
1090                    let count = handler_count_clone.clone();
1091                    async move {
1092                        processed.lock().await.push(event.sequence);
1093                        count.fetch_add(1, Ordering::SeqCst);
1094                        Ok(())
1095                    }
1096                })
1097                .await
1098                .unwrap();
1099        });
1100
1101        // Wait for all non-dup events to be processed (seq 6 and 7)
1102        for _ in 0..50 {
1103            if handler_count.load(Ordering::SeqCst) >= 2 {
1104                break;
1105            }
1106            tokio::time::sleep(Duration::from_millis(50)).await;
1107        }
1108
1109        // Signal shutdown via stop_common (the standard path)
1110        let _ = base.stop_common().await;
1111        let _ = tokio::time::timeout(Duration::from_secs(2), loop_handle).await;
1112
1113        // Verify only seq 6 and 7 were processed (3 and 5 were deduped)
1114        let processed = processed.lock().await;
1115        assert_eq!(*processed, vec![6, 7]);
1116
1117        // Checkpoint should now be at seq=7, preserving config_hash=42
1118        let cp = base.read_checkpoint("q1").await.unwrap().unwrap();
1119        assert_eq!(cp.sequence, 7);
1120        assert_eq!(cp.config_hash, 42);
1121    }
1122}