drasi_lib/lib_core_ops/source_ops.rs
1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7// http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Source management operations for DrasiLib
16//!
17//! This module provides all source-related operations including adding, removing,
18//! starting, and stopping sources.
19
20use crate::channels::ComponentStatus;
21use crate::component_ops::map_component_error;
22use crate::config::SourceRuntime;
23use crate::error::{DrasiError, Result};
24use crate::lib_core::DrasiLib;
25use crate::sources::Source;
26
27impl DrasiLib {
28 /// Add a source instance to a running server, taking ownership.
29 ///
30 /// The source instance is wrapped in an Arc internally - callers transfer
31 /// ownership rather than pre-wrapping in Arc.
32 ///
33 /// If the server is running and the source has `auto_start=true`, the source
34 /// will be started immediately after being added.
35 ///
36 /// # Example
37 /// ```no_run
38 /// # use drasi_lib::DrasiLib;
39 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
40 /// // Create the source and transfer ownership
41 /// // let source = MySource::new("new-source", config)?;
42 /// // core.add_source(source).await?; // Ownership transferred, auto-started if server running
43 /// # Ok(())
44 /// # }
45 /// ```
46 pub async fn add_source(&self, source: impl Source + 'static) -> Result<()> {
47 self.state_guard.require_initialized().await?;
48
49 // Capture auto_start and id before transferring ownership
50 let should_auto_start = source.auto_start();
51 let source_id = source.id().to_string();
52
53 self.source_manager
54 .add_source(source)
55 .await
56 .map_err(|e| DrasiError::provisioning(format!("Failed to add source: {e}")))?;
57
58 // If server is running and source wants auto-start, start it
59 if self.is_running().await && should_auto_start {
60 self.source_manager
61 .start_source(source_id)
62 .await
63 .map_err(|e| DrasiError::provisioning(format!("Failed to start source: {e}")))?;
64 }
65
66 Ok(())
67 }
68
69 /// Remove a source from a running server
70 ///
71 /// If the source is running, it will be stopped first before removal.
72 ///
73 /// # Example
74 /// ```no_run
75 /// # use drasi_lib::DrasiLib;
76 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
77 /// core.remove_source("old-source").await?;
78 /// # Ok(())
79 /// # }
80 /// ```
81 pub async fn remove_source(&self, id: &str) -> Result<()> {
82 self.state_guard.require_initialized().await?;
83
84 // Stop if running
85 let status = self
86 .source_manager
87 .get_source_status(id.to_string())
88 .await
89 .map_err(|_| DrasiError::component_not_found("source", id))?;
90
91 if matches!(status, ComponentStatus::Running) {
92 self.source_manager
93 .stop_source(id.to_string())
94 .await
95 .map_err(|e| DrasiError::provisioning(format!("Failed to stop source: {e}")))?;
96 }
97
98 // Delete the source
99 self.source_manager
100 .delete_source(id.to_string())
101 .await
102 .map_err(|e| DrasiError::provisioning(format!("Failed to delete source: {e}")))?;
103
104 Ok(())
105 }
106
107 /// Start a stopped source
108 ///
109 /// # Example
110 /// ```no_run
111 /// # use drasi_lib::DrasiLib;
112 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
113 /// core.start_source("my-source").await?;
114 /// # Ok(())
115 /// # }
116 /// ```
117 pub async fn start_source(&self, id: &str) -> Result<()> {
118 self.state_guard.require_initialized().await?;
119
120 map_component_error(
121 self.source_manager.start_source(id.to_string()).await,
122 "source",
123 id,
124 "start",
125 )
126 }
127
128 /// Stop a running source
129 ///
130 /// # Example
131 /// ```no_run
132 /// # use drasi_lib::DrasiLib;
133 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
134 /// core.stop_source("my-source").await?;
135 /// # Ok(())
136 /// # }
137 /// ```
138 pub async fn stop_source(&self, id: &str) -> Result<()> {
139 self.state_guard.require_initialized().await?;
140
141 map_component_error(
142 self.source_manager.stop_source(id.to_string()).await,
143 "source",
144 id,
145 "stop",
146 )
147 }
148
149 /// List all sources with their current status
150 ///
151 /// # Example
152 /// ```no_run
153 /// # use drasi_lib::DrasiLib;
154 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
155 /// let sources = core.list_sources().await?;
156 /// for (id, status) in sources {
157 /// println!("Source {}: {:?}", id, status);
158 /// }
159 /// # Ok(())
160 /// # }
161 /// ```
162 pub async fn list_sources(&self) -> Result<Vec<(String, ComponentStatus)>> {
163 self.inspection.list_sources().await
164 }
165
166 /// Get detailed information about a specific source
167 ///
168 /// # Example
169 /// ```no_run
170 /// # use drasi_lib::DrasiLib;
171 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
172 /// let source_info = core.get_source_info("my-source").await?;
173 /// println!("Source type: {}", source_info.source_type);
174 /// println!("Status: {:?}", source_info.status);
175 /// # Ok(())
176 /// # }
177 /// ```
178 pub async fn get_source_info(&self, id: &str) -> Result<SourceRuntime> {
179 self.inspection.get_source_info(id).await
180 }
181
182 /// Get the current status of a specific source
183 ///
184 /// # Example
185 /// ```no_run
186 /// # use drasi_lib::DrasiLib;
187 /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
188 /// let status = core.get_source_status("my-source").await?;
189 /// println!("Source status: {:?}", status);
190 /// # Ok(())
191 /// # }
192 /// ```
193 pub async fn get_source_status(&self, id: &str) -> Result<ComponentStatus> {
194 self.inspection.get_source_status(id).await
195 }
196}