Skip to main content

drasi_lib/lib_core_ops/
source_ops.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Source management operations for DrasiLib
16//!
17//! This module provides all source-related operations including adding, removing,
18//! starting, and stopping sources.
19
20use crate::channels::ComponentStatus;
21use crate::component_ops::map_component_error;
22use crate::config::SourceRuntime;
23use crate::error::{DrasiError, Result};
24use crate::lib_core::DrasiLib;
25use crate::sources::Source;
26
27impl DrasiLib {
28    /// Add a source instance to a running server, taking ownership.
29    ///
30    /// The source instance is wrapped in an Arc internally - callers transfer
31    /// ownership rather than pre-wrapping in Arc.
32    ///
33    /// If the server is running and the source has `auto_start=true`, the source
34    /// will be started immediately after being added.
35    ///
36    /// # Example
37    /// ```no_run
38    /// # use drasi_lib::DrasiLib;
39    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
40    /// // Create the source and transfer ownership
41    /// // let source = MySource::new("new-source", config)?;
42    /// // core.add_source(source).await?;  // Ownership transferred, auto-started if server running
43    /// # Ok(())
44    /// # }
45    /// ```
46    pub async fn add_source(&self, source: impl Source + 'static) -> Result<()> {
47        self.state_guard.require_initialized().await?;
48
49        // Capture auto_start and id before transferring ownership
50        let should_auto_start = source.auto_start();
51        let source_id = source.id().to_string();
52
53        self.source_manager
54            .add_source(source)
55            .await
56            .map_err(|e| DrasiError::provisioning(format!("Failed to add source: {e}")))?;
57
58        // If server is running and source wants auto-start, start it
59        if self.is_running().await && should_auto_start {
60            self.source_manager
61                .start_source(source_id)
62                .await
63                .map_err(|e| DrasiError::provisioning(format!("Failed to start source: {e}")))?;
64        }
65
66        Ok(())
67    }
68
69    /// Remove a source from a running server
70    ///
71    /// If the source is running, it will be stopped first before removal.
72    ///
73    /// # Example
74    /// ```no_run
75    /// # use drasi_lib::DrasiLib;
76    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
77    /// core.remove_source("old-source").await?;
78    /// # Ok(())
79    /// # }
80    /// ```
81    pub async fn remove_source(&self, id: &str) -> Result<()> {
82        self.state_guard.require_initialized().await?;
83
84        // Stop if running
85        let status = self
86            .source_manager
87            .get_source_status(id.to_string())
88            .await
89            .map_err(|_| DrasiError::component_not_found("source", id))?;
90
91        if matches!(status, ComponentStatus::Running) {
92            self.source_manager
93                .stop_source(id.to_string())
94                .await
95                .map_err(|e| DrasiError::provisioning(format!("Failed to stop source: {e}")))?;
96        }
97
98        // Delete the source
99        self.source_manager
100            .delete_source(id.to_string())
101            .await
102            .map_err(|e| DrasiError::provisioning(format!("Failed to delete source: {e}")))?;
103
104        Ok(())
105    }
106
107    /// Start a stopped source
108    ///
109    /// # Example
110    /// ```no_run
111    /// # use drasi_lib::DrasiLib;
112    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
113    /// core.start_source("my-source").await?;
114    /// # Ok(())
115    /// # }
116    /// ```
117    pub async fn start_source(&self, id: &str) -> Result<()> {
118        self.state_guard.require_initialized().await?;
119
120        map_component_error(
121            self.source_manager.start_source(id.to_string()).await,
122            "source",
123            id,
124            "start",
125        )
126    }
127
128    /// Stop a running source
129    ///
130    /// # Example
131    /// ```no_run
132    /// # use drasi_lib::DrasiLib;
133    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
134    /// core.stop_source("my-source").await?;
135    /// # Ok(())
136    /// # }
137    /// ```
138    pub async fn stop_source(&self, id: &str) -> Result<()> {
139        self.state_guard.require_initialized().await?;
140
141        map_component_error(
142            self.source_manager.stop_source(id.to_string()).await,
143            "source",
144            id,
145            "stop",
146        )
147    }
148
149    /// List all sources with their current status
150    ///
151    /// # Example
152    /// ```no_run
153    /// # use drasi_lib::DrasiLib;
154    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
155    /// let sources = core.list_sources().await?;
156    /// for (id, status) in sources {
157    ///     println!("Source {}: {:?}", id, status);
158    /// }
159    /// # Ok(())
160    /// # }
161    /// ```
162    pub async fn list_sources(&self) -> Result<Vec<(String, ComponentStatus)>> {
163        self.inspection.list_sources().await
164    }
165
166    /// Get detailed information about a specific source
167    ///
168    /// # Example
169    /// ```no_run
170    /// # use drasi_lib::DrasiLib;
171    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
172    /// let source_info = core.get_source_info("my-source").await?;
173    /// println!("Source type: {}", source_info.source_type);
174    /// println!("Status: {:?}", source_info.status);
175    /// # Ok(())
176    /// # }
177    /// ```
178    pub async fn get_source_info(&self, id: &str) -> Result<SourceRuntime> {
179        self.inspection.get_source_info(id).await
180    }
181
182    /// Get the current status of a specific source
183    ///
184    /// # Example
185    /// ```no_run
186    /// # use drasi_lib::DrasiLib;
187    /// # async fn example(core: &DrasiLib) -> Result<(), Box<dyn std::error::Error>> {
188    /// let status = core.get_source_status("my-source").await?;
189    /// println!("Source status: {:?}", status);
190    /// # Ok(())
191    /// # }
192    /// ```
193    pub async fn get_source_status(&self, id: &str) -> Result<ComponentStatus> {
194        self.inspection.get_source_status(id).await
195    }
196}