Skip to main content

drasi_lib/sources/
traits.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Source trait module
16//!
17//! This module provides the core trait that all source plugins must implement.
18//! It separates the plugin contract from the source manager and implementation details.
19//!
20//! # Plugin Architecture
21//!
22//! Each source plugin:
23//! 1. Defines its own typed configuration struct with builder pattern
24//! 2. Creates a `SourceBase` instance using `SourceBaseParams`
25//! 3. Implements the `Source` trait
26//! 4. Is passed to `DrasiLib` via `add_source()` which takes ownership
27//!
28//! drasi-lib has no knowledge of which plugins exist - it only knows about this trait.
29//!
30//! # Runtime Context Initialization
31//!
32//! Sources receive all drasi-lib services through a single `initialize()` call
33//! when added to DrasiLib. The `SourceRuntimeContext` provides:
34//! - `event_tx`: Channel for component lifecycle events
35//! - `state_store`: Optional persistent state storage
36//!
37//! This replaces the previous `inject_*` methods with a cleaner single-call pattern.
38
39use anyhow::Result;
40use async_trait::async_trait;
41
42use crate::channels::*;
43use crate::context::SourceRuntimeContext;
44
45/// Trait defining the interface for all source implementations.
46///
47/// This is the core abstraction that all source plugins must implement.
48/// drasi-lib only interacts with sources through this trait - it has no
49/// knowledge of specific plugin types or their configurations.
50///
51/// # Example Implementation
52///
53/// ```ignore
54/// use drasi_lib::Source;
55/// use drasi_lib::sources::{SourceBase, SourceBaseParams};
56/// use drasi_lib::context::SourceRuntimeContext;
57///
58/// pub struct MySource {
59///     base: SourceBase,
60///     // Plugin-specific fields
61/// }
62///
63/// impl MySource {
64///     pub fn new(config: MySourceConfig) -> Result<Self> {
65///         let params = SourceBaseParams::new(&config.id)
66///             .with_dispatch_mode(config.dispatch_mode)
67///             .with_dispatch_buffer_capacity(config.buffer_capacity);
68///
69///         Ok(Self {
70///             base: SourceBase::new(params)?,
71///         })
72///     }
73/// }
74///
75/// #[async_trait]
76/// impl Source for MySource {
77///     fn id(&self) -> &str {
78///         &self.base.id
79///     }
80///
81///     fn type_name(&self) -> &str {
82///         "my-source"
83///     }
84///
85///     fn properties(&self) -> HashMap<String, Value> {
86///         // Return plugin-specific properties
87///     }
88///
89///     async fn initialize(&self, context: SourceRuntimeContext) {
90///         self.base.initialize(context).await;
91///     }
92///
93///     // ... implement other methods
94/// }
95/// ```
96#[async_trait]
97pub trait Source: Send + Sync {
98    /// Get the source's unique identifier
99    fn id(&self) -> &str;
100
101    /// Get the source type name (e.g., "postgres", "http", "mock")
102    fn type_name(&self) -> &str;
103
104    /// Get the source's configuration properties for inspection
105    ///
106    /// This returns a HashMap representation of the source's configuration
107    /// for use in APIs and inspection. The actual typed configuration is
108    /// owned by the plugin - this is just for external visibility.
109    fn properties(&self) -> std::collections::HashMap<String, serde_json::Value>;
110
111    /// Get the dispatch mode for this source (Channel or Broadcast)
112    ///
113    /// Default is Channel mode for backpressure support.
114    fn dispatch_mode(&self) -> DispatchMode {
115        DispatchMode::Channel
116    }
117
118    /// Whether this source should auto-start when DrasiLib starts
119    ///
120    /// Default is `true`. Override to return `false` if this source
121    /// should only be started manually via `start_source()`.
122    fn auto_start(&self) -> bool {
123        true
124    }
125
126    /// Start the source
127    ///
128    /// This begins data ingestion and event generation.
129    async fn start(&self) -> Result<()>;
130
131    /// Stop the source
132    ///
133    /// This stops data ingestion and cleans up resources.
134    async fn stop(&self) -> Result<()>;
135
136    /// Get the current status of the source
137    async fn status(&self) -> ComponentStatus;
138
139    /// Subscribe to this source for change events
140    ///
141    /// This is called by queries to receive data changes from this source.
142    /// The source should return a receiver for streaming events and optionally
143    /// a bootstrap receiver for initial data.
144    ///
145    /// # Arguments
146    /// * `settings` - Subscription settings including query ID, text, and labels of interest
147    ///
148    /// # Returns
149    /// A SubscriptionResponse containing:
150    /// * A receiver for streaming source events
151    /// * Optionally a bootstrap receiver for initial data
152    async fn subscribe(
153        &self,
154        settings: crate::config::SourceSubscriptionSettings,
155    ) -> Result<SubscriptionResponse>;
156
157    /// Downcast helper for testing - allows access to concrete types
158    fn as_any(&self) -> &dyn std::any::Any;
159
160    /// Permanently clean up internal state when the source is being removed.
161    ///
162    /// This is called when `remove_source(id, cleanup: true)` is used.
163    /// Use this to release external resources that should not persist after
164    /// the source is deleted (e.g., drop a replication slot, remove cursors).
165    ///
166    /// The default implementation is a no-op. Override only if your source
167    /// manages external state that needs explicit teardown.
168    ///
169    /// Errors are logged but do not prevent the source from being removed.
170    async fn deprovision(&self) -> Result<()> {
171        Ok(())
172    }
173
174    /// Initialize the source with runtime context.
175    ///
176    /// This method is called automatically by DrasiLib when the source is added
177    /// via `add_source()`. Plugin developers do not need to call this directly.
178    ///
179    /// The context provides access to:
180    /// - `source_id`: The source's unique identifier
181    /// - `event_tx`: Channel for reporting component lifecycle events
182    /// - `state_store`: Optional persistent state storage
183    ///
184    /// Implementation should delegate to `self.base.initialize(context).await`.
185    async fn initialize(&self, context: SourceRuntimeContext);
186
187    /// Set the bootstrap provider for this source
188    ///
189    /// This method allows setting a bootstrap provider after source construction.
190    /// It is optional - sources without a bootstrap provider will report that
191    /// bootstrap is not available.
192    ///
193    /// Implementation should delegate to `self.base.set_bootstrap_provider(provider).await`.
194    async fn set_bootstrap_provider(
195        &self,
196        _provider: Box<dyn crate::bootstrap::BootstrapProvider + 'static>,
197    ) {
198        // Default implementation does nothing - sources that support bootstrap
199        // should override this to delegate to their SourceBase
200    }
201}
202
203/// Blanket implementation of Source for `Box<dyn Source>`
204///
205/// This allows boxed trait objects to be used with methods expecting `impl Source`.
206#[async_trait]
207impl Source for Box<dyn Source + 'static> {
208    fn id(&self) -> &str {
209        (**self).id()
210    }
211
212    fn type_name(&self) -> &str {
213        (**self).type_name()
214    }
215
216    fn properties(&self) -> std::collections::HashMap<String, serde_json::Value> {
217        (**self).properties()
218    }
219
220    fn dispatch_mode(&self) -> DispatchMode {
221        (**self).dispatch_mode()
222    }
223
224    fn auto_start(&self) -> bool {
225        (**self).auto_start()
226    }
227
228    async fn start(&self) -> Result<()> {
229        (**self).start().await
230    }
231
232    async fn stop(&self) -> Result<()> {
233        (**self).stop().await
234    }
235
236    async fn status(&self) -> ComponentStatus {
237        (**self).status().await
238    }
239
240    async fn subscribe(
241        &self,
242        settings: crate::config::SourceSubscriptionSettings,
243    ) -> Result<SubscriptionResponse> {
244        (**self).subscribe(settings).await
245    }
246
247    fn as_any(&self) -> &dyn std::any::Any {
248        (**self).as_any()
249    }
250
251    async fn deprovision(&self) -> Result<()> {
252        (**self).deprovision().await
253    }
254
255    async fn initialize(&self, context: SourceRuntimeContext) {
256        (**self).initialize(context).await
257    }
258
259    async fn set_bootstrap_provider(
260        &self,
261        provider: Box<dyn crate::bootstrap::BootstrapProvider + 'static>,
262    ) {
263        (**self).set_bootstrap_provider(provider).await
264    }
265}