Skip to main content

drasi_lib/sources/
traits.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Source trait module
16//!
17//! This module provides the core trait that all source plugins must implement.
18//! It separates the plugin contract from the source manager and implementation details.
19//!
20//! # Plugin Architecture
21//!
22//! Each source plugin:
23//! 1. Defines its own typed configuration struct with builder pattern
24//! 2. Creates a `SourceBase` instance using `SourceBaseParams`
25//! 3. Implements the `Source` trait
26//! 4. Is passed to `DrasiLib` via `add_source()` which takes ownership
27//!
28//! drasi-lib has no knowledge of which plugins exist - it only knows about this trait.
29//!
30//! # Runtime Context Initialization
31//!
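//! Sources receive all drasi-lib services through a single `initialize()` call
//! when added to DrasiLib. The `SourceRuntimeContext` provides:
//! - `source_id`: The source's unique identifier
//! - `event_tx`: Channel for component lifecycle events
//! - `state_store`: Optional persistent state storage
//!
//! An instance-wide identity provider may also be injected through the runtime
//! context; a per-source provider set via `Source::set_identity_provider()`
//! takes precedence over it.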
36//!
37//! This replaces the previous `inject_*` methods with a cleaner single-call pattern.
38
39use anyhow::Result;
40use async_trait::async_trait;
41use bytes::Bytes;
42use thiserror::Error;
43
44use crate::channels::*;
45use crate::context::SourceRuntimeContext;
46
47/// Structured error type for source operations.
48///
49/// Sources return these errors (via `anyhow::Error`) from trait methods like `subscribe()`.
50/// The orchestration layer can downcast to check for specific error variants:
51/// ```ignore
52/// if let Some(source_err) = err.downcast_ref::<SourceError>() {
53///     match source_err {
54///         SourceError::PositionUnavailable { .. } => { /* handle */ }
55///     }
56/// }
57/// ```
58#[derive(Error, Debug)]
59pub enum SourceError {
60    /// The requested resume position is no longer available.
61    /// The caller should consult its recovery_policy to decide next steps.
62    #[error("Source '{source_id}' cannot resume from requested position: position unavailable (earliest available: {earliest_available:?})")]
63    PositionUnavailable {
64        source_id: String,
65        /// The position that was requested (opaque bytes from a previous checkpoint)
66        requested: Bytes,
67        /// The earliest position the source can provide, if known
68        earliest_available: Option<Bytes>,
69    },
70}
71
72/// Trait for comparing source positions during per-subscriber replay filtering.
73///
74/// When a source rewinds to serve a late-joining subscriber, events that a
75/// subscriber has already committed must be suppressed. Since `source_position`
76/// is opaque bytes, the framework cannot compare them directly — the source
77/// must provide the comparison logic.
78///
79/// # Contract
80///
81/// `position_reached(event_pos, resume_pos)` returns `true` when the event at
82/// `event_pos` is **strictly after** `resume_pos` (i.e., it is new for a
83/// subscriber that last committed at `resume_pos`). Events at or before the
84/// resume position are filtered out.
85///
86/// # Default
87///
88/// [`ByteLexPositionComparator`] provides a byte-lexicographic comparison that
89/// works for any position encoded in big-endian (e.g., Postgres u64 LSN,
90/// MSSQL 20-byte LSN).
91pub trait PositionComparator: Send + Sync {
92    /// Returns `true` if `event_pos` is strictly after `resume_pos`.
93    fn position_reached(&self, event_pos: &Bytes, resume_pos: &Bytes) -> bool;
94}
95
96/// Default byte-lexicographic position comparator.
97///
98/// Works correctly for any position encoded as big-endian bytes of equal length.
99/// For positions of unequal length, shorter bytes compare as less than longer ones
100/// with the same prefix.
101#[derive(Debug, Clone, Default)]
102pub struct ByteLexPositionComparator;
103
104impl PositionComparator for ByteLexPositionComparator {
105    fn position_reached(&self, event_pos: &Bytes, resume_pos: &Bytes) -> bool {
106        event_pos.as_ref() > resume_pos.as_ref()
107    }
108}
109
110/// Trait defining the interface for all source implementations.
111///
112/// This is the core abstraction that all source plugins must implement.
113/// drasi-lib only interacts with sources through this trait - it has no
114/// knowledge of specific plugin types or their configurations.
115///
116/// # Example Implementation
117///
118/// ```ignore
119/// use drasi_lib::Source;
120/// use drasi_lib::sources::{SourceBase, SourceBaseParams};
121/// use drasi_lib::context::SourceRuntimeContext;
122///
123/// pub struct MySource {
124///     base: SourceBase,
125///     // Plugin-specific fields
126/// }
127///
128/// impl MySource {
129///     pub fn new(config: MySourceConfig) -> Result<Self> {
130///         let params = SourceBaseParams::new(&config.id)
131///             .with_dispatch_mode(config.dispatch_mode)
132///             .with_dispatch_buffer_capacity(config.buffer_capacity);
133///
134///         Ok(Self {
135///             base: SourceBase::new(params)?,
136///         })
137///     }
138/// }
139///
140/// #[async_trait]
141/// impl Source for MySource {
142///     fn id(&self) -> &str {
143///         &self.base.id
144///     }
145///
146///     fn type_name(&self) -> &str {
147///         "my-source"
148///     }
149///
150///     fn properties(&self) -> HashMap<String, Value> {
151///         // Return plugin-specific properties
152///     }
153///
154///     async fn initialize(&self, context: SourceRuntimeContext) {
155///         self.base.initialize(context).await;
156///     }
157///
158///     // ... implement other methods
159/// }
160/// ```
161#[async_trait]
162pub trait Source: Send + Sync {
163    /// Get the source's unique identifier
164    fn id(&self) -> &str;
165
166    /// Get the source type name (e.g., "postgres", "http", "mock")
167    fn type_name(&self) -> &str;
168
169    /// Return **all** configuration properties for this source, including secrets.
170    ///
171    /// # Persistence contract
172    ///
173    /// This method is the **serialization hook** used by the host to persist
174    /// configuration to disk. When the server saves its config file it calls
175    /// `snapshot_configuration()`, which in turn calls `properties()` on every
176    /// source. The returned map is written to the YAML config so the component
177    /// can be recreated on the next startup.
178    ///
179    /// Because there is no separate config cache — the live component is the
180    /// single source of truth — any key/value omitted here will be **lost** on
181    /// the next save and the component will fail to start after a restart.
182    ///
183    /// # ⚠ Do not filter secrets
184    ///
185    /// Implementations **must** include sensitive values (passwords, tokens,
186    /// connection strings, etc.). Removing them makes the persistence round-trip
187    /// lossy and breaks restart. The host is responsible for protecting the
188    /// config file on disk; this method is not an external-facing API.
189    fn properties(&self) -> std::collections::HashMap<String, serde_json::Value>;
190
191    /// Get the dispatch mode for this source (Channel or Broadcast)
192    ///
193    /// Default is Channel mode for backpressure support.
194    fn dispatch_mode(&self) -> DispatchMode {
195        DispatchMode::Channel
196    }
197
198    /// Whether this source should auto-start when DrasiLib starts
199    ///
200    /// Default is `true`. Override to return `false` if this source
201    /// should only be started manually via `start_source()`.
202    fn auto_start(&self) -> bool {
203        true
204    }
205
206    /// Whether this source supports positional replay via `resume_from`.
207    ///
208    /// Sources backed by a persistent log (e.g., Postgres WAL, Kafka) return
209    /// `true` (the default). Volatile sources that cannot replay (e.g.,
210    /// in-memory-only or purely push-based) should override this to return
211    /// `false`. The orchestration layer uses this to validate compatibility
212    /// with persistent queries and to request position handles.
213    fn supports_replay(&self) -> bool {
214        true
215    }
216
217    /// Describe the graph schema this source provides, if known.
218    ///
219    /// This is a best-effort introspection hook used by inspection APIs and
220    /// future MCP adapters. Sources that cannot determine their schema should
221    /// return `None`.
222    fn describe_schema(&self) -> Option<crate::schema::SourceSchema> {
223        None
224    }
225
226    /// Start the source
227    ///
228    /// This begins data ingestion and event generation.
229    async fn start(&self) -> Result<()>;
230
231    /// Stop the source
232    ///
233    /// This stops data ingestion and cleans up resources.
234    async fn stop(&self) -> Result<()>;
235
236    /// Get the current status of the source
237    async fn status(&self) -> ComponentStatus;
238
239    /// Subscribe to this source for change events.
240    ///
241    /// This is called by queries to receive data changes from this source.
242    /// The source should return a receiver for streaming events and optionally
243    /// a bootstrap receiver for initial data.
244    ///
245    /// # Important
246    /// Implementations **must** call
247    /// [`SourceBase::apply_subscription_settings(&settings)`](crate::sources::base::SourceBase::apply_subscription_settings)
248    /// at the start of their implementation (or delegate to
249    /// [`SourceBase::subscribe_with_bootstrap()`](crate::sources::base::SourceBase::subscribe_with_bootstrap)
250    /// which does it automatically). Failing to do so will break sequence
251    /// monotonicity after restarts.
252    ///
253    /// # Arguments
254    /// * `settings` - Subscription settings including query ID, text, and labels of interest
255    ///
256    /// # Returns
257    /// A SubscriptionResponse containing:
258    /// * A receiver for streaming source events
259    /// * Optionally a bootstrap receiver for initial data
260    async fn subscribe(
261        &self,
262        settings: crate::config::SourceSubscriptionSettings,
263    ) -> Result<SubscriptionResponse>;
264
265    /// Downcast helper for testing - allows access to concrete types
266    fn as_any(&self) -> &dyn std::any::Any;
267
268    /// Permanently clean up internal state when the source is being removed.
269    ///
270    /// This is called when `remove_source(id, cleanup: true)` is used.
271    /// Use this to release external resources that should not persist after
272    /// the source is deleted (e.g., drop a replication slot, remove cursors).
273    ///
274    /// The default implementation is a no-op. Override only if your source
275    /// manages external state that needs explicit teardown.
276    ///
277    /// Errors are logged but do not prevent the source from being removed.
278    async fn deprovision(&self) -> Result<()> {
279        Ok(())
280    }
281
282    /// Initialize the source with runtime context.
283    ///
284    /// This method is called automatically by DrasiLib when the source is added
285    /// via `add_source()`. Plugin developers do not need to call this directly.
286    ///
287    /// The context provides access to:
288    /// - `source_id`: The source's unique identifier
289    /// - `event_tx`: Channel for reporting component lifecycle events
290    /// - `state_store`: Optional persistent state storage
291    ///
292    /// Implementation should delegate to `self.base.initialize(context).await`.
293    async fn initialize(&self, context: SourceRuntimeContext);
294
295    /// Set the bootstrap provider for this source
296    ///
297    /// This method allows setting a bootstrap provider after source construction.
298    /// It is optional - sources without a bootstrap provider will report that
299    /// bootstrap is not available.
300    ///
301    /// Implementation should delegate to `self.base.set_bootstrap_provider(provider).await`.
302    async fn set_bootstrap_provider(
303        &self,
304        _provider: Box<dyn crate::bootstrap::BootstrapProvider + 'static>,
305    ) {
306        // Default implementation does nothing - sources that support bootstrap
307        // should override this to delegate to their SourceBase
308    }
309
310    /// Release the position handle that a query was holding on this source.
311    ///
312    /// Called during query stop to let the source advance its min-watermark.
313    /// Sources that use position handles should delegate to
314    /// `self.base.remove_position_handle(query_id).await`.
315    ///
316    /// The default is a no-op for sources that do not manage position handles.
317    ///
318    /// **Note:** This method has no FFI vtable entry yet. `SourceProxy`
319    /// overrides it with an explicit no-op + log message, and overrides
320    /// `supports_replay()` to return `false` so the orchestration layer
321    /// does not expect plugin sources to support position handles. A future
322    /// FFI SDK update (see issue #371) will add the vtable entry so plugin
323    /// sources can participate in position-handle cleanup.
324    async fn remove_position_handle(&self, _query_id: &str) {}
325
326    /// Set the identity provider for this source.
327    ///
328    /// This method allows attaching a per-source identity provider after
329    /// construction (e.g. when wiring up a source from declarative config that
330    /// references a named identity provider). It is optional — sources that do
331    /// not authenticate to external systems can ignore it.
332    ///
333    /// Identity providers set via this method take precedence over any
334    /// instance-wide provider injected through the runtime context during
335    /// `initialize()`.
336    ///
337    /// Implementations backed by a [`SourceBase`](crate::sources::SourceBase)
338    /// should delegate to `self.base.set_identity_provider(provider).await`;
339    /// other implementors should store the provider and apply it during
340    /// `initialize()`.
341    async fn set_identity_provider(
342        &self,
343        _provider: std::sync::Arc<dyn crate::identity::IdentityProvider>,
344    ) {
345        // Default implementation does nothing - sources that consume an
346        // identity provider should override this to delegate to their SourceBase.
347    }
348}
349
350/// Blanket implementation of Source for `Box<dyn Source>`
351///
352/// This allows boxed trait objects to be used with methods expecting `impl Source`.
353#[async_trait]
354impl Source for Box<dyn Source + 'static> {
355    fn id(&self) -> &str {
356        (**self).id()
357    }
358
359    fn type_name(&self) -> &str {
360        (**self).type_name()
361    }
362
363    fn properties(&self) -> std::collections::HashMap<String, serde_json::Value> {
364        (**self).properties()
365    }
366
367    fn dispatch_mode(&self) -> DispatchMode {
368        (**self).dispatch_mode()
369    }
370
371    fn auto_start(&self) -> bool {
372        (**self).auto_start()
373    }
374
375    fn describe_schema(&self) -> Option<crate::schema::SourceSchema> {
376        (**self).describe_schema()
377    }
378
379    fn supports_replay(&self) -> bool {
380        (**self).supports_replay()
381    }
382
383    async fn start(&self) -> Result<()> {
384        (**self).start().await
385    }
386
387    async fn stop(&self) -> Result<()> {
388        (**self).stop().await
389    }
390
391    async fn status(&self) -> ComponentStatus {
392        (**self).status().await
393    }
394
395    async fn subscribe(
396        &self,
397        settings: crate::config::SourceSubscriptionSettings,
398    ) -> Result<SubscriptionResponse> {
399        (**self).subscribe(settings).await
400    }
401
402    fn as_any(&self) -> &dyn std::any::Any {
403        (**self).as_any()
404    }
405
406    async fn deprovision(&self) -> Result<()> {
407        (**self).deprovision().await
408    }
409
410    async fn initialize(&self, context: SourceRuntimeContext) {
411        (**self).initialize(context).await
412    }
413
414    async fn set_bootstrap_provider(
415        &self,
416        provider: Box<dyn crate::bootstrap::BootstrapProvider + 'static>,
417    ) {
418        (**self).set_bootstrap_provider(provider).await
419    }
420
421    async fn remove_position_handle(&self, query_id: &str) {
422        (**self).remove_position_handle(query_id).await
423    }
424
425    async fn set_identity_provider(
426        &self,
427        provider: std::sync::Arc<dyn crate::identity::IdentityProvider>,
428    ) {
429        (**self).set_identity_provider(provider).await
430    }
431}