Skip to main content

drasi_lib/lib_core_ops/
reaction_ops.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Reaction management operations for DrasiLib
16//!
17//! This module provides all reaction-related operations including adding, removing,
18//! starting, and stopping reactions.
19
20use crate::channels::ComponentStatus;
21use crate::component_ops::map_state_error;
22use crate::config::ReactionRuntime;
23use crate::error::{DrasiError, Result};
24use crate::lib_core::DrasiLib;
25use crate::reactions::Reaction;
26
27impl DrasiLib {
28    /// Add a reaction instance to a running server, taking ownership.
29    ///
30    /// The reaction instance is wrapped in an Arc internally - callers transfer
31    /// ownership rather than pre-wrapping in Arc.
32    ///
33    /// If the server is running and the reaction has `auto_start=true`, the reaction
34    /// will be started immediately after being added.
35    ///
36    /// # Example
37    /// ```no_run
38    /// # use drasi_lib::DrasiLib;
39    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
40    /// // Create the reaction and transfer ownership
41    /// // let reaction = MyReaction::new("my-reaction", vec!["query1".into()]);
42    /// // core.add_reaction(reaction).await?;  // Ownership transferred, auto-started if server running
43    /// # Ok(())
44    /// # }
45    /// ```
46    pub async fn add_reaction(&self, reaction: impl Reaction + 'static) -> Result<()> {
47        self.state_guard.require_initialized().await?;
48
49        // Capture auto_start and id before transferring ownership
50        let should_auto_start = reaction.auto_start();
51        let reaction_id = reaction.id().to_string();
52
53        self.reaction_manager
54            .add_reaction(reaction)
55            .await
56            .map_err(|e| DrasiError::provisioning(format!("Failed to add reaction: {e}")))?;
57
58        // If server is running and reaction wants auto-start, start it
59        if self.is_running().await && should_auto_start {
60            self.reaction_manager
61                .start_reaction(reaction_id)
62                .await
63                .map_err(|e| DrasiError::provisioning(format!("Failed to start reaction: {e}")))?;
64        }
65
66        Ok(())
67    }
68
69    /// Remove a reaction from a running server
70    ///
71    /// If the reaction is running, it will be stopped first before removal.
72    ///
73    /// # Example
74    /// ```no_run
75    /// # use drasi_lib::DrasiLib;
76    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
77    /// core.remove_reaction("old-reaction").await?;
78    /// # Ok(())
79    /// # }
80    /// ```
81    pub async fn remove_reaction(&self, id: &str) -> Result<()> {
82        self.state_guard.require_initialized().await?;
83
84        // Stop if running
85        let status = self
86            .reaction_manager
87            .get_reaction_status(id.to_string())
88            .await
89            .map_err(|_| DrasiError::component_not_found("reaction", id))?;
90
91        if matches!(status, ComponentStatus::Running) {
92            self.reaction_manager
93                .stop_reaction(id.to_string())
94                .await
95                .map_err(|e| DrasiError::provisioning(format!("Failed to stop reaction: {e}")))?;
96        }
97
98        // Delete the reaction
99        self.reaction_manager
100            .delete_reaction(id.to_string())
101            .await
102            .map_err(|e| DrasiError::provisioning(format!("Failed to delete reaction: {e}")))?;
103
104        Ok(())
105    }
106
107    /// Start a stopped reaction
108    ///
109    /// This will create the necessary subscriptions to query result streams.
110    /// The QueryProvider was already injected when the reaction was added via `add_reaction()`.
111    ///
112    /// # Example
113    /// ```no_run
114    /// # use drasi_lib::DrasiLib;
115    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
116    /// core.start_reaction("my-reaction").await?;
117    /// # Ok(())
118    /// # }
119    /// ```
120    pub async fn start_reaction(&self, id: &str) -> Result<()> {
121        self.state_guard.require_initialized().await?;
122
123        // Start the reaction (QueryProvider was injected when reaction was added)
124        map_state_error(
125            self.reaction_manager.start_reaction(id.to_string()).await,
126            "reaction",
127            id,
128        )
129    }
130
131    /// Stop a running reaction
132    ///
133    /// # Example
134    /// ```no_run
135    /// # use drasi_lib::DrasiLib;
136    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
137    /// core.stop_reaction("my-reaction").await?;
138    /// # Ok(())
139    /// # }
140    /// ```
141    pub async fn stop_reaction(&self, id: &str) -> Result<()> {
142        self.state_guard.require_initialized().await?;
143
144        // Stop the reaction (subscriptions managed by reaction itself)
145        map_state_error(
146            self.reaction_manager.stop_reaction(id.to_string()).await,
147            "reaction",
148            id,
149        )?;
150
151        Ok(())
152    }
153
154    /// List all reactions with their current status
155    ///
156    /// # Example
157    /// ```no_run
158    /// # use drasi_lib::DrasiLib;
159    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
160    /// let reactions = core.list_reactions().await?;
161    /// for (id, status) in reactions {
162    ///     println!("Reaction {}: {:?}", id, status);
163    /// }
164    /// # Ok(())
165    /// # }
166    /// ```
167    pub async fn list_reactions(&self) -> Result<Vec<(String, ComponentStatus)>> {
168        self.inspection.list_reactions().await
169    }
170
171    /// Get detailed information about a specific reaction
172    ///
173    /// # Example
174    /// ```no_run
175    /// # use drasi_lib::DrasiLib;
176    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
177    /// let reaction_info = core.get_reaction_info("my-reaction").await?;
178    /// println!("Reaction type: {}", reaction_info.reaction_type);
179    /// println!("Status: {:?}", reaction_info.status);
180    /// println!("Queries: {:?}", reaction_info.queries);
181    /// # Ok(())
182    /// # }
183    /// ```
184    pub async fn get_reaction_info(&self, id: &str) -> Result<ReactionRuntime> {
185        self.inspection.get_reaction_info(id).await
186    }
187
188    /// Get the current status of a specific reaction
189    ///
190    /// # Example
191    /// ```no_run
192    /// # use drasi_lib::DrasiLib;
193    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
194    /// let status = core.get_reaction_status("my-reaction").await?;
195    /// println!("Reaction status: {:?}", status);
196    /// # Ok(())
197    /// # }
198    /// ```
199    pub async fn get_reaction_status(&self, id: &str) -> Result<ComponentStatus> {
200        self.inspection.get_reaction_status(id).await
201    }
202}