Skip to main content

drasi_lib/sources/
base.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common source functionality.
16//!
17//! This module provides `SourceBase` which encapsulates common patterns
18//! used across all source implementations:
19//! - Dispatcher setup and management
20//! - Bootstrap subscription handling
21//! - Event dispatching with profiling
22//! - Component lifecycle management
23//!
24//! # Plugin Architecture
25//!
26//! SourceBase is designed to be used by source plugins. Each plugin:
27//! 1. Defines its own typed configuration struct
28//! 2. Creates a SourceBase with SourceBaseParams
29//! 3. Optionally provides a bootstrap provider via `set_bootstrap_provider()`
30//! 4. Implements the Source trait delegating to SourceBase methods
31
32use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::collections::HashMap;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37use tokio::sync::RwLock;
38use tracing::Instrument;
39
40use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
41use crate::channels::*;
42use crate::component_graph::ComponentStatusHandle;
43use crate::context::SourceRuntimeContext;
44use crate::identity::IdentityProvider;
45use crate::profiling;
46use crate::state_store::StateStoreProvider;
47use drasi_core::models::SourceChange;
48
/// Parameters for creating a SourceBase instance.
///
/// This struct contains only the information that SourceBase needs to function.
/// Plugin-specific configuration should remain in the plugin crate.
///
/// Start from [`SourceBaseParams::new`] and refine with the `with_*` builder
/// methods. The two `Option` settings are left as `None` unless a builder
/// method sets them; `SourceBase::new` substitutes the defaults at that point.
///
/// # Example
///
/// ```ignore
/// use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
///
/// let params = SourceBaseParams::new("my-source")
///     .with_dispatch_mode(DispatchMode::Channel)
///     .with_dispatch_buffer_capacity(2000)
///     .with_bootstrap_provider(my_provider);
///
/// let base = SourceBase::new(params)?;
/// ```
pub struct SourceBaseParams {
    /// Unique identifier for the source.
    pub id: String,
    /// Dispatch mode (Broadcast or Channel). `None` makes `SourceBase::new`
    /// use `DispatchMode::default()`, which this file documents as Channel.
    pub dispatch_mode: Option<DispatchMode>,
    /// Dispatch buffer capacity. `None` makes `SourceBase::new` use 1000.
    pub dispatch_buffer_capacity: Option<usize>,
    /// Optional bootstrap provider to install during construction.
    /// Stored boxed so any provider type can be carried.
    pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
    /// Whether this source should auto-start. `SourceBaseParams::new` sets
    /// this to `true`.
    pub auto_start: bool,
}
78
79impl std::fmt::Debug for SourceBaseParams {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct("SourceBaseParams")
82            .field("id", &self.id)
83            .field("dispatch_mode", &self.dispatch_mode)
84            .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
85            .field(
86                "bootstrap_provider",
87                &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
88            )
89            .field("auto_start", &self.auto_start)
90            .finish()
91    }
92}
93
94impl SourceBaseParams {
95    /// Create new params with just an ID, using defaults for everything else
96    pub fn new(id: impl Into<String>) -> Self {
97        Self {
98            id: id.into(),
99            dispatch_mode: None,
100            dispatch_buffer_capacity: None,
101            bootstrap_provider: None,
102            auto_start: true,
103        }
104    }
105
106    /// Set the dispatch mode
107    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
108        self.dispatch_mode = Some(mode);
109        self
110    }
111
112    /// Set the dispatch buffer capacity
113    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
114        self.dispatch_buffer_capacity = Some(capacity);
115        self
116    }
117
118    /// Set the bootstrap provider
119    ///
120    /// This provider will be used during source subscription to deliver
121    /// initial data to queries that request bootstrap.
122    pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
123        self.bootstrap_provider = Some(Box::new(provider));
124        self
125    }
126
127    /// Set whether this source should auto-start
128    ///
129    /// Default is `true`. Set to `false` if this source should only be
130    /// started manually via `start_source()`.
131    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
132        self.auto_start = auto_start;
133        self
134    }
135}
136
/// Base implementation for common source functionality.
///
/// Holds the state shared by source plugins: dispatchers, runtime context,
/// task/shutdown handles, bootstrap and identity providers, and per-query
/// position handles. All mutable state sits behind `Arc<RwLock<..>>`, so a
/// copy made with `clone_shared()` observes and mutates the same data.
pub struct SourceBase {
    /// Source identifier
    pub id: String,
    /// Dispatch mode setting (resolved from the params at construction)
    dispatch_mode: DispatchMode,
    /// Dispatch buffer capacity (resolved from the params at construction)
    dispatch_buffer_capacity: usize,
    /// Whether this source should auto-start
    pub auto_start: bool,
    /// Component status handle — always available, wired to graph during initialize().
    status_handle: ComponentStatusHandle,
    /// Dispatchers for sending source events to subscribers
    ///
    /// This is a vector of dispatchers that send source events to all registered
    /// subscribers (queries). When a source produces a change event, it broadcasts
    /// it to all dispatchers in this vector.
    ///
    /// In broadcast mode the vector holds the single dispatcher created by
    /// `new()`; in channel mode `create_streaming_receiver()` appends one
    /// dispatcher per subscription.
    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
    /// Runtime context (set by initialize())
    context: Arc<RwLock<Option<SourceRuntimeContext>>>,
    /// State store provider (extracted from context for convenience)
    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
    /// Handle to the source's main task; taken and awaited by `stop_common()`
    pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
    /// Sender for shutdown signal; taken and fired by `stop_common()`
    pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
    /// Optional bootstrap provider - plugins set this if they support bootstrap
    bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
    /// Optional identity provider for credential management.
    /// Set either programmatically (via `set_identity_provider`) or automatically
    /// from the runtime context during `initialize()`.
    identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
    /// Per-query position handles for replay-capable subscribers.
    ///
    /// Keyed by `query_id`. Values are the same `Arc<AtomicU64>` returned in
    /// `SubscriptionResponse::position_handle`. The query writes its last
    /// durably-processed sequence; the source reads `compute_confirmed_position()`
    /// to advance its upstream cursor. Initial value is `u64::MAX`
    /// ("no position confirmed yet"). Only populated when
    /// `request_position_handle == true`.
    position_handles: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
}
179
180impl SourceBase {
181    /// Create a new SourceBase with the given parameters
182    ///
183    /// The status channel is not required during construction - it will be
184    /// provided via the `SourceRuntimeContext` when `initialize()` is called.
185    ///
186    /// If a bootstrap provider is specified in params, it will be set during
187    /// construction (no async needed since nothing is shared yet).
188    pub fn new(params: SourceBaseParams) -> Result<Self> {
189        // Determine dispatch mode (default to Channel if not specified)
190        let dispatch_mode = params.dispatch_mode.unwrap_or_default();
191        let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
192
193        // Set up initial dispatchers based on dispatch mode
194        let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
195            Vec::new();
196
197        if dispatch_mode == DispatchMode::Broadcast {
198            // For broadcast mode, create a single broadcast dispatcher
199            let dispatcher =
200                BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
201            dispatchers.push(Box::new(dispatcher));
202        }
203        // For channel mode, dispatchers will be created on-demand when subscribing
204
205        // Initialize bootstrap provider if provided (no async needed at construction time)
206        let bootstrap_provider = params
207            .bootstrap_provider
208            .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
209
210        Ok(Self {
211            id: params.id.clone(),
212            dispatch_mode,
213            dispatch_buffer_capacity,
214            auto_start: params.auto_start,
215            status_handle: ComponentStatusHandle::new(&params.id),
216            dispatchers: Arc::new(RwLock::new(dispatchers)),
217            context: Arc::new(RwLock::new(None)), // Set by initialize()
218            state_store: Arc::new(RwLock::new(None)), // Extracted from context
219            task_handle: Arc::new(RwLock::new(None)),
220            shutdown_tx: Arc::new(RwLock::new(None)),
221            bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
222            identity_provider: Arc::new(RwLock::new(None)),
223            position_handles: Arc::new(RwLock::new(HashMap::new())),
224        })
225    }
226
227    /// Get whether this source should auto-start
228    pub fn get_auto_start(&self) -> bool {
229        self.auto_start
230    }
231
232    /// Initialize the source with runtime context.
233    ///
234    /// This method is called automatically by DrasiLib's `add_source()` method.
235    /// Plugin developers do not need to call this directly.
236    ///
237    /// The context provides access to:
238    /// - `source_id`: The source's unique identifier
239    /// - `update_tx`: mpsc sender for fire-and-forget status updates to the component graph
240    /// - `state_store`: Optional persistent state storage
241    pub async fn initialize(&self, context: SourceRuntimeContext) {
242        // Store context for later use
243        *self.context.write().await = Some(context.clone());
244
245        // Wire the status handle to the graph update channel
246        self.status_handle.wire(context.update_tx.clone()).await;
247
248        if let Some(state_store) = context.state_store.as_ref() {
249            *self.state_store.write().await = Some(state_store.clone());
250        }
251
252        // Store identity provider from context if not already set programmatically
253        if let Some(ip) = context.identity_provider.as_ref() {
254            let mut guard = self.identity_provider.write().await;
255            if guard.is_none() {
256                *guard = Some(ip.clone());
257            }
258        }
259    }
260
261    /// Get the runtime context if initialized.
262    ///
263    /// Returns `None` if `initialize()` has not been called yet.
264    pub async fn context(&self) -> Option<SourceRuntimeContext> {
265        self.context.read().await.clone()
266    }
267
268    /// Get the state store if configured.
269    ///
270    /// Returns `None` if no state store was provided in the context.
271    pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
272        self.state_store.read().await.clone()
273    }
274
275    /// Get the identity provider if set.
276    ///
277    /// Returns the identity provider set either programmatically via
278    /// `set_identity_provider()` or from the runtime context during `initialize()`.
279    /// Programmatically-set providers take precedence over context providers.
280    pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
281        self.identity_provider.read().await.clone()
282    }
283
284    /// Set the identity provider programmatically.
285    ///
286    /// This is typically called during source construction when the provider
287    /// is available from configuration (e.g., `with_identity_provider()` builder).
288    /// Providers set this way take precedence over context-injected providers.
289    pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
290        *self.identity_provider.write().await = Some(provider);
291    }
292
293    /// Create and register a position handle for `query_id`, initialized to `u64::MAX`.
294    ///
295    /// Returns the shared handle; the same `Arc` is placed in
296    /// `SubscriptionResponse::position_handle` so the query and the source share
297    /// one atomic. If a handle already exists for `query_id` (re-subscribe after
298    /// transient disconnect), the existing handle is returned to preserve any
299    /// position the query had previously reported.
300    pub async fn create_position_handle(&self, query_id: &str) -> Arc<AtomicU64> {
301        let mut handles = self.position_handles.write().await;
302        if let Some(existing) = handles.get(query_id) {
303            return existing.clone();
304        }
305        let handle = Arc::new(AtomicU64::new(u64::MAX));
306        handles.insert(query_id.to_string(), handle.clone());
307        handle
308    }
309
310    /// Remove the position handle for `query_id`. No-op if absent.
311    ///
312    /// Called from explicit cleanup paths (`stop_query`/`delete_query` will be
313    /// wired in a follow-up issue). Until then, `cleanup_stale_handles()`
314    /// (invoked inside `compute_confirmed_position`) catches dropped subscribers.
315    pub async fn remove_position_handle(&self, query_id: &str) {
316        let mut handles = self.position_handles.write().await;
317        handles.remove(query_id);
318    }
319
320    /// Compute the minimum confirmed position across all live subscribers.
321    ///
322    /// Returns `None` if no handles are registered, or if every registered
323    /// handle is still `u64::MAX` (no subscriber has confirmed a position yet —
324    /// typically because they are still bootstrapping). Otherwise returns the
325    /// minimum non-`u64::MAX` value, suitable for advancing the source's
326    /// upstream cursor (Postgres `flush_lsn`, Kafka commit, transient WAL prune
327    /// threshold).
328    ///
329    /// Piggy-backs `cleanup_stale_handles()` so dropped subscribers do not pin
330    /// the watermark indefinitely.
331    pub async fn compute_confirmed_position(&self) -> Option<u64> {
332        self.cleanup_stale_handles().await;
333        let handles = self.position_handles.read().await;
334        let mut min: Option<u64> = None;
335        for handle in handles.values() {
336            let v = handle.load(Ordering::Relaxed);
337            if v == u64::MAX {
338                continue;
339            }
340            min = Some(min.map_or(v, |m| m.min(v)));
341        }
342        min
343    }
344
345    /// Drop entries whose `Arc::strong_count == 1` (only `SourceBase` holds a
346    /// reference).
347    ///
348    /// This indicates the subscriber dropped its `SubscriptionResponse` without
349    /// calling `remove_position_handle` — common during query teardown until
350    /// explicit cleanup is wired by the query manager.
351    ///
352    /// Safety constraint: this relies on `SourceBase` being the only long-lived
353    /// holder of the `Arc` besides the subscribing query. If a future periodic
354    /// scan task (or any other component) clones the handle, this method must
355    /// be revisited or replaced with explicit liveness tracking.
356    pub async fn cleanup_stale_handles(&self) {
357        let mut handles = self.position_handles.write().await;
358        handles.retain(|_, handle| Arc::strong_count(handle) > 1);
359    }
360
361    /// Returns a clonable [`ComponentStatusHandle`] for use in spawned tasks.
362    ///
363    /// The handle can both read and write the component's status and automatically
364    /// notifies the graph on every status change (after `initialize()`).
365    pub fn status_handle(&self) -> ComponentStatusHandle {
366        self.status_handle.clone()
367    }
368
369    /// Clone the SourceBase with shared Arc references
370    ///
371    /// This creates a new SourceBase that shares the same underlying
372    /// data through Arc references. Useful for passing to spawned tasks.
373    pub fn clone_shared(&self) -> Self {
374        Self {
375            id: self.id.clone(),
376            dispatch_mode: self.dispatch_mode,
377            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
378            auto_start: self.auto_start,
379            status_handle: self.status_handle.clone(),
380            dispatchers: self.dispatchers.clone(),
381            context: self.context.clone(),
382            state_store: self.state_store.clone(),
383            task_handle: self.task_handle.clone(),
384            shutdown_tx: self.shutdown_tx.clone(),
385            bootstrap_provider: self.bootstrap_provider.clone(),
386            identity_provider: self.identity_provider.clone(),
387            position_handles: self.position_handles.clone(),
388        }
389    }
390
391    /// Set the bootstrap provider for this source, taking ownership.
392    ///
393    /// Call this after creating the SourceBase if the source plugin supports bootstrapping.
394    /// The bootstrap provider is created by the plugin using its own configuration.
395    ///
396    /// # Example
397    /// ```ignore
398    /// let provider = MyBootstrapProvider::new(config);
399    /// source_base.set_bootstrap_provider(provider).await;  // Ownership transferred
400    /// ```
401    pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
402        *self.bootstrap_provider.write().await = Some(Arc::new(provider));
403    }
404
405    /// Get the source ID
406    pub fn get_id(&self) -> &str {
407        &self.id
408    }
409
410    /// Create a streaming receiver for a query subscription
411    ///
412    /// This creates the appropriate receiver based on the configured dispatch mode:
413    /// - Broadcast mode: Returns a receiver from the shared broadcast dispatcher
414    /// - Channel mode: Creates a new dedicated dispatcher and returns its receiver
415    ///
416    /// This is a helper method that can be used by sources with custom subscribe logic.
417    pub async fn create_streaming_receiver(
418        &self,
419    ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
420        let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
421            DispatchMode::Broadcast => {
422                // For broadcast mode, use the single dispatcher
423                let dispatchers = self.dispatchers.read().await;
424                if let Some(dispatcher) = dispatchers.first() {
425                    dispatcher.create_receiver().await?
426                } else {
427                    return Err(anyhow::anyhow!("No broadcast dispatcher available"));
428                }
429            }
430            DispatchMode::Channel => {
431                // For channel mode, create a new dispatcher for this subscription
432                let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
433                    self.dispatch_buffer_capacity,
434                );
435                let receiver = dispatcher.create_receiver().await?;
436
437                // Add the new dispatcher to our list
438                let mut dispatchers = self.dispatchers.write().await;
439                dispatchers.push(Box::new(dispatcher));
440
441                receiver
442            }
443        };
444
445        Ok(receiver)
446    }
447
448    /// Subscribe to this source with optional bootstrap
449    ///
450    /// This is the standard subscribe implementation that all sources can use.
451    /// It handles:
452    /// - Creating a receiver for streaming events (based on dispatch mode)
453    /// - Setting up bootstrap if requested and a provider has been set
454    /// - Returning the appropriate SubscriptionResponse
455    pub async fn subscribe_with_bootstrap(
456        &self,
457        settings: &crate::config::SourceSubscriptionSettings,
458        source_type: &str,
459    ) -> Result<SubscriptionResponse> {
460        info!(
461            "Query '{}' subscribing to {} source '{}' (bootstrap: {}, resume_from: {:?}, request_handle: {})",
462            settings.query_id,
463            source_type,
464            self.id,
465            settings.enable_bootstrap,
466            settings.resume_from,
467            settings.request_position_handle
468        );
469
470        // Create streaming receiver using helper method
471        let receiver = self.create_streaming_receiver().await?;
472
473        let query_id_for_response = settings.query_id.clone();
474
475        // resume_from overrides bootstrap: a resuming query already has base
476        // state in its persistent index and just needs replay from the
477        // requested sequence. Re-bootstrapping would corrupt that state.
478        let bootstrap_receiver = if settings.resume_from.is_some() {
479            info!(
480                "Query '{}' resuming from sequence {:?}; skipping bootstrap on {} source '{}'",
481                settings.query_id, settings.resume_from, source_type, self.id
482            );
483            None
484        } else if settings.enable_bootstrap {
485            self.handle_bootstrap_subscription(settings, source_type)
486                .await?
487        } else {
488            None
489        };
490
491        // Only persistent (replay-capable) queries request a handle. Volatile
492        // queries are deliberately excluded from the min-watermark so they
493        // cannot pin upstream advancement.
494        let position_handle = if settings.request_position_handle {
495            Some(self.create_position_handle(&settings.query_id).await)
496        } else {
497            None
498        };
499
500        Ok(SubscriptionResponse {
501            query_id: query_id_for_response,
502            source_id: self.id.clone(),
503            receiver,
504            bootstrap_receiver,
505            position_handle,
506        })
507    }
508
509    /// Handle bootstrap subscription logic
510    async fn handle_bootstrap_subscription(
511        &self,
512        settings: &crate::config::SourceSubscriptionSettings,
513        source_type: &str,
514    ) -> Result<Option<BootstrapEventReceiver>> {
515        let provider_guard = self.bootstrap_provider.read().await;
516        if let Some(provider) = provider_guard.clone() {
517            drop(provider_guard); // Release lock before spawning task
518
519            info!(
520                "Creating bootstrap for query '{}' on {} source '{}'",
521                settings.query_id, source_type, self.id
522            );
523
524            // Create bootstrap context
525            let context = BootstrapContext::new_minimal(
526                self.id.clone(), // server_id
527                self.id.clone(), // source_id
528            );
529
530            // Create bootstrap channel
531            let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
532
533            // Convert HashSet to Vec for backward compatibility with BootstrapRequest
534            let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
535            let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
536
537            // Create bootstrap request with request_id
538            let request = BootstrapRequest {
539                query_id: settings.query_id.clone(),
540                node_labels,
541                relation_labels,
542                request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
543            };
544
545            // Clone settings for the async task
546            let settings_clone = settings.clone();
547            let source_id = self.id.clone();
548
549            // Get instance_id from context for log routing isolation
550            let instance_id = self
551                .context()
552                .await
553                .map(|c| c.instance_id.clone())
554                .unwrap_or_default();
555
556            // Spawn bootstrap task with tracing span for proper log routing
557            let span = tracing::info_span!(
558                "source_bootstrap",
559                instance_id = %instance_id,
560                component_id = %source_id,
561                component_type = "source"
562            );
563            tokio::spawn(
564                async move {
565                    match provider
566                        .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
567                        .await
568                    {
569                        Ok(result) => {
570                            info!(
571                                "Bootstrap completed successfully for query '{}', sent {} events",
572                                settings_clone.query_id, result.event_count
573                            );
574                            // `result.last_sequence` / `result.sequences_aligned`
575                            // are intentionally unused at this call site — a
576                            // future query-processor integration issue will
577                            // plumb them through to the handover protocol.
578                        }
579                        Err(e) => {
580                            error!(
581                                "Bootstrap failed for query '{}': {e}",
582                                settings_clone.query_id
583                            );
584                        }
585                    }
586                }
587                .instrument(span),
588            );
589
590            Ok(Some(bootstrap_rx))
591        } else {
592            info!(
593                "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
594                settings.query_id, source_type, self.id
595            );
596            Ok(None)
597        }
598    }
599
600    /// Dispatch a SourceChange event with profiling metadata
601    ///
602    /// This method handles the common pattern of:
603    /// - Creating profiling metadata with timestamp
604    /// - Wrapping the change in a SourceEventWrapper
605    /// - Dispatching to all subscribers
606    /// - Handling the no-subscriber case gracefully
607    pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
608        // Create profiling metadata
609        let mut profiling = profiling::ProfilingMetadata::new();
610        profiling.source_send_ns = Some(profiling::timestamp_ns());
611
612        // Create event wrapper
613        let wrapper = SourceEventWrapper::with_profiling(
614            self.id.clone(),
615            SourceEvent::Change(change),
616            chrono::Utc::now(),
617            profiling,
618        );
619
620        // Dispatch event
621        self.dispatch_event(wrapper).await
622    }
623
624    /// Dispatch a SourceEventWrapper to all subscribers
625    ///
626    /// This is a generic method for dispatching any SourceEvent.
627    /// It handles Arc-wrapping for zero-copy sharing and logs
628    /// when there are no subscribers.
629    pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()> {
630        debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
631
632        // Arc-wrap for zero-copy sharing across dispatchers
633        let arc_wrapper = Arc::new(wrapper);
634
635        // Send to all dispatchers
636        let dispatchers = self.dispatchers.read().await;
637        for dispatcher in dispatchers.iter() {
638            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
639                debug!("[{}] Failed to dispatch event: {}", self.id, e);
640            }
641        }
642
643        Ok(())
644    }
645
646    /// Broadcast SourceControl events
647    pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
648        let wrapper = SourceEventWrapper::new(
649            self.id.clone(),
650            SourceEvent::Control(control),
651            chrono::Utc::now(),
652        );
653        self.dispatch_event(wrapper).await
654    }
655
656    /// Create a test subscription to this source (synchronous, fallible)
657    ///
658    /// This method is intended for use in tests to receive events from the source.
659    /// It properly handles both Broadcast and Channel dispatch modes by delegating
660    /// to `create_streaming_receiver()`, making the dispatch mode transparent to tests.
661    ///
662    /// Note: This is a synchronous wrapper that uses `tokio::task::block_in_place` internally.
663    /// For async contexts, prefer calling `create_streaming_receiver()` directly.
664    ///
665    /// # Returns
666    /// A receiver that will receive all events dispatched by this source,
667    /// or an error if the receiver cannot be created.
668    pub fn try_test_subscribe(
669        &self,
670    ) -> anyhow::Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
671        tokio::task::block_in_place(|| {
672            tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
673        })
674    }
675
676    /// Create a test subscription to this source (synchronous wrapper)
677    ///
678    /// Convenience wrapper around [`try_test_subscribe`](Self::try_test_subscribe)
679    /// that panics on failure. Prefer `try_test_subscribe()` in new code.
680    ///
681    /// # Panics
682    /// Panics if the receiver cannot be created.
683    pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
684        self.try_test_subscribe()
685            .expect("Failed to create test subscription receiver")
686    }
687
688    /// Helper function to dispatch events from spawned tasks
689    ///
690    /// This is a static helper that can be used from spawned async tasks that don't
691    /// have access to `self`. It manually iterates through dispatchers and sends the event.
692    ///
693    /// For code that has access to `&self`, prefer using `dispatch_event()` instead.
694    ///
695    /// # Arguments
696    /// * `dispatchers` - Arc to the dispatchers list (from `self.base.dispatchers.clone()`)
697    /// * `wrapper` - The event wrapper to dispatch
698    /// * `source_id` - Source ID for logging
699    pub async fn dispatch_from_task(
700        dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
701        wrapper: SourceEventWrapper,
702        source_id: &str,
703    ) -> Result<()> {
704        debug!(
705            "[{}] Dispatching event from task: {:?}",
706            source_id, &wrapper
707        );
708
709        // Arc-wrap for zero-copy sharing across dispatchers
710        let arc_wrapper = Arc::new(wrapper);
711
712        // Send to all dispatchers
713        let dispatchers_guard = dispatchers.read().await;
714        for dispatcher in dispatchers_guard.iter() {
715            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
716                debug!("[{source_id}] Failed to dispatch event from task: {e}");
717            }
718        }
719
720        Ok(())
721    }
722
723    /// Handle common stop functionality
724    pub async fn stop_common(&self) -> Result<()> {
725        info!("Stopping source '{}'", self.id);
726
727        // Send shutdown signal if we have one
728        if let Some(tx) = self.shutdown_tx.write().await.take() {
729            let _ = tx.send(());
730        }
731
732        // Wait for task to complete
733        if let Some(mut handle) = self.task_handle.write().await.take() {
734            match tokio::time::timeout(std::time::Duration::from_secs(5), &mut handle).await {
735                Ok(Ok(())) => {
736                    info!("Source '{}' task completed successfully", self.id);
737                }
738                Ok(Err(e)) => {
739                    error!("Source '{}' task panicked: {}", self.id, e);
740                }
741                Err(_) => {
742                    warn!(
743                        "Source '{}' task did not complete within timeout, aborting",
744                        self.id
745                    );
746                    handle.abort();
747                }
748            }
749        }
750
751        self.set_status(
752            ComponentStatus::Stopped,
753            Some(format!("Source '{}' stopped", self.id)),
754        )
755        .await;
756        info!("Source '{}' stopped", self.id);
757        Ok(())
758    }
759
760    /// Clear the source's state store partition.
761    ///
762    /// This is called during deprovision to remove all persisted state
763    /// associated with this source. Sources that override `deprovision()`
764    /// can call this to clean up their state store.
765    pub async fn deprovision_common(&self) -> Result<()> {
766        info!("Deprovisioning source '{}'", self.id);
767        if let Some(store) = self.state_store().await {
768            let count = store.clear_store(&self.id).await.map_err(|e| {
769                anyhow::anyhow!(
770                    "Failed to clear state store for source '{}': {}",
771                    self.id,
772                    e
773                )
774            })?;
775            info!(
776                "Cleared {} keys from state store for source '{}'",
777                count, self.id
778            );
779        }
780        Ok(())
781    }
782
783    /// Get the current status.
784    pub async fn get_status(&self) -> ComponentStatus {
785        self.status_handle.get_status().await
786    }
787
788    /// Set the component's status — updates local state AND notifies the graph.
789    ///
790    /// This is the single canonical way to change a source's status.
791    pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
792        self.status_handle.set_status(status, message).await;
793    }
794
795    /// Set the task handle
796    pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
797        *self.task_handle.write().await = Some(handle);
798    }
799
800    /// Set the shutdown sender
801    pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
802        *self.shutdown_tx.write().await = Some(tx);
803    }
804}
805
#[cfg(test)]
mod tests {
    // Unit tests for `SourceBaseParams` (builder defaults and setters) and
    // `SourceBase` (status handling, position handles, and the interaction of
    // `resume_from` with bootstrap during subscription).

    use super::*;

    use crate::bootstrap::{
        BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
    };
    use crate::channels::BootstrapEventSender;
    use async_trait::async_trait;

    // =========================================================================
    // SourceBaseParams tests
    // =========================================================================

    /// `new()` leaves every optional setting unset and enables auto-start.
    #[test]
    fn test_params_new_defaults() {
        let params = SourceBaseParams::new("test-source");
        assert_eq!(params.id, "test-source");
        assert!(params.dispatch_mode.is_none());
        assert!(params.dispatch_buffer_capacity.is_none());
        assert!(params.bootstrap_provider.is_none());
        assert!(params.auto_start);
    }

    /// `with_dispatch_mode()` records the requested mode.
    #[test]
    fn test_params_with_dispatch_mode() {
        let params = SourceBaseParams::new("s1").with_dispatch_mode(DispatchMode::Broadcast);
        assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
    }

    /// `with_dispatch_buffer_capacity()` records the requested capacity.
    #[test]
    fn test_params_with_dispatch_buffer_capacity() {
        let params = SourceBaseParams::new("s1").with_dispatch_buffer_capacity(50000);
        assert_eq!(params.dispatch_buffer_capacity, Some(50000));
    }

    /// `with_auto_start(false)` overrides the default of `true`.
    #[test]
    fn test_params_with_auto_start_false() {
        let params = SourceBaseParams::new("s1").with_auto_start(false);
        assert!(!params.auto_start);
    }

    /// Builder methods can be chained and each one keeps the earlier settings.
    #[test]
    fn test_params_builder_chaining() {
        let params = SourceBaseParams::new("chained")
            .with_dispatch_mode(DispatchMode::Broadcast)
            .with_dispatch_buffer_capacity(2000)
            .with_auto_start(false);

        assert_eq!(params.id, "chained");
        assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
        assert_eq!(params.dispatch_buffer_capacity, Some(2000));
        assert!(!params.auto_start);
    }

    // =========================================================================
    // SourceBase tests
    // =========================================================================

    /// A freshly built base carries the id, auto-starts, and reports `Stopped`.
    #[tokio::test]
    async fn test_new_defaults() {
        let params = SourceBaseParams::new("my-source");
        let base = SourceBase::new(params).unwrap();

        assert_eq!(base.id, "my-source");
        assert!(base.auto_start);
        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
    }

    /// `get_id()` returns the id given at construction.
    #[tokio::test]
    async fn test_get_id() {
        let base = SourceBase::new(SourceBaseParams::new("id-check")).unwrap();
        assert_eq!(base.get_id(), "id-check");
    }

    /// `get_auto_start()` reflects both the default and an explicit `false`.
    #[tokio::test]
    async fn test_get_auto_start() {
        let base_default = SourceBase::new(SourceBaseParams::new("a")).unwrap();
        assert!(base_default.get_auto_start());

        let base_false =
            SourceBase::new(SourceBaseParams::new("b").with_auto_start(false)).unwrap();
        assert!(!base_false.get_auto_start());
    }

    /// The initial status is `Stopped`.
    #[tokio::test]
    async fn test_get_status_initial() {
        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
    }

    /// `set_status()` is observable through `get_status()`, with or without a message.
    #[tokio::test]
    async fn test_set_status() {
        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();

        base.set_status(ComponentStatus::Running, None).await;
        assert_eq!(base.get_status().await, ComponentStatus::Running);

        base.set_status(ComponentStatus::Error, Some("oops".into()))
            .await;
        assert_eq!(base.get_status().await, ComponentStatus::Error);
    }

    /// The handle returned by `status_handle()` shares state with the base.
    #[tokio::test]
    async fn test_status_handle_returns_handle() {
        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
        let handle = base.status_handle();

        // The handle should reflect the same status as the base
        assert_eq!(handle.get_status().await, ComponentStatus::Stopped);

        // Mutating through the handle should be visible via SourceBase
        handle.set_status(ComponentStatus::Starting, None).await;
        assert_eq!(base.get_status().await, ComponentStatus::Starting);
    }

    // =========================================================================
    // Position handle and resume_from tests (issue #366)
    // =========================================================================

    /// Build subscription settings for source `test-src` with empty node and
    /// relation label sets; the remaining fields come from the arguments.
    fn make_settings(
        query_id: &str,
        enable_bootstrap: bool,
        resume_from: Option<u64>,
        request_position_handle: bool,
    ) -> crate::config::SourceSubscriptionSettings {
        use std::collections::HashSet;
        crate::config::SourceSubscriptionSettings {
            source_id: "test-src".to_string(),
            enable_bootstrap,
            query_id: query_id.to_string(),
            nodes: HashSet::new(),
            relations: HashSet::new(),
            resume_from,
            request_position_handle,
        }
    }

    /// Minimal bootstrap provider: it sends nothing and returns a default
    /// result, so its event sender is dropped as soon as `bootstrap` returns.
    /// Enough to verify a `bootstrap_receiver` was created without sending data.
    struct NoopProvider;

    #[async_trait]
    impl BootstrapProvider for NoopProvider {
        async fn bootstrap(
            &self,
            _request: BootstrapRequest,
            _context: &BootstrapContext,
            _event_tx: BootstrapEventSender,
            _settings: Option<&crate::config::SourceSubscriptionSettings>,
        ) -> Result<BootstrapResult> {
            Ok(BootstrapResult::default())
        }
    }

    /// Build a `SourceBase` whose bootstrap provider is a [`NoopProvider`].
    fn make_base_with_bootstrap(id: &str) -> SourceBase {
        let mut params = SourceBaseParams::new(id);
        params.bootstrap_provider = Some(Box::new(NoopProvider));
        SourceBase::new(params).unwrap()
    }

    /// A new position handle starts at the `u64::MAX` sentinel.
    #[tokio::test]
    async fn test_create_position_handle_initializes_to_u64_max() {
        let base = SourceBase::new(SourceBaseParams::new("ph-init")).unwrap();
        let handle = base.create_position_handle("q1").await;
        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
    }

    /// Requesting a handle twice for the same query returns the same `Arc`.
    #[tokio::test]
    async fn test_create_position_handle_idempotent_for_same_query() {
        let base = SourceBase::new(SourceBaseParams::new("ph-idem")).unwrap();
        let h1 = base.create_position_handle("q1").await;
        h1.store(123, Ordering::Relaxed);
        let h2 = base.create_position_handle("q1").await;
        // Same Arc — preserves the previously reported position.
        assert!(Arc::ptr_eq(&h1, &h2));
        assert_eq!(h2.load(Ordering::Relaxed), 123);
    }

    /// Removing a handle takes its position out of the confirmed-position result.
    #[tokio::test]
    async fn test_remove_position_handle_drops_entry() {
        let base = SourceBase::new(SourceBaseParams::new("ph-rm")).unwrap();
        let handle = base.create_position_handle("q1").await;
        handle.store(42, Ordering::Relaxed);
        assert_eq!(base.compute_confirmed_position().await, Some(42));
        base.remove_position_handle("q1").await;
        assert_eq!(base.compute_confirmed_position().await, None);
    }

    /// Removing a handle that was never registered is a no-op.
    #[tokio::test]
    async fn test_remove_position_handle_noop_when_absent() {
        let base = SourceBase::new(SourceBaseParams::new("ph-rm-absent")).unwrap();
        // Must not panic.
        base.remove_position_handle("never-registered").await;
        assert_eq!(base.compute_confirmed_position().await, None);
    }

    /// With no handles registered there is no confirmed position.
    #[tokio::test]
    async fn test_compute_confirmed_position_returns_none_when_empty() {
        let base = SourceBase::new(SourceBaseParams::new("ph-empty")).unwrap();
        assert_eq!(base.compute_confirmed_position().await, None);
    }

    /// Handles still at the `u64::MAX` sentinel do not count as a position.
    #[tokio::test]
    async fn test_compute_confirmed_position_returns_none_when_all_max() {
        let base = SourceBase::new(SourceBaseParams::new("ph-all-max")).unwrap();
        let _h1 = base.create_position_handle("q1").await;
        let _h2 = base.create_position_handle("q2").await;
        assert_eq!(base.compute_confirmed_position().await, None);
    }

    /// Sentinel handles are ignored and the minimum of the rest is returned.
    #[tokio::test]
    async fn test_compute_confirmed_position_filters_max_returns_min() {
        let base = SourceBase::new(SourceBaseParams::new("ph-min")).unwrap();
        let h1 = base.create_position_handle("q1").await;
        let _h2 = base.create_position_handle("q2").await; // stays u64::MAX
        let h3 = base.create_position_handle("q3").await;
        h1.store(100, Ordering::Relaxed);
        h3.store(50, Ordering::Relaxed);
        assert_eq!(base.compute_confirmed_position().await, Some(50));
    }

    /// A single reported position wins over a sentinel sibling.
    #[tokio::test]
    async fn test_compute_confirmed_position_single_real_value() {
        let base = SourceBase::new(SourceBaseParams::new("ph-single")).unwrap();
        let h1 = base.create_position_handle("q1").await;
        let _h2 = base.create_position_handle("q2").await;
        h1.store(7, Ordering::Relaxed);
        assert_eq!(base.compute_confirmed_position().await, Some(7));
    }

    /// Cleanup removes an entry whose external handle has been dropped.
    #[tokio::test]
    async fn test_cleanup_stale_handles_drops_orphaned_arc() {
        let base = SourceBase::new(SourceBaseParams::new("ph-stale")).unwrap();
        {
            let handle = base.create_position_handle("q1").await;
            handle.store(99, Ordering::Relaxed);
            // handle (the external clone) drops here.
        }
        base.cleanup_stale_handles().await;
        assert_eq!(base.compute_confirmed_position().await, None);
    }

    /// Cleanup keeps an entry whose external handle is still alive.
    #[tokio::test]
    async fn test_cleanup_stale_handles_keeps_held_arc() {
        let base = SourceBase::new(SourceBaseParams::new("ph-held")).unwrap();
        let handle = base.create_position_handle("q1").await;
        handle.store(11, Ordering::Relaxed);
        base.cleanup_stale_handles().await;
        // External clone still alive, entry retained.
        assert_eq!(base.compute_confirmed_position().await, Some(11));
        // Keep `handle` alive past the assertion.
        drop(handle);
    }

    /// Subscribing with `request_position_handle = true` returns a sentinel
    /// handle and registers it in the internal map.
    #[tokio::test]
    async fn test_subscribe_with_request_position_handle_returns_handle() {
        let base = SourceBase::new(SourceBaseParams::new("sub-handle")).unwrap();
        let response = base
            .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
            .await
            .unwrap();
        let handle = response.position_handle.expect("expected handle");
        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
        // Internal map should also have the entry (so min-watermark sees it).
        // Check the map directly: the `None` asserted below would also be
        // produced by an empty map, so it cannot prove registration on its own.
        {
            let handles = base.position_handles.read().await;
            assert!(
                !handles.is_empty(),
                "position handle must be registered internally"
            );
            // Guard dropped here, before `compute_confirmed_position` runs.
        }
        assert_eq!(base.compute_confirmed_position().await, None); // u64::MAX → None
    }

    /// Subscribing without requesting a handle returns none and registers nothing.
    #[tokio::test]
    async fn test_subscribe_without_request_position_handle_returns_none() {
        let base = SourceBase::new(SourceBaseParams::new("sub-no-handle")).unwrap();
        let response = base
            .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
            .await
            .unwrap();
        assert!(response.position_handle.is_none());
        // Internal map must be empty so this volatile query is excluded from
        // the min-watermark.
        let handles = base.position_handles.read().await;
        assert!(handles.is_empty());
    }

    /// A value stored through the returned handle is seen by the base.
    #[tokio::test]
    async fn test_subscribe_returned_handle_shared_with_internal() {
        let base = SourceBase::new(SourceBaseParams::new("sub-shared")).unwrap();
        let response = base
            .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
            .await
            .unwrap();
        let handle = response.position_handle.unwrap();
        handle.store(42, Ordering::Relaxed);
        assert_eq!(base.compute_confirmed_position().await, Some(42));
    }

    /// `resume_from` suppresses bootstrap even when bootstrap is enabled.
    #[tokio::test]
    async fn test_subscribe_with_resume_from_skips_bootstrap() {
        let base = make_base_with_bootstrap("sub-resume");
        let response = base
            .subscribe_with_bootstrap(&make_settings("q1", true, Some(100), false), "test")
            .await
            .unwrap();
        assert!(
            response.bootstrap_receiver.is_none(),
            "resume_from must override enable_bootstrap"
        );
    }

    /// `resume_from` with bootstrap disabled also yields no bootstrap receiver.
    #[tokio::test]
    async fn test_subscribe_resume_without_bootstrap_still_none() {
        let base = make_base_with_bootstrap("sub-resume-no-bs");
        let response = base
            .subscribe_with_bootstrap(&make_settings("q1", false, Some(100), false), "test")
            .await
            .unwrap();
        assert!(response.bootstrap_receiver.is_none());
    }

    /// Bootstrap enabled and no `resume_from` yields a bootstrap receiver.
    #[tokio::test]
    async fn test_subscribe_no_resume_with_bootstrap_returns_receiver() {
        let base = make_base_with_bootstrap("sub-bs");
        let response = base
            .subscribe_with_bootstrap(&make_settings("q1", true, None, false), "test")
            .await
            .unwrap();
        assert!(
            response.bootstrap_receiver.is_some(),
            "regression guard: bootstrap path must still produce a receiver"
        );
    }

    /// Neither bootstrap nor a position handle requested: both are absent.
    #[tokio::test]
    async fn test_subscribe_no_resume_no_bootstrap_returns_none() {
        let base = make_base_with_bootstrap("sub-neither");
        let response = base
            .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
            .await
            .unwrap();
        assert!(response.bootstrap_receiver.is_none());
        assert!(response.position_handle.is_none());
    }
}