Skip to main content

drasi_lib/sources/
traits.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Source trait module
16//!
17//! This module provides the core trait that all source plugins must implement.
18//! It separates the plugin contract from the source manager and implementation details.
19//!
20//! # Plugin Architecture
21//!
22//! Each source plugin:
23//! 1. Defines its own typed configuration struct with builder pattern
24//! 2. Creates a `SourceBase` instance using `SourceBaseParams`
25//! 3. Implements the `Source` trait
26//! 4. Is passed to `DrasiLib` via `add_source()` which takes ownership
27//!
28//! drasi-lib has no knowledge of which plugins exist - it only knows about this trait.
29//!
30//! # Runtime Context Initialization
31//!
32//! Sources receive all drasi-lib services through a single `initialize()` call
33//! when added to DrasiLib. The `SourceRuntimeContext` provides:
34//! - `event_tx`: Channel for component lifecycle events
35//! - `state_store`: Optional persistent state storage
36//!
37//! This replaces the previous `inject_*` methods with a cleaner single-call pattern.
38
39use anyhow::Result;
40use async_trait::async_trait;
41use bytes::Bytes;
42use thiserror::Error;
43
44use crate::channels::*;
45use crate::context::SourceRuntimeContext;
46
47/// Structured error type for source operations.
48///
49/// Sources return these errors (via `anyhow::Error`) from trait methods like `subscribe()`.
50/// The orchestration layer can downcast to check for specific error variants:
51/// ```ignore
52/// if let Some(source_err) = err.downcast_ref::<SourceError>() {
53///     match source_err {
54///         SourceError::PositionUnavailable { .. } => { /* handle */ }
55///     }
56/// }
57/// ```
58#[derive(Error, Debug)]
59pub enum SourceError {
60    /// The requested resume position is no longer available.
61    /// The caller should consult its recovery_policy to decide next steps.
62    #[error("Source '{source_id}' cannot resume from requested position: position unavailable (earliest available: {earliest_available:?})")]
63    PositionUnavailable {
64        source_id: String,
65        /// The position that was requested (opaque bytes from a previous checkpoint)
66        requested: Bytes,
67        /// The earliest position the source can provide, if known
68        earliest_available: Option<Bytes>,
69    },
70}
71
72/// Trait defining the interface for all source implementations.
73///
74/// This is the core abstraction that all source plugins must implement.
75/// drasi-lib only interacts with sources through this trait - it has no
76/// knowledge of specific plugin types or their configurations.
77///
78/// # Example Implementation
79///
80/// ```ignore
81/// use drasi_lib::Source;
82/// use drasi_lib::sources::{SourceBase, SourceBaseParams};
83/// use drasi_lib::context::SourceRuntimeContext;
84///
85/// pub struct MySource {
86///     base: SourceBase,
87///     // Plugin-specific fields
88/// }
89///
90/// impl MySource {
91///     pub fn new(config: MySourceConfig) -> Result<Self> {
92///         let params = SourceBaseParams::new(&config.id)
93///             .with_dispatch_mode(config.dispatch_mode)
94///             .with_dispatch_buffer_capacity(config.buffer_capacity);
95///
96///         Ok(Self {
97///             base: SourceBase::new(params)?,
98///         })
99///     }
100/// }
101///
102/// #[async_trait]
103/// impl Source for MySource {
104///     fn id(&self) -> &str {
105///         &self.base.id
106///     }
107///
108///     fn type_name(&self) -> &str {
109///         "my-source"
110///     }
111///
112///     fn properties(&self) -> HashMap<String, Value> {
113///         // Return plugin-specific properties
114///     }
115///
116///     async fn initialize(&self, context: SourceRuntimeContext) {
117///         self.base.initialize(context).await;
118///     }
119///
120///     // ... implement other methods
121/// }
122/// ```
123#[async_trait]
124pub trait Source: Send + Sync {
125    /// Get the source's unique identifier
126    fn id(&self) -> &str;
127
128    /// Get the source type name (e.g., "postgres", "http", "mock")
129    fn type_name(&self) -> &str;
130
131    /// Return **all** configuration properties for this source, including secrets.
132    ///
133    /// # Persistence contract
134    ///
135    /// This method is the **serialization hook** used by the host to persist
136    /// configuration to disk. When the server saves its config file it calls
137    /// `snapshot_configuration()`, which in turn calls `properties()` on every
138    /// source. The returned map is written to the YAML config so the component
139    /// can be recreated on the next startup.
140    ///
141    /// Because there is no separate config cache — the live component is the
142    /// single source of truth — any key/value omitted here will be **lost** on
143    /// the next save and the component will fail to start after a restart.
144    ///
145    /// # ⚠ Do not filter secrets
146    ///
147    /// Implementations **must** include sensitive values (passwords, tokens,
148    /// connection strings, etc.). Removing them makes the persistence round-trip
149    /// lossy and breaks restart. The host is responsible for protecting the
150    /// config file on disk; this method is not an external-facing API.
151    fn properties(&self) -> std::collections::HashMap<String, serde_json::Value>;
152
153    /// Get the dispatch mode for this source (Channel or Broadcast)
154    ///
155    /// Default is Channel mode for backpressure support.
156    fn dispatch_mode(&self) -> DispatchMode {
157        DispatchMode::Channel
158    }
159
160    /// Whether this source should auto-start when DrasiLib starts
161    ///
162    /// Default is `true`. Override to return `false` if this source
163    /// should only be started manually via `start_source()`.
164    fn auto_start(&self) -> bool {
165        true
166    }
167
168    /// Whether this source supports positional replay via `resume_from`.
169    ///
170    /// Sources backed by a persistent log (e.g., Postgres WAL, Kafka) should
171    /// override this to return `true`. The orchestration layer uses this to
172    /// validate compatibility with persistent queries and to request position handles.
173    fn supports_replay(&self) -> bool {
174        false
175    }
176
177    /// Start the source
178    ///
179    /// This begins data ingestion and event generation.
180    async fn start(&self) -> Result<()>;
181
182    /// Stop the source
183    ///
184    /// This stops data ingestion and cleans up resources.
185    async fn stop(&self) -> Result<()>;
186
187    /// Get the current status of the source
188    async fn status(&self) -> ComponentStatus;
189
190    /// Subscribe to this source for change events.
191    ///
192    /// This is called by queries to receive data changes from this source.
193    /// The source should return a receiver for streaming events and optionally
194    /// a bootstrap receiver for initial data.
195    ///
196    /// # Important
197    /// Implementations **must** call
198    /// [`SourceBase::apply_subscription_settings(&settings)`](crate::sources::base::SourceBase::apply_subscription_settings)
199    /// at the start of their implementation (or delegate to
200    /// [`SourceBase::subscribe_with_bootstrap()`](crate::sources::base::SourceBase::subscribe_with_bootstrap)
201    /// which does it automatically). Failing to do so will break sequence
202    /// monotonicity after restarts.
203    ///
204    /// # Arguments
205    /// * `settings` - Subscription settings including query ID, text, and labels of interest
206    ///
207    /// # Returns
208    /// A SubscriptionResponse containing:
209    /// * A receiver for streaming source events
210    /// * Optionally a bootstrap receiver for initial data
211    async fn subscribe(
212        &self,
213        settings: crate::config::SourceSubscriptionSettings,
214    ) -> Result<SubscriptionResponse>;
215
216    /// Downcast helper for testing - allows access to concrete types
217    fn as_any(&self) -> &dyn std::any::Any;
218
219    /// Permanently clean up internal state when the source is being removed.
220    ///
221    /// This is called when `remove_source(id, cleanup: true)` is used.
222    /// Use this to release external resources that should not persist after
223    /// the source is deleted (e.g., drop a replication slot, remove cursors).
224    ///
225    /// The default implementation is a no-op. Override only if your source
226    /// manages external state that needs explicit teardown.
227    ///
228    /// Errors are logged but do not prevent the source from being removed.
229    async fn deprovision(&self) -> Result<()> {
230        Ok(())
231    }
232
233    /// Initialize the source with runtime context.
234    ///
235    /// This method is called automatically by DrasiLib when the source is added
236    /// via `add_source()`. Plugin developers do not need to call this directly.
237    ///
238    /// The context provides access to:
239    /// - `source_id`: The source's unique identifier
240    /// - `event_tx`: Channel for reporting component lifecycle events
241    /// - `state_store`: Optional persistent state storage
242    ///
243    /// Implementation should delegate to `self.base.initialize(context).await`.
244    async fn initialize(&self, context: SourceRuntimeContext);
245
246    /// Set the bootstrap provider for this source
247    ///
248    /// This method allows setting a bootstrap provider after source construction.
249    /// It is optional - sources without a bootstrap provider will report that
250    /// bootstrap is not available.
251    ///
252    /// Implementation should delegate to `self.base.set_bootstrap_provider(provider).await`.
253    async fn set_bootstrap_provider(
254        &self,
255        _provider: Box<dyn crate::bootstrap::BootstrapProvider + 'static>,
256    ) {
257        // Default implementation does nothing - sources that support bootstrap
258        // should override this to delegate to their SourceBase
259    }
260}
261
262/// Blanket implementation of Source for `Box<dyn Source>`
263///
264/// This allows boxed trait objects to be used with methods expecting `impl Source`.
265#[async_trait]
266impl Source for Box<dyn Source + 'static> {
267    fn id(&self) -> &str {
268        (**self).id()
269    }
270
271    fn type_name(&self) -> &str {
272        (**self).type_name()
273    }
274
275    fn properties(&self) -> std::collections::HashMap<String, serde_json::Value> {
276        (**self).properties()
277    }
278
279    fn dispatch_mode(&self) -> DispatchMode {
280        (**self).dispatch_mode()
281    }
282
283    fn auto_start(&self) -> bool {
284        (**self).auto_start()
285    }
286
287    fn supports_replay(&self) -> bool {
288        (**self).supports_replay()
289    }
290
291    async fn start(&self) -> Result<()> {
292        (**self).start().await
293    }
294
295    async fn stop(&self) -> Result<()> {
296        (**self).stop().await
297    }
298
299    async fn status(&self) -> ComponentStatus {
300        (**self).status().await
301    }
302
303    async fn subscribe(
304        &self,
305        settings: crate::config::SourceSubscriptionSettings,
306    ) -> Result<SubscriptionResponse> {
307        (**self).subscribe(settings).await
308    }
309
310    fn as_any(&self) -> &dyn std::any::Any {
311        (**self).as_any()
312    }
313
314    async fn deprovision(&self) -> Result<()> {
315        (**self).deprovision().await
316    }
317
318    async fn initialize(&self, context: SourceRuntimeContext) {
319        (**self).initialize(context).await
320    }
321
322    async fn set_bootstrap_provider(
323        &self,
324        provider: Box<dyn crate::bootstrap::BootstrapProvider + 'static>,
325    ) {
326        (**self).set_bootstrap_provider(provider).await
327    }
328}