Skip to main content

drasi_lib/sources/
traits.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
//! Source trait module
//!
//! This module provides the core trait that all source plugins must implement.
//! It separates the plugin contract from the source manager and implementation details.
//!
//! # Plugin Architecture
//!
//! Each source plugin:
//! 1. Defines its own typed configuration struct with a builder pattern
//! 2. Creates a `SourceBase` instance using `SourceBaseParams`
//! 3. Implements the `Source` trait
//! 4. Is passed to `DrasiLib` via `add_source()`, which takes ownership
//!
//! drasi-lib has no knowledge of which plugins exist - it only knows about this trait.
//!
//! # Runtime Context Initialization
//!
//! Sources receive all drasi-lib services through a single `initialize()` call
//! when added to DrasiLib. The `SourceRuntimeContext` provides:
//! - `source_id`: The source's unique identifier
//! - `event_tx`: Channel for component lifecycle events
//! - `state_store`: Optional persistent state storage
//!
//! This replaces the previous `inject_*` methods with a cleaner single-call pattern.
38
39use anyhow::Result;
40use async_trait::async_trait;
41use thiserror::Error;
42
43use crate::channels::*;
44use crate::context::SourceRuntimeContext;
45
46/// Structured error type for source operations.
47///
48/// Sources return these errors (via `anyhow::Error`) from trait methods like `subscribe()`.
49/// The orchestration layer can downcast to check for specific error variants:
50/// ```ignore
51/// if let Some(source_err) = err.downcast_ref::<SourceError>() {
52///     match source_err {
53///         SourceError::PositionUnavailable { .. } => { /* handle */ }
54///     }
55/// }
56/// ```
57#[derive(Error, Debug)]
58pub enum SourceError {
59    /// The requested resume position is no longer available.
60    /// The caller should consult its recovery_policy to decide next steps.
61    #[error("Source '{source_id}' cannot resume from position {requested}: position unavailable (earliest available: {earliest_available:?})")]
62    PositionUnavailable {
63        source_id: String,
64        requested: u64,
65        earliest_available: Option<u64>,
66    },
67}
68
69/// Trait defining the interface for all source implementations.
70///
71/// This is the core abstraction that all source plugins must implement.
72/// drasi-lib only interacts with sources through this trait - it has no
73/// knowledge of specific plugin types or their configurations.
74///
75/// # Example Implementation
76///
77/// ```ignore
78/// use drasi_lib::Source;
79/// use drasi_lib::sources::{SourceBase, SourceBaseParams};
80/// use drasi_lib::context::SourceRuntimeContext;
81///
82/// pub struct MySource {
83///     base: SourceBase,
84///     // Plugin-specific fields
85/// }
86///
87/// impl MySource {
88///     pub fn new(config: MySourceConfig) -> Result<Self> {
89///         let params = SourceBaseParams::new(&config.id)
90///             .with_dispatch_mode(config.dispatch_mode)
91///             .with_dispatch_buffer_capacity(config.buffer_capacity);
92///
93///         Ok(Self {
94///             base: SourceBase::new(params)?,
95///         })
96///     }
97/// }
98///
99/// #[async_trait]
100/// impl Source for MySource {
101///     fn id(&self) -> &str {
102///         &self.base.id
103///     }
104///
105///     fn type_name(&self) -> &str {
106///         "my-source"
107///     }
108///
109///     fn properties(&self) -> HashMap<String, Value> {
110///         // Return plugin-specific properties
111///     }
112///
113///     async fn initialize(&self, context: SourceRuntimeContext) {
114///         self.base.initialize(context).await;
115///     }
116///
117///     // ... implement other methods
118/// }
119/// ```
120#[async_trait]
121pub trait Source: Send + Sync {
122    /// Get the source's unique identifier
123    fn id(&self) -> &str;
124
125    /// Get the source type name (e.g., "postgres", "http", "mock")
126    fn type_name(&self) -> &str;
127
128    /// Get the source's configuration properties for inspection
129    ///
130    /// This returns a HashMap representation of the source's configuration
131    /// for use in APIs and inspection. The actual typed configuration is
132    /// owned by the plugin - this is just for external visibility.
133    fn properties(&self) -> std::collections::HashMap<String, serde_json::Value>;
134
135    /// Get the dispatch mode for this source (Channel or Broadcast)
136    ///
137    /// Default is Channel mode for backpressure support.
138    fn dispatch_mode(&self) -> DispatchMode {
139        DispatchMode::Channel
140    }
141
142    /// Whether this source should auto-start when DrasiLib starts
143    ///
144    /// Default is `true`. Override to return `false` if this source
145    /// should only be started manually via `start_source()`.
146    fn auto_start(&self) -> bool {
147        true
148    }
149
150    /// Whether this source supports positional replay via `resume_from`.
151    ///
152    /// Sources backed by a persistent log (e.g., Postgres WAL, Kafka) should
153    /// override this to return `true`. The orchestration layer uses this to
154    /// validate compatibility with persistent queries and to request position handles.
155    fn supports_replay(&self) -> bool {
156        false
157    }
158
159    /// Start the source
160    ///
161    /// This begins data ingestion and event generation.
162    async fn start(&self) -> Result<()>;
163
164    /// Stop the source
165    ///
166    /// This stops data ingestion and cleans up resources.
167    async fn stop(&self) -> Result<()>;
168
169    /// Get the current status of the source
170    async fn status(&self) -> ComponentStatus;
171
172    /// Subscribe to this source for change events
173    ///
174    /// This is called by queries to receive data changes from this source.
175    /// The source should return a receiver for streaming events and optionally
176    /// a bootstrap receiver for initial data.
177    ///
178    /// # Arguments
179    /// * `settings` - Subscription settings including query ID, text, and labels of interest
180    ///
181    /// # Returns
182    /// A SubscriptionResponse containing:
183    /// * A receiver for streaming source events
184    /// * Optionally a bootstrap receiver for initial data
185    async fn subscribe(
186        &self,
187        settings: crate::config::SourceSubscriptionSettings,
188    ) -> Result<SubscriptionResponse>;
189
190    /// Downcast helper for testing - allows access to concrete types
191    fn as_any(&self) -> &dyn std::any::Any;
192
193    /// Permanently clean up internal state when the source is being removed.
194    ///
195    /// This is called when `remove_source(id, cleanup: true)` is used.
196    /// Use this to release external resources that should not persist after
197    /// the source is deleted (e.g., drop a replication slot, remove cursors).
198    ///
199    /// The default implementation is a no-op. Override only if your source
200    /// manages external state that needs explicit teardown.
201    ///
202    /// Errors are logged but do not prevent the source from being removed.
203    async fn deprovision(&self) -> Result<()> {
204        Ok(())
205    }
206
207    /// Initialize the source with runtime context.
208    ///
209    /// This method is called automatically by DrasiLib when the source is added
210    /// via `add_source()`. Plugin developers do not need to call this directly.
211    ///
212    /// The context provides access to:
213    /// - `source_id`: The source's unique identifier
214    /// - `event_tx`: Channel for reporting component lifecycle events
215    /// - `state_store`: Optional persistent state storage
216    ///
217    /// Implementation should delegate to `self.base.initialize(context).await`.
218    async fn initialize(&self, context: SourceRuntimeContext);
219
220    /// Set the bootstrap provider for this source
221    ///
222    /// This method allows setting a bootstrap provider after source construction.
223    /// It is optional - sources without a bootstrap provider will report that
224    /// bootstrap is not available.
225    ///
226    /// Implementation should delegate to `self.base.set_bootstrap_provider(provider).await`.
227    async fn set_bootstrap_provider(
228        &self,
229        _provider: Box<dyn crate::bootstrap::BootstrapProvider + 'static>,
230    ) {
231        // Default implementation does nothing - sources that support bootstrap
232        // should override this to delegate to their SourceBase
233    }
234}
235
236/// Blanket implementation of Source for `Box<dyn Source>`
237///
238/// This allows boxed trait objects to be used with methods expecting `impl Source`.
239#[async_trait]
240impl Source for Box<dyn Source + 'static> {
241    fn id(&self) -> &str {
242        (**self).id()
243    }
244
245    fn type_name(&self) -> &str {
246        (**self).type_name()
247    }
248
249    fn properties(&self) -> std::collections::HashMap<String, serde_json::Value> {
250        (**self).properties()
251    }
252
253    fn dispatch_mode(&self) -> DispatchMode {
254        (**self).dispatch_mode()
255    }
256
257    fn auto_start(&self) -> bool {
258        (**self).auto_start()
259    }
260
261    fn supports_replay(&self) -> bool {
262        (**self).supports_replay()
263    }
264
265    async fn start(&self) -> Result<()> {
266        (**self).start().await
267    }
268
269    async fn stop(&self) -> Result<()> {
270        (**self).stop().await
271    }
272
273    async fn status(&self) -> ComponentStatus {
274        (**self).status().await
275    }
276
277    async fn subscribe(
278        &self,
279        settings: crate::config::SourceSubscriptionSettings,
280    ) -> Result<SubscriptionResponse> {
281        (**self).subscribe(settings).await
282    }
283
284    fn as_any(&self) -> &dyn std::any::Any {
285        (**self).as_any()
286    }
287
288    async fn deprovision(&self) -> Result<()> {
289        (**self).deprovision().await
290    }
291
292    async fn initialize(&self, context: SourceRuntimeContext) {
293        (**self).initialize(context).await
294    }
295
296    async fn set_bootstrap_provider(
297        &self,
298        provider: Box<dyn crate::bootstrap::BootstrapProvider + 'static>,
299    ) {
300        (**self).set_bootstrap_provider(provider).await
301    }
302}