drasi_lib/lib_core_ops/reaction_ops.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Reaction management operations for DrasiLib
16//!
17//! This module provides all reaction-related operations including adding, removing,
18//! starting, and stopping reactions.
19
20use crate::channels::ComponentStatus;
21use crate::component_ops::map_state_error;
22use crate::config::ReactionRuntime;
23use crate::error::{DrasiError, Result};
24use crate::lib_core::DrasiLib;
25use crate::reactions::Reaction;
26
27impl DrasiLib {
28 /// Add a reaction instance to a running server, taking ownership.
29 ///
30 /// The reaction instance is wrapped in an Arc internally - callers transfer
31 /// ownership rather than pre-wrapping in Arc.
32 ///
33 /// If the server is running and the reaction has `auto_start=true`, the reaction
34 /// will be started immediately after being added.
35 ///
36 /// # Example
37 /// ```no_run
38 /// # use drasi_lib::DrasiLib;
39 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
40 /// // Create the reaction and transfer ownership
41 /// // let reaction = MyReaction::new("my-reaction", vec!["query1".into()]);
42 /// // core.add_reaction(reaction).await?; // Ownership transferred, auto-started if server running
43 /// # Ok(())
44 /// # }
45 /// ```
46 pub async fn add_reaction(&self, reaction: impl Reaction + 'static) -> Result<()> {
47 self.state_guard.require_initialized().await?;
48
49 // Capture auto_start and id before transferring ownership
50 let should_auto_start = reaction.auto_start();
51 let reaction_id = reaction.id().to_string();
52
53 self.reaction_manager
54 .add_reaction(reaction)
55 .await
56 .map_err(|e| DrasiError::provisioning(format!("Failed to add reaction: {e}")))?;
57
58 // If server is running and reaction wants auto-start, start it
59 if self.is_running().await && should_auto_start {
60 self.reaction_manager
61 .start_reaction(reaction_id)
62 .await
63 .map_err(|e| DrasiError::provisioning(format!("Failed to start reaction: {e}")))?;
64 }
65
66 Ok(())
67 }
68
69 /// Remove a reaction from a running server
70 ///
71 /// If the reaction is running, it will be stopped first before removal.
72 ///
73 /// # Example
74 /// ```no_run
75 /// # use drasi_lib::DrasiLib;
76 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
77 /// core.remove_reaction("old-reaction").await?;
78 /// # Ok(())
79 /// # }
80 /// ```
81 pub async fn remove_reaction(&self, id: &str) -> Result<()> {
82 self.state_guard.require_initialized().await?;
83
84 // Stop if running
85 let status = self
86 .reaction_manager
87 .get_reaction_status(id.to_string())
88 .await
89 .map_err(|_| DrasiError::component_not_found("reaction", id))?;
90
91 if matches!(status, ComponentStatus::Running) {
92 self.reaction_manager
93 .stop_reaction(id.to_string())
94 .await
95 .map_err(|e| DrasiError::provisioning(format!("Failed to stop reaction: {e}")))?;
96 }
97
98 // Delete the reaction
99 self.reaction_manager
100 .delete_reaction(id.to_string())
101 .await
102 .map_err(|e| DrasiError::provisioning(format!("Failed to delete reaction: {e}")))?;
103
104 Ok(())
105 }
106
107 /// Start a stopped reaction
108 ///
109 /// This will create the necessary subscriptions to query result streams.
110 /// The QueryProvider was already injected when the reaction was added via `add_reaction()`.
111 ///
112 /// # Example
113 /// ```no_run
114 /// # use drasi_lib::DrasiLib;
115 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
116 /// core.start_reaction("my-reaction").await?;
117 /// # Ok(())
118 /// # }
119 /// ```
120 pub async fn start_reaction(&self, id: &str) -> Result<()> {
121 self.state_guard.require_initialized().await?;
122
123 // Start the reaction (QueryProvider was injected when reaction was added)
124 map_state_error(
125 self.reaction_manager.start_reaction(id.to_string()).await,
126 "reaction",
127 id,
128 )
129 }
130
131 /// Stop a running reaction
132 ///
133 /// # Example
134 /// ```no_run
135 /// # use drasi_lib::DrasiLib;
136 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
137 /// core.stop_reaction("my-reaction").await?;
138 /// # Ok(())
139 /// # }
140 /// ```
141 pub async fn stop_reaction(&self, id: &str) -> Result<()> {
142 self.state_guard.require_initialized().await?;
143
144 // Stop the reaction (subscriptions managed by reaction itself)
145 map_state_error(
146 self.reaction_manager.stop_reaction(id.to_string()).await,
147 "reaction",
148 id,
149 )?;
150
151 Ok(())
152 }
153
154 /// List all reactions with their current status
155 ///
156 /// # Example
157 /// ```no_run
158 /// # use drasi_lib::DrasiLib;
159 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
160 /// let reactions = core.list_reactions().await?;
161 /// for (id, status) in reactions {
162 /// println!("Reaction {}: {:?}", id, status);
163 /// }
164 /// # Ok(())
165 /// # }
166 /// ```
167 pub async fn list_reactions(&self) -> Result<Vec<(String, ComponentStatus)>> {
168 self.inspection.list_reactions().await
169 }
170
171 /// Get detailed information about a specific reaction
172 ///
173 /// # Example
174 /// ```no_run
175 /// # use drasi_lib::DrasiLib;
176 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
177 /// let reaction_info = core.get_reaction_info("my-reaction").await?;
178 /// println!("Reaction type: {}", reaction_info.reaction_type);
179 /// println!("Status: {:?}", reaction_info.status);
180 /// println!("Queries: {:?}", reaction_info.queries);
181 /// # Ok(())
182 /// # }
183 /// ```
184 pub async fn get_reaction_info(&self, id: &str) -> Result<ReactionRuntime> {
185 self.inspection.get_reaction_info(id).await
186 }
187
188 /// Get the current status of a specific reaction
189 ///
190 /// # Example
191 /// ```no_run
192 /// # use drasi_lib::DrasiLib;
193 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
194 /// let status = core.get_reaction_status("my-reaction").await?;
195 /// println!("Reaction status: {:?}", status);
196 /// # Ok(())
197 /// # }
198 /// ```
199 pub async fn get_reaction_status(&self, id: &str) -> Result<ComponentStatus> {
200 self.inspection.get_reaction_status(id).await
201 }
202}