Skip to main content

drasi_lib/sources/
base.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common source functionality.
16//!
17//! This module provides `SourceBase` which encapsulates common patterns
18//! used across all source implementations:
19//! - Dispatcher setup and management
20//! - Bootstrap subscription handling
21//! - Event dispatching with profiling
22//! - Component lifecycle management
23//!
24//! # Plugin Architecture
25//!
26//! SourceBase is designed to be used by source plugins. Each plugin:
27//! 1. Defines its own typed configuration struct
28//! 2. Creates a SourceBase with SourceBaseParams
29//! 3. Optionally provides a bootstrap provider via `set_bootstrap_provider()`
30//! 4. Implements the Source trait delegating to SourceBase methods
31
32use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::collections::{BTreeMap, HashMap};
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37use tokio::sync::{Notify, RwLock};
38use tracing::Instrument;
39
40use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult};
41use crate::channels::*;
42use crate::component_graph::ComponentStatusHandle;
43use crate::context::SourceRuntimeContext;
44use crate::identity::IdentityProvider;
45use crate::profiling;
46use crate::sources::PositionComparator;
47use crate::state_store::StateStoreProvider;
48use bytes::Bytes;
49use drasi_core::models::SourceChange;
50
51/// Parameters for creating a SourceBase instance.
52///
53/// This struct contains only the information that SourceBase needs to function.
54/// Plugin-specific configuration should remain in the plugin crate.
55///
56/// # Example
57///
58/// ```ignore
59/// use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
60///
61/// let params = SourceBaseParams::new("my-source")
62///     .with_dispatch_mode(DispatchMode::Channel)
63///     .with_dispatch_buffer_capacity(2000)
64///     .with_bootstrap_provider(my_provider);
65///
66/// let base = SourceBase::new(params)?;
67/// ```
68pub struct SourceBaseParams {
69    /// Unique identifier for the source
70    pub id: String,
71    /// Dispatch mode (Broadcast or Channel) - defaults to Channel
72    pub dispatch_mode: Option<DispatchMode>,
73    /// Dispatch buffer capacity - defaults to 1000
74    pub dispatch_buffer_capacity: Option<usize>,
75    /// Optional state store provider to set during construction.
76    ///
77    /// When set, takes precedence over any state store provided via the runtime
78    /// context during `initialize()`. This differs from `bootstrap_provider` which
79    /// is never overwritten by context — only `state_store` and `identity_provider`
80    /// have context-provided fallback behavior.
81    pub state_store: Option<Arc<dyn StateStoreProvider>>,
82    /// Optional bootstrap provider to set during construction
83    pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
84    /// Whether this source should auto-start - defaults to true
85    pub auto_start: bool,
86}
87
88impl std::fmt::Debug for SourceBaseParams {
89    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90        f.debug_struct("SourceBaseParams")
91            .field("id", &self.id)
92            .field("dispatch_mode", &self.dispatch_mode)
93            .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
94            .field(
95                "state_store",
96                &self.state_store.as_ref().map(|_| "<StateStoreProvider>"),
97            )
98            .field(
99                "bootstrap_provider",
100                &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
101            )
102            .field("auto_start", &self.auto_start)
103            .finish()
104    }
105}
106
107impl SourceBaseParams {
108    /// Create new params with just an ID, using defaults for everything else
109    pub fn new(id: impl Into<String>) -> Self {
110        Self {
111            id: id.into(),
112            dispatch_mode: None,
113            dispatch_buffer_capacity: None,
114            state_store: None,
115            bootstrap_provider: None,
116            auto_start: true,
117        }
118    }
119
120    /// Set the dispatch mode
121    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
122        self.dispatch_mode = Some(mode);
123        self
124    }
125
126    /// Set the dispatch buffer capacity
127    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
128        self.dispatch_buffer_capacity = Some(capacity);
129        self
130    }
131
132    /// Set the state store provider
133    ///
134    /// This is typically used when constructing sources outside of DrasiLib
135    /// and you want to provide a persistent store for checkpointing.
136    pub fn with_state_store(mut self, store: Arc<dyn StateStoreProvider>) -> Self {
137        self.state_store = Some(store);
138        self
139    }
140
141    /// Set the bootstrap provider
142    ///
143    /// This provider will be used during source subscription to deliver
144    /// initial data to queries that request bootstrap.
145    pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
146        self.bootstrap_provider = Some(Box::new(provider));
147        self
148    }
149
150    /// Set whether this source should auto-start
151    ///
152    /// Default is `true`. Set to `false` if this source should only be
153    /// started manually via `start_source()`.
154    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
155        self.auto_start = auto_start;
156        self
157    }
158}
159
160/// Base implementation for common source functionality
161pub struct SourceBase {
162    /// Source identifier
163    pub id: String,
164    /// Dispatch mode setting
165    dispatch_mode: DispatchMode,
166    /// Dispatch buffer capacity
167    dispatch_buffer_capacity: usize,
168    /// Whether this source should auto-start
169    pub auto_start: bool,
170    /// Component status handle — always available, wired to graph during initialize().
171    status_handle: ComponentStatusHandle,
172    /// Dispatchers for sending source events to subscribers
173    ///
174    /// This is a vector of dispatchers that send source events to all registered
175    /// subscribers (queries). When a source produces a change event, it broadcasts
176    /// it to all dispatchers in this vector.
177    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
178    /// Runtime context (set by initialize())
179    context: Arc<RwLock<Option<SourceRuntimeContext>>>,
180    /// State store provider (extracted from context for convenience)
181    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
182    /// Handle to the source's main task
183    pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
184    /// Sender for shutdown signal
185    pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
186    /// Optional bootstrap provider - plugins set this if they support bootstrap
187    bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
188    /// Optional identity provider for credential management.
189    /// Set either programmatically (via `set_identity_provider`) or automatically
190    /// from the runtime context during `initialize()`.
191    identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
192    /// Per-query position handles for replay-capable subscribers.
193    ///
194    /// Keyed by `query_id`. Values are the same `Arc<AtomicU64>` returned in
195    /// `SubscriptionResponse::position_handle`. The query writes its last
196    /// durably-processed sequence; the source reads `compute_confirmed_position()`
197    /// to advance its upstream cursor. Initial value is `u64::MAX`
198    /// ("no position confirmed yet"). Only populated when
199    /// `request_position_handle == true`.
200    position_handles: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
201    /// Monotonically increasing counter for assigning event sequences.
202    /// The framework stamps every dispatched event with this sequence.
203    next_sequence: Arc<AtomicU64>,
204    /// Original raw config JSON from the descriptor, preserving ConfigValue
205    /// envelopes (secrets, env vars) for lossless persistence roundtrips.
206    raw_config: Option<serde_json::Value>,
207    /// Notified whenever a new subscriber registers via `create_streaming_receiver`.
208    /// Sources can await `wait_for_subscribers()` before starting their polling
209    /// loop to avoid dispatching events before any subscriber exists.
210    subscriber_notify: Arc<Notify>,
211    /// Per-subscriber resume positions for replay filtering.
212    ///
213    /// Keyed by dispatcher index in the `dispatchers` Vec. When an event's
214    /// `source_position` has not yet passed the resume position, the event is
215    /// not delivered to that subscriber's dispatcher. Once `position_reached()`
216    /// returns true, the entry is removed and all subsequent events flow through.
217    ///
218    /// Only populated in Channel dispatch mode (Broadcast cannot filter per-subscriber).
219    subscriber_resume_positions: Arc<RwLock<HashMap<usize, Bytes>>>,
220    /// Optional position comparator for per-subscriber replay filtering.
221    ///
222    /// Set by sources that support replay. Without a comparator, position
223    /// filtering is disabled (all events are delivered to all subscribers).
224    position_comparator: Arc<RwLock<Option<Arc<dyn PositionComparator>>>>,
225    /// Maps framework sequence numbers to source positions (`source_position`
226    /// bytes from the dispatched event).
227    ///
228    /// Populated during `dispatch_event()` / `dispatch_events_batch()` for
229    /// events that carry a `source_position`.  Used by
230    /// `compute_confirmed_source_position()` to convert the confirmed
231    /// sequence (from position handles) back to a source-native position
232    /// (e.g. Postgres WAL LSN) for upstream cursor advancement.
233    ///
234    /// Pruned explicitly via `prune_position_map()` after the source has
235    /// successfully acknowledged the confirmed position to its upstream.
236    sequence_position_map: Arc<RwLock<BTreeMap<u64, Bytes>>>,
237}
238
239impl SourceBase {
240    /// Create a new SourceBase with the given parameters
241    ///
242    /// The status channel is not required during construction - it will be
243    /// provided via the `SourceRuntimeContext` when `initialize()` is called.
244    ///
245    /// If a bootstrap provider is specified in params, it will be set during
246    /// construction (no async needed since nothing is shared yet).
247    pub fn new(params: SourceBaseParams) -> Result<Self> {
248        // Determine dispatch mode (default to Channel if not specified)
249        let dispatch_mode = params.dispatch_mode.unwrap_or_default();
250        let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
251
252        // Set up initial dispatchers based on dispatch mode
253        let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
254            Vec::new();
255
256        if dispatch_mode == DispatchMode::Broadcast {
257            // For broadcast mode, create a single broadcast dispatcher
258            let dispatcher =
259                BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
260            dispatchers.push(Box::new(dispatcher));
261        }
262        // For channel mode, dispatchers will be created on-demand when subscribing
263
264        // Initialize providers if provided (no async needed at construction time)
265        let bootstrap_provider = params
266            .bootstrap_provider
267            .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
268
269        Ok(Self {
270            id: params.id.clone(),
271            dispatch_mode,
272            dispatch_buffer_capacity,
273            auto_start: params.auto_start,
274            status_handle: ComponentStatusHandle::new(&params.id),
275            dispatchers: Arc::new(RwLock::new(dispatchers)),
276            context: Arc::new(RwLock::new(None)), // Set by initialize()
277            state_store: Arc::new(RwLock::new(params.state_store)), // Extracted from context
278            task_handle: Arc::new(RwLock::new(None)),
279            shutdown_tx: Arc::new(RwLock::new(None)),
280            bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
281            identity_provider: Arc::new(RwLock::new(None)),
282            position_handles: Arc::new(RwLock::new(HashMap::new())),
283            next_sequence: Arc::new(AtomicU64::new(1)),
284            raw_config: None,
285            subscriber_notify: Arc::new(Notify::new()),
286            subscriber_resume_positions: Arc::new(RwLock::new(HashMap::new())),
287            position_comparator: Arc::new(RwLock::new(None)),
288            sequence_position_map: Arc::new(RwLock::new(BTreeMap::new())),
289        })
290    }
291
292    /// Get whether this source should auto-start
293    pub fn get_auto_start(&self) -> bool {
294        self.auto_start
295    }
296
297    /// Get this source's dispatch mode.
298    pub fn get_dispatch_mode(&self) -> DispatchMode {
299        self.dispatch_mode
300    }
301
302    /// Set the original raw config JSON for lossless persistence roundtrips.
303    ///
304    /// Called by plugin descriptors to preserve the original config JSON
305    /// (including `ConfigValue` envelopes for secrets and env vars) before
306    /// resolution to plain values.
307    pub fn set_raw_config(&mut self, config: serde_json::Value) {
308        self.raw_config = Some(config);
309    }
310
311    /// Get the original raw config JSON, if set by a descriptor.
312    ///
313    /// Returns `None` for builder-created components that don't have
314    /// a raw config JSON (they use DTO reconstruction in `properties()`).
315    pub fn raw_config(&self) -> Option<&serde_json::Value> {
316        self.raw_config.as_ref()
317    }
318
319    /// Build the properties map for this source.
320    ///
321    /// If `raw_config` was set (descriptor path), returns its top-level keys.
322    /// Otherwise, serializes `fallback_dto` (the DTO reconstructed from typed
323    /// config) to produce camelCase output.
324    ///
325    /// This eliminates the duplicated if-let + serialize pattern from plugins.
326    pub fn properties_or_serialize<D: serde::Serialize>(
327        &self,
328        fallback_dto: &D,
329    ) -> HashMap<String, serde_json::Value> {
330        if let Some(serde_json::Value::Object(map)) = self.raw_config.as_ref() {
331            return map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
332        }
333
334        match serde_json::to_value(fallback_dto) {
335            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
336            _ => HashMap::new(),
337        }
338    }
339
340    /// Initialize the source with runtime context.
341    ///
342    /// This method is called automatically by DrasiLib's `add_source()` method.
343    /// Plugin developers do not need to call this directly.
344    ///
345    /// The context provides access to:
346    /// - `source_id`: The source's unique identifier
347    /// - `update_tx`: mpsc sender for fire-and-forget status updates to the component graph
348    /// - `state_store`: Optional persistent state storage
349    pub async fn initialize(&self, context: SourceRuntimeContext) {
350        // Store context for later use
351        *self.context.write().await = Some(context.clone());
352
353        // Wire the status handle to the graph update channel
354        self.status_handle.wire(context.update_tx.clone()).await;
355
356        // Store state_store from context only if not already set programmatically
357        if let Some(state_store) = context.state_store.as_ref() {
358            let mut guard = self.state_store.write().await;
359            if guard.is_none() {
360                *guard = Some(state_store.clone());
361            }
362        }
363
364        // Store identity provider from context if not already set programmatically
365        if let Some(ip) = context.identity_provider.as_ref() {
366            let mut guard = self.identity_provider.write().await;
367            if guard.is_none() {
368                *guard = Some(ip.clone());
369            }
370        }
371    }
372
373    /// Get the runtime context if initialized.
374    ///
375    /// Returns `None` if `initialize()` has not been called yet.
376    pub async fn context(&self) -> Option<SourceRuntimeContext> {
377        self.context.read().await.clone()
378    }
379
380    /// Get the state store if configured.
381    ///
382    /// Returns `None` if no state store was provided in the context.
383    pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
384        self.state_store.read().await.clone()
385    }
386
387    /// Get the identity provider if set.
388    ///
389    /// Returns the identity provider set either programmatically via
390    /// `set_identity_provider()` or from the runtime context during `initialize()`.
391    /// Programmatically-set providers take precedence over context providers.
392    pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
393        self.identity_provider.read().await.clone()
394    }
395
396    /// Set the identity provider programmatically.
397    ///
398    /// This is typically called during source construction when the provider
399    /// is available from configuration (e.g., `with_identity_provider()` builder).
400    /// Providers set this way take precedence over context-injected providers.
401    pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
402        *self.identity_provider.write().await = Some(provider);
403    }
404
405    /// Create and register a position handle for `query_id`, initialized to `u64::MAX`.
406    ///
407    /// Returns the shared handle; the same `Arc` is placed in
408    /// `SubscriptionResponse::position_handle` so the query and the source share
409    /// one atomic. If a handle already exists for `query_id` (re-subscribe after
410    /// transient disconnect), the existing handle is returned to preserve any
411    /// position the query had previously reported.
412    pub async fn create_position_handle(&self, query_id: &str) -> Arc<AtomicU64> {
413        let mut handles = self.position_handles.write().await;
414        if let Some(existing) = handles.get(query_id) {
415            return existing.clone();
416        }
417        let handle = Arc::new(AtomicU64::new(u64::MAX));
418        handles.insert(query_id.to_string(), handle.clone());
419        handle
420    }
421
422    /// Remove the position handle for `query_id`. No-op if absent.
423    ///
424    /// Called from explicit cleanup paths (`stop_query`/`delete_query` will be
425    /// wired in a follow-up issue). Until then, `cleanup_stale_handles()`
426    /// (invoked inside `compute_confirmed_position`) catches dropped subscribers.
427    pub async fn remove_position_handle(&self, query_id: &str) {
428        let mut handles = self.position_handles.write().await;
429        handles.remove(query_id);
430    }
431
432    /// Compute the minimum confirmed position across all live subscribers.
433    ///
434    /// Returns `None` if no handles are registered, or if every registered
435    /// handle is still `u64::MAX` (no subscriber has confirmed a position yet —
436    /// typically because they are still bootstrapping). Otherwise returns the
437    /// minimum non-`u64::MAX` value, suitable for advancing the source's
438    /// upstream cursor (Postgres `flush_lsn`, Kafka commit, transient WAL prune
439    /// threshold).
440    ///
441    /// Piggy-backs `cleanup_stale_handles()` so dropped subscribers do not pin
442    /// the watermark indefinitely.
443    pub async fn compute_confirmed_position(&self) -> Option<u64> {
444        self.cleanup_stale_handles().await;
445        let handles = self.position_handles.read().await;
446        let mut min: Option<u64> = None;
447        for handle in handles.values() {
448            let v = handle.load(Ordering::Relaxed);
449            if v == u64::MAX {
450                continue;
451            }
452            min = Some(min.map_or(v, |m| m.min(v)));
453        }
454        min
455    }
456
457    /// Drop entries whose `Arc::strong_count == 1` (only `SourceBase` holds a
458    /// reference).
459    ///
460    /// This indicates the subscriber dropped its `SubscriptionResponse` without
461    /// calling `remove_position_handle` — common during query teardown until
462    /// explicit cleanup is wired by the query manager.
463    ///
464    /// Safety constraint: this relies on `SourceBase` being the only long-lived
465    /// holder of the `Arc` besides the subscribing query. If a future periodic
466    /// scan task (or any other component) clones the handle, this method must
467    /// be revisited or replaced with explicit liveness tracking.
468    pub async fn cleanup_stale_handles(&self) {
469        let mut handles = self.position_handles.write().await;
470        handles.retain(|_, handle| Arc::strong_count(handle) > 1);
471    }
472
473    /// Translate the confirmed framework sequence into the corresponding
474    /// source-native position (e.g. Postgres WAL LSN, Kafka offset).
475    ///
476    /// Returns `None` when no confirmed position exists (no subscribers, all
477    /// at `u64::MAX`, or the sequence map has been pruned past the confirmed
478    /// point).
479    ///
480    /// This does **not** prune the internal map — call
481    /// [`prune_position_map()`](Self::prune_position_map) after the source
482    /// has successfully acknowledged the position to its upstream.
483    pub async fn compute_confirmed_source_position(&self) -> Option<Bytes> {
484        let confirmed_seq = self.compute_confirmed_position().await?;
485        let map = self.sequence_position_map.read().await;
486        // Find the entry with the largest sequence ≤ confirmed_seq.
487        map.range(..=confirmed_seq)
488            .next_back()
489            .map(|(_, pos)| pos.clone())
490    }
491
492    /// Prune sequence→position entries that are no longer needed.
493    ///
494    /// Removes all entries with sequence ≤ `up_to_seq`. Call this after the
495    /// source has successfully sent feedback/committed the confirmed position
496    /// to its upstream, so re-send on failure is still possible.
497    pub async fn prune_position_map(&self, up_to_seq: u64) {
498        let mut map = self.sequence_position_map.write().await;
499        // BTreeMap::split_off returns entries >= key; we keep those.
500        let keep = map.split_off(&(up_to_seq.saturating_add(1)));
501        *map = keep;
502    }
503
504    /// Reset the sequence counter, typically after recovering from a checkpoint.
505    /// The next dispatched event will receive `sequence + 1`.
506    pub fn set_next_sequence(&self, sequence: u64) {
507        self.next_sequence
508            .store(sequence.saturating_add(1), Ordering::Relaxed);
509    }
510
511    /// Apply subscription settings that affect the source base.
512    ///
513    /// Should be called at the start of `Source::subscribe()` implementations.
514    /// Handles:
515    /// - Recovering the sequence counter from `last_sequence` to maintain monotonicity
516    ///   across restarts.
517    pub fn apply_subscription_settings(
518        &self,
519        settings: &crate::config::SourceSubscriptionSettings,
520    ) {
521        if let Some(last_seq) = settings.last_sequence {
522            // Atomically advance to last_seq+1, never go backwards.
523            // fetch_max is safe under concurrent subscriptions.
524            let next = last_seq.saturating_add(1);
525            let prev = self.next_sequence.fetch_max(next, Ordering::Relaxed);
526            if next > prev {
527                info!(
528                    "[{}] Sequence counter recovered to {} (from checkpoint last_sequence={})",
529                    self.id, next, last_seq
530                );
531            }
532        }
533    }
534
535    /// Returns a cloneable [`ComponentStatusHandle`] for use in spawned tasks.
536    ///
537    /// The handle can both read and write the component's status and automatically
538    /// notifies the graph on every status change (after `initialize()`).
539    pub fn status_handle(&self) -> ComponentStatusHandle {
540        self.status_handle.clone()
541    }
542
543    /// Clone the SourceBase with shared Arc references
544    ///
545    /// This creates a new SourceBase that shares the same underlying
546    /// data through Arc references. Useful for passing to spawned tasks.
547    pub fn clone_shared(&self) -> Self {
548        Self {
549            id: self.id.clone(),
550            dispatch_mode: self.dispatch_mode,
551            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
552            auto_start: self.auto_start,
553            status_handle: self.status_handle.clone(),
554            dispatchers: self.dispatchers.clone(),
555            context: self.context.clone(),
556            state_store: self.state_store.clone(),
557            task_handle: self.task_handle.clone(),
558            shutdown_tx: self.shutdown_tx.clone(),
559            bootstrap_provider: self.bootstrap_provider.clone(),
560            identity_provider: self.identity_provider.clone(),
561            position_handles: self.position_handles.clone(),
562            next_sequence: self.next_sequence.clone(),
563            raw_config: self.raw_config.clone(),
564            subscriber_notify: self.subscriber_notify.clone(),
565            subscriber_resume_positions: self.subscriber_resume_positions.clone(),
566            position_comparator: self.position_comparator.clone(),
567            sequence_position_map: self.sequence_position_map.clone(),
568        }
569    }
570
571    /// Set the bootstrap provider for this source, taking ownership.
572    ///
573    /// Call this after creating the SourceBase if the source plugin supports bootstrapping.
574    /// The bootstrap provider is created by the plugin using its own configuration.
575    ///
576    /// # Example
577    /// ```ignore
578    /// let provider = MyBootstrapProvider::new(config);
579    /// source_base.set_bootstrap_provider(provider).await;  // Ownership transferred
580    /// ```
581    pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
582        *self.bootstrap_provider.write().await = Some(Arc::new(provider));
583    }
584
585    /// Set the position comparator for per-subscriber replay filtering.
586    ///
587    /// Sources that support replay should call this during construction to
588    /// enable per-subscriber position gating. Without a comparator, all events
589    /// are delivered to all subscribers regardless of their `resume_from` position.
590    pub async fn set_position_comparator(&self, comparator: impl PositionComparator + 'static) {
591        *self.position_comparator.write().await = Some(Arc::new(comparator));
592    }
593
594    /// Get the source ID
595    pub fn get_id(&self) -> &str {
596        &self.id
597    }
598
599    /// Create a streaming receiver for a query subscription
600    ///
601    /// This creates the appropriate receiver based on the configured dispatch mode:
602    /// - Broadcast mode: Returns a receiver from the shared broadcast dispatcher
603    /// - Channel mode: Creates a new dedicated dispatcher and returns its receiver
604    ///
605    /// This is a helper method that can be used by sources with custom subscribe logic.
606    pub async fn create_streaming_receiver(
607        &self,
608    ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
609        let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
610            DispatchMode::Broadcast => {
611                // For broadcast mode, use the single dispatcher
612                let dispatchers = self.dispatchers.read().await;
613                if let Some(dispatcher) = dispatchers.first() {
614                    dispatcher.create_receiver().await?
615                } else {
616                    return Err(anyhow::anyhow!("No broadcast dispatcher available"));
617                }
618            }
619            DispatchMode::Channel => {
620                // For channel mode, create a new dispatcher for this subscription
621                let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
622                    self.dispatch_buffer_capacity,
623                );
624                let receiver = dispatcher.create_receiver().await?;
625
626                // Add the new dispatcher to our list
627                let mut dispatchers = self.dispatchers.write().await;
628                dispatchers.push(Box::new(dispatcher));
629
630                receiver
631            }
632        };
633
634        // Wake any task blocked in wait_for_subscribers().
635        // Use notify_one() which stores a permit even if no one is waiting yet,
636        // avoiding a race between the dispatchers check and the await.
637        self.subscriber_notify.notify_one();
638
639        Ok(receiver)
640    }
641
642    /// Wait until at least one subscriber has registered.
643    ///
644    /// Sources that start a background polling loop (e.g. CDC) should call
645    /// this before entering their poll loop on a restart cycle. Without this,
646    /// events dispatched before `subscribe()` creates a new dispatcher would
647    /// be silently dropped, advancing the checkpoint past changes that no
648    /// subscriber ever received.
649    ///
650    /// Returns immediately if at least one dispatcher already exists (fresh
651    /// start with bootstrap, or broadcast mode which always has one).
652    pub async fn wait_for_subscribers(&self) {
653        loop {
654            let dispatchers = self.dispatchers.read().await;
655            if !dispatchers.is_empty() {
656                return;
657            }
658            drop(dispatchers);
659            self.subscriber_notify.notified().await;
660        }
661    }
662
663    /// Subscribe to this source with optional bootstrap
664    ///
665    /// This is the standard subscribe implementation that all sources can use.
666    /// It handles:
667    /// - Creating a receiver for streaming events (based on dispatch mode)
668    /// - Setting up bootstrap if requested and a provider has been set
669    /// - Returning the appropriate SubscriptionResponse
670    pub async fn subscribe_with_bootstrap(
671        &self,
672        settings: &crate::config::SourceSubscriptionSettings,
673        source_type: &str,
674    ) -> Result<SubscriptionResponse> {
675        // Recover sequence counter from checkpoint before anything else
676        self.apply_subscription_settings(settings);
677
678        self.subscribe_with_bootstrap_context(settings, source_type, HashMap::new())
679            .await
680    }
681
682    /// Subscribe to this source with optional bootstrap context properties.
683    pub async fn subscribe_with_bootstrap_context(
684        &self,
685        settings: &crate::config::SourceSubscriptionSettings,
686        source_type: &str,
687        bootstrap_properties: HashMap<String, serde_json::Value>,
688    ) -> Result<SubscriptionResponse> {
689        info!(
690            "Query '{}' subscribing to {} source '{}' (bootstrap: {}, resume_from: {:?}, request_handle: {})",
691            settings.query_id,
692            source_type,
693            self.id,
694            settings.enable_bootstrap,
695            settings.resume_from,
696            settings.request_position_handle
697        );
698
699        // Create streaming receiver using helper method
700        let receiver = self.create_streaming_receiver().await?;
701
702        // Register per-subscriber position filter for replay dedup.
703        // In Channel mode, the new dispatcher is the last entry in the vec.
704        // In Broadcast mode, per-subscriber filtering is not supported.
705        if self.dispatch_mode == DispatchMode::Channel {
706            if let Some(ref resume_pos) = settings.resume_from {
707                let dispatchers = self.dispatchers.read().await;
708                let dispatcher_idx = dispatchers.len().saturating_sub(1);
709                drop(dispatchers);
710                self.subscriber_resume_positions
711                    .write()
712                    .await
713                    .insert(dispatcher_idx, resume_pos.clone());
714                debug!(
715                    "[{}] Registered resume position filter for subscriber '{}' at dispatcher index {}",
716                    self.id, settings.query_id, dispatcher_idx
717                );
718            }
719        }
720
721        let query_id_for_response = settings.query_id.clone();
722
723        // resume_from overrides bootstrap: a resuming query already has base
724        // state in its persistent index and just needs replay from the
725        // requested sequence. Re-bootstrapping would corrupt that state.
726        let (bootstrap_receiver, bootstrap_result_receiver) = if settings.resume_from.is_some() {
727            info!(
728                "Query '{}' resuming from sequence {:?}; skipping bootstrap on {} source '{}'",
729                settings.query_id, settings.resume_from, source_type, self.id
730            );
731            (None, None)
732        } else if settings.enable_bootstrap {
733            match self
734                .handle_bootstrap_subscription(settings, source_type, bootstrap_properties)
735                .await?
736            {
737                Some((event_rx, result_rx)) => (Some(event_rx), Some(result_rx)),
738                None => (None, None),
739            }
740        } else {
741            (None, None)
742        };
743
744        // Only persistent (replay-capable) queries request a handle. Volatile
745        // queries are deliberately excluded from the min-watermark so they
746        // cannot pin upstream advancement.
747        let position_handle = if settings.request_position_handle {
748            let handle = self.create_position_handle(&settings.query_id).await;
749            // Initialize the handle to the query's checkpoint sequence so that
750            // compute_confirmed_position() includes this subscriber from the
751            // start. Without this, a resuming query whose handle stays at
752            // u64::MAX would be invisible to the min-watermark, letting
753            // flush_lsn advance past its checkpoint.
754            if let Some(last_seq) = settings.last_sequence {
755                handle.store(last_seq, Ordering::Release);
756            }
757            Some(handle)
758        } else {
759            None
760        };
761
762        Ok(SubscriptionResponse {
763            query_id: query_id_for_response,
764            source_id: self.id.clone(),
765            receiver,
766            bootstrap_receiver,
767            position_handle,
768            bootstrap_result_receiver,
769        })
770    }
771
772    /// Create only the bootstrap receiver for a subscription.
773    pub async fn create_bootstrap_receiver(
774        &self,
775        settings: &crate::config::SourceSubscriptionSettings,
776        source_type: &str,
777        bootstrap_properties: HashMap<String, serde_json::Value>,
778    ) -> Result<Option<BootstrapEventReceiver>> {
779        Ok(self
780            .handle_bootstrap_subscription(settings, source_type, bootstrap_properties)
781            .await?
782            .map(|(bootstrap_receiver, _)| bootstrap_receiver))
783    }
784
785    /// Handle bootstrap subscription logic.
786    ///
787    /// Returns the bootstrap event receiver and a oneshot receiver for the
788    /// `BootstrapResult` handover metadata.
789    async fn handle_bootstrap_subscription(
790        &self,
791        settings: &crate::config::SourceSubscriptionSettings,
792        source_type: &str,
793        bootstrap_properties: HashMap<String, serde_json::Value>,
794    ) -> Result<
795        Option<(
796            BootstrapEventReceiver,
797            tokio::sync::oneshot::Receiver<anyhow::Result<BootstrapResult>>,
798        )>,
799    > {
800        let provider_guard = self.bootstrap_provider.read().await;
801        if let Some(provider) = provider_guard.clone() {
802            drop(provider_guard); // Release lock before spawning task
803
804            info!(
805                "Creating bootstrap for query '{}' on {} source '{}'",
806                settings.query_id, source_type, self.id
807            );
808
809            // Create bootstrap context
810            let context = if bootstrap_properties.is_empty() {
811                BootstrapContext::new_minimal(
812                    self.id.clone(), // server_id
813                    self.id.clone(), // source_id
814                )
815            } else {
816                BootstrapContext::with_properties(
817                    self.id.clone(), // server_id
818                    self.id.clone(), // source_id
819                    bootstrap_properties,
820                )
821            };
822
823            // Create bootstrap channel
824            let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
825
826            // Create oneshot for BootstrapResult handover metadata
827            let (result_tx, result_rx) = tokio::sync::oneshot::channel();
828
829            // Convert HashSet to Vec for backward compatibility with BootstrapRequest
830            let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
831            let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
832
833            // Create bootstrap request with request_id
834            let request = BootstrapRequest {
835                query_id: settings.query_id.clone(),
836                node_labels,
837                relation_labels,
838                request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
839            };
840
841            // Clone settings for the async task
842            let settings_clone = settings.clone();
843            let source_id = self.id.clone();
844
845            // Get instance_id from context for log routing isolation
846            let instance_id = self
847                .context()
848                .await
849                .map(|c| c.instance_id.clone())
850                .unwrap_or_default();
851
852            // Spawn bootstrap task with tracing span for proper log routing
853            let span = tracing::info_span!(
854                "source_bootstrap",
855                instance_id = %instance_id,
856                component_id = %source_id,
857                component_type = "source"
858            );
859            tokio::spawn(
860                async move {
861                    let outcome = provider
862                        .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
863                        .await;
864
865                    match &outcome {
866                        Ok(result) => {
867                            info!(
868                                "Bootstrap completed successfully for query '{}', sent {} events \
869                                 (last_sequence={:?}, sequences_aligned={})",
870                                settings_clone.query_id,
871                                result.event_count,
872                                result.last_sequence,
873                                result.sequences_aligned
874                            );
875                        }
876                        Err(e) => {
877                            error!(
878                                "Bootstrap failed for query '{}': {e}",
879                                settings_clone.query_id
880                            );
881                        }
882                    }
883
884                    // Send the result (or error) to the query manager for handover.
885                    // If the receiver was dropped (query stopped), this is a no-op.
886                    let _ = result_tx.send(outcome);
887                }
888                .instrument(span),
889            );
890
891            Ok(Some((bootstrap_rx, result_rx)))
892        } else {
893            info!(
894                "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
895                settings.query_id, source_type, self.id
896            );
897            Ok(None)
898        }
899    }
900
901    /// Dispatch a SourceChange event with profiling metadata
902    ///
903    /// This method handles the common pattern of:
904    /// - Creating profiling metadata with timestamp
905    /// - Wrapping the change in a SourceEventWrapper
906    /// - Dispatching to all subscribers
907    /// - Handling the no-subscriber case gracefully
908    pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
909        // Create profiling metadata
910        let mut profiling = profiling::ProfilingMetadata::new();
911        profiling.source_send_ns = Some(profiling::timestamp_ns());
912
913        // Create event wrapper
914        let wrapper = SourceEventWrapper::with_profiling(
915            self.id.clone(),
916            SourceEvent::Change(change),
917            chrono::Utc::now(),
918            profiling,
919        );
920
921        // Dispatch event
922        self.dispatch_event(wrapper).await
923    }
924
925    /// Maximum allowed size for source position bytes (64KB).
926    /// Positions exceeding this limit are skipped at checkpoint time
927    /// to prevent memory issues, preserving the last good position.
928    pub const MAX_SOURCE_POSITION_BYTES: usize = 65_536;
929
930    /// Dispatch a SourceEventWrapper to all subscribers.
931    ///
932    /// This is a generic method for dispatching any SourceEvent.
933    /// It handles Arc-wrapping for zero-copy sharing and logs
934    /// when there are no subscribers.
935    /// The framework stamps every event with a monotonic sequence number.
936    pub async fn dispatch_event(&self, mut wrapper: SourceEventWrapper) -> Result<()> {
937        // Warn about oversized source positions; the checkpoint layer will
938        // enforce the limit and preserve the last good position.
939        if let Some(ref pos) = wrapper.source_position {
940            if pos.len() > Self::MAX_SOURCE_POSITION_BYTES {
941                warn!(
942                    "[{}] Source position is large ({} bytes > {} limit); \
943                     checkpoint staging will preserve the previous good position",
944                    self.id,
945                    pos.len(),
946                    Self::MAX_SOURCE_POSITION_BYTES
947                );
948            }
949        }
950
951        // Framework assigns the monotonic sequence
952        wrapper.sequence = Some(self.next_sequence.fetch_add(1, Ordering::Relaxed));
953
954        // Record sequence→source_position mapping for confirmed-position lookups.
955        if let (Some(seq), Some(ref pos)) = (wrapper.sequence, &wrapper.source_position) {
956            self.sequence_position_map
957                .write()
958                .await
959                .insert(seq, pos.clone());
960        }
961
962        debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
963
964        // Arc-wrap for zero-copy sharing across dispatchers
965        let arc_wrapper = Arc::new(wrapper);
966
967        // Send to all dispatchers, filtering by per-subscriber resume position
968        let dispatchers = self.dispatchers.read().await;
969        let comparator = self.position_comparator.read().await;
970        let mut cleared_indices: Vec<usize> = Vec::new();
971        // Collect (dispatcher_index, new_high_water) updates for after dispatch.
972        let mut hwm_updates: Vec<(usize, Bytes)> = Vec::new();
973
974        for (idx, dispatcher) in dispatchers.iter().enumerate() {
975            // Check per-subscriber position high-water mark.
976            // Events at or before the subscriber's high-water mark are
977            // suppressed.  When the event passes the mark, we deliver it
978            // and advance the mark (instead of removing it) so that future
979            // rewinds (caused by later subscribers triggering a stream
980            // restart) are still caught.
981            if let Some(ref cmp) = *comparator {
982                let resume_positions = self.subscriber_resume_positions.read().await;
983                if let Some(resume_pos) = resume_positions.get(&idx) {
984                    if let Some(ref event_pos) = arc_wrapper.source_position {
985                        if !cmp.position_reached(event_pos, resume_pos) {
986                            // Event hasn't passed high-water mark — suppress
987                            continue;
988                        }
989                        // Position reached — will update high-water after dispatch
990                        cleared_indices.push(idx);
991                    }
992                    // No source_position on event — cannot filter, deliver it
993                }
994            }
995
996            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
997                debug!("[{}] Failed to dispatch event: {}", self.id, e);
998            } else if let Some(ref event_pos) = arc_wrapper.source_position {
999                // Track high-water mark so future rewinds are caught
1000                hwm_updates.push((idx, event_pos.clone()));
1001            }
1002        }
1003        drop(comparator);
1004        drop(dispatchers);
1005
1006        // Advance high-water marks for dispatchers that received this event.
1007        // For dispatchers that already had an entry (cleared_indices), this
1008        // updates their mark.  For dispatchers that had no entry yet, this
1009        // establishes one — protecting them from future rewinds.
1010        if !hwm_updates.is_empty() {
1011            let mut resume_positions = self.subscriber_resume_positions.write().await;
1012            for (idx, pos) in hwm_updates {
1013                resume_positions.insert(idx, pos);
1014            }
1015        }
1016
1017        Ok(())
1018    }
1019
1020    /// Dispatch a batch of events, acquiring the dispatchers lock once for
1021    /// the entire batch. This is more efficient than calling
1022    /// [`dispatch_event()`](Self::dispatch_event) per-event when the source
1023    /// processes multiple rows per poll cycle.
1024    pub async fn dispatch_events_batch(&self, events: Vec<SourceEventWrapper>) -> Result<()> {
1025        if events.is_empty() {
1026            return Ok(());
1027        }
1028
1029        let dispatchers = self.dispatchers.read().await;
1030        let comparator = self.position_comparator.read().await;
1031
1032        for mut wrapper in events {
1033            if let Some(ref pos) = wrapper.source_position {
1034                if pos.len() > Self::MAX_SOURCE_POSITION_BYTES {
1035                    warn!(
1036                        "[{}] Source position is large ({} bytes > {} limit); \
1037                         checkpoint staging will preserve the previous good position",
1038                        self.id,
1039                        pos.len(),
1040                        Self::MAX_SOURCE_POSITION_BYTES
1041                    );
1042                }
1043            }
1044
1045            wrapper.sequence = Some(self.next_sequence.fetch_add(1, Ordering::Relaxed));
1046
1047            // Record sequence→source_position mapping for confirmed-position lookups.
1048            if let (Some(seq), Some(ref pos)) = (wrapper.sequence, &wrapper.source_position) {
1049                self.sequence_position_map
1050                    .write()
1051                    .await
1052                    .insert(seq, pos.clone());
1053            }
1054
1055            debug!("[{}] Dispatching event (batch): {:?}", self.id, &wrapper);
1056
1057            let arc_wrapper = Arc::new(wrapper);
1058            let mut cleared_indices: Vec<usize> = Vec::new();
1059            let mut hwm_updates: Vec<(usize, Bytes)> = Vec::new();
1060
1061            for (idx, dispatcher) in dispatchers.iter().enumerate() {
1062                // Check per-subscriber position high-water mark
1063                if let Some(ref cmp) = *comparator {
1064                    let resume_positions = self.subscriber_resume_positions.read().await;
1065                    if let Some(resume_pos) = resume_positions.get(&idx) {
1066                        if let Some(ref event_pos) = arc_wrapper.source_position {
1067                            if !cmp.position_reached(event_pos, resume_pos) {
1068                                debug!(
1069                                    "[{}] Position filter: SKIPPING event for dispatcher {} \
1070                                     (event_pos={:?} <= resume_pos={:?})",
1071                                    self.id,
1072                                    idx,
1073                                    event_pos.as_ref(),
1074                                    resume_pos.as_ref()
1075                                );
1076                                continue;
1077                            }
1078                            debug!(
1079                                "[{}] Position filter: PASSING event for dispatcher {} \
1080                                 (event_pos={:?} > resume_pos={:?})",
1081                                self.id,
1082                                idx,
1083                                event_pos.as_ref(),
1084                                resume_pos.as_ref()
1085                            );
1086                            cleared_indices.push(idx);
1087                        }
1088                    } else {
1089                        debug!(
1090                            "[{}] Position filter: NO resume position for dispatcher {}, passing through",
1091                            self.id, idx
1092                        );
1093                    }
1094                }
1095
1096                if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
1097                    debug!("[{}] Failed to dispatch event: {}", self.id, e);
1098                } else if let Some(ref event_pos) = arc_wrapper.source_position {
1099                    hwm_updates.push((idx, event_pos.clone()));
1100                }
1101            }
1102
1103            // Advance high-water marks for dispatchers that received this event
1104            if !hwm_updates.is_empty() {
1105                let mut resume_positions = self.subscriber_resume_positions.write().await;
1106                for (idx, pos) in hwm_updates {
1107                    resume_positions.insert(idx, pos);
1108                }
1109            }
1110        }
1111
1112        drop(comparator);
1113        drop(dispatchers);
1114
1115        Ok(())
1116    }
1117
1118    /// Broadcast SourceControl events
1119    pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
1120        let wrapper = SourceEventWrapper::new(
1121            self.id.clone(),
1122            SourceEvent::Control(control),
1123            chrono::Utc::now(),
1124        );
1125        self.dispatch_event(wrapper).await
1126    }
1127
1128    /// Create a test subscription to this source (synchronous, fallible)
1129    ///
1130    /// This method is intended for use in tests to receive events from the source.
1131    /// It properly handles both Broadcast and Channel dispatch modes by delegating
1132    /// to `create_streaming_receiver()`, making the dispatch mode transparent to tests.
1133    ///
1134    /// Note: This is a synchronous wrapper that uses `tokio::task::block_in_place` internally.
1135    /// For async contexts, prefer calling `create_streaming_receiver()` directly.
1136    ///
1137    /// # Returns
1138    /// A receiver that will receive all events dispatched by this source,
1139    /// or an error if the receiver cannot be created.
1140    pub fn try_test_subscribe(
1141        &self,
1142    ) -> anyhow::Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
1143        tokio::task::block_in_place(|| {
1144            tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
1145        })
1146    }
1147
1148    /// Create a test subscription to this source (synchronous wrapper)
1149    ///
1150    /// Convenience wrapper around [`try_test_subscribe`](Self::try_test_subscribe)
1151    /// that panics on failure. Prefer `try_test_subscribe()` in new code.
1152    ///
1153    /// # Panics
1154    /// Panics if the receiver cannot be created.
1155    pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
1156        self.try_test_subscribe()
1157            .expect("Failed to create test subscription receiver")
1158    }
1159
1160    /// Helper function to dispatch events from spawned tasks (unstamped).
1161    ///
1162    /// This is a static helper that can be used from spawned async tasks that don't
1163    /// have access to `self`. It manually iterates through dispatchers and sends the event.
1164    ///
1165    /// **Important**: This method does NOT stamp a monotonic sequence number and
1166    /// does NOT validate `source_position` size. Events dispatched through this
1167    /// method will not be checkpoint-tracked. This is acceptable for sources that
1168    /// do not support replay (`supports_replay() == false`).
1169    ///
1170    /// # For recoverable/checkpointed sources
1171    ///
1172    /// Use [`clone_shared()`](Self::clone_shared) to obtain a `SourceBase` that
1173    /// can be moved into spawned tasks, then call [`dispatch_event()`](Self::dispatch_event)
1174    /// which stamps sequences and validates positions:
1175    ///
1176    /// ```ignore
1177    /// let base = self.base.clone_shared();
1178    /// tokio::spawn(async move {
1179    ///     base.dispatch_event(wrapper).await.ok();
1180    /// });
1181    /// ```
1182    ///
1183    /// # Arguments
1184    /// * `dispatchers` - Arc to the dispatchers list (from `self.base.dispatchers.clone()`)
1185    /// * `wrapper` - The event wrapper to dispatch
1186    /// * `source_id` - Source ID for logging
1187    pub async fn dispatch_from_task(
1188        dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
1189        wrapper: SourceEventWrapper,
1190        source_id: &str,
1191    ) -> Result<()> {
1192        debug!(
1193            "[{}] Dispatching event from task: {:?}",
1194            source_id, &wrapper
1195        );
1196
1197        // Arc-wrap for zero-copy sharing across dispatchers
1198        let arc_wrapper = Arc::new(wrapper);
1199
1200        // Send to all dispatchers
1201        let dispatchers_guard = dispatchers.read().await;
1202        for dispatcher in dispatchers_guard.iter() {
1203            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
1204                debug!("[{source_id}] Failed to dispatch event from task: {e}");
1205            }
1206        }
1207
1208        Ok(())
1209    }
1210
1211    /// Handle common stop functionality
1212    pub async fn stop_common(&self) -> Result<()> {
1213        info!("Stopping source '{}'", self.id);
1214
1215        // Send shutdown signal if we have one
1216        if let Some(tx) = self.shutdown_tx.write().await.take() {
1217            let _ = tx.send(());
1218        }
1219
1220        // Wait for task to complete
1221        if let Some(mut handle) = self.task_handle.write().await.take() {
1222            match tokio::time::timeout(std::time::Duration::from_secs(5), &mut handle).await {
1223                Ok(Ok(())) => {
1224                    info!("Source '{}' task completed successfully", self.id);
1225                }
1226                Ok(Err(e)) => {
1227                    error!("Source '{}' task panicked: {}", self.id, e);
1228                }
1229                Err(_) => {
1230                    warn!(
1231                        "Source '{}' task did not complete within timeout, aborting",
1232                        self.id
1233                    );
1234                    handle.abort();
1235                }
1236            }
1237        }
1238
1239        // Clear stale dispatchers so that a subsequent start()+subscribe()
1240        // cycle does not race: without this, the CDC polling loop could
1241        // dispatch events to the old (dead) channel receivers before the
1242        // new subscribe() call creates fresh dispatchers, silently dropping
1243        // events while still advancing the checkpoint LSN.
1244        //
1245        // Broadcast mode keeps a single persistent dispatcher that hands
1246        // out receivers; channel mode creates one dispatcher per subscriber.
1247        if self.dispatch_mode == DispatchMode::Channel {
1248            let mut dispatchers = self.dispatchers.write().await;
1249            dispatchers.clear();
1250        }
1251
1252        self.set_status(
1253            ComponentStatus::Stopped,
1254            Some(format!("Source '{}' stopped", self.id)),
1255        )
1256        .await;
1257        info!("Source '{}' stopped", self.id);
1258        Ok(())
1259    }
1260
1261    /// Clear stale dispatchers from a prior lifecycle.
1262    ///
1263    /// Sources that manage their own stop/start lifecycle (instead of using
1264    /// `stop_common()`) **must** call this at the end of their `stop()`
1265    /// implementation. Without this, a subsequent `start()` + `subscribe()`
1266    /// cycle can race: the polling loop dispatches events to the old (dead)
1267    /// channel receivers before `subscribe()` creates fresh dispatchers,
1268    /// silently dropping events while still advancing the checkpoint.
1269    ///
1270    /// Only channel-mode dispatchers are cleared — broadcast mode keeps a
1271    /// single persistent dispatcher.
1272    pub async fn clear_dispatchers(&self) {
1273        if self.dispatch_mode == DispatchMode::Channel {
1274            let mut dispatchers = self.dispatchers.write().await;
1275            dispatchers.clear();
1276        }
1277    }
1278
1279    /// Clear the source's state store partition.
1280    ///
1281    /// This is called during deprovision to remove all persisted state
1282    /// associated with this source. Sources that override `deprovision()`
1283    /// can call this to clean up their state store.
1284    pub async fn deprovision_common(&self) -> Result<()> {
1285        info!("Deprovisioning source '{}'", self.id);
1286        if let Some(store) = self.state_store().await {
1287            let count = store.clear_store(&self.id).await.map_err(|e| {
1288                anyhow::anyhow!(
1289                    "Failed to clear state store for source '{}': {}",
1290                    self.id,
1291                    e
1292                )
1293            })?;
1294            info!(
1295                "Cleared {} keys from state store for source '{}'",
1296                count, self.id
1297            );
1298        }
1299        Ok(())
1300    }
1301
1302    /// Get the current status.
1303    pub async fn get_status(&self) -> ComponentStatus {
1304        self.status_handle.get_status().await
1305    }
1306
1307    /// Set the component's status — updates local state AND notifies the graph.
1308    ///
1309    /// This is the single canonical way to change a source's status.
1310    pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
1311        self.status_handle.set_status(status, message).await;
1312    }
1313
1314    /// Set the task handle
1315    pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
1316        *self.task_handle.write().await = Some(handle);
1317    }
1318
1319    /// Set the shutdown sender
1320    pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
1321        *self.shutdown_tx.write().await = Some(tx);
1322    }
1323}
1324
1325#[cfg(test)]
1326mod tests {
1327    use super::*;
1328    use crate::sources::ByteLexPositionComparator;
1329
1330    // =========================================================================
1331    // SourceBaseParams tests
1332    // =========================================================================
1333
1334    #[test]
1335    fn test_params_new_defaults() {
1336        let params = SourceBaseParams::new("test-source");
1337        assert_eq!(params.id, "test-source");
1338        assert!(params.dispatch_mode.is_none());
1339        assert!(params.dispatch_buffer_capacity.is_none());
1340        assert!(params.bootstrap_provider.is_none());
1341        assert!(params.auto_start);
1342    }
1343
1344    #[test]
1345    fn test_params_with_dispatch_mode() {
1346        let params = SourceBaseParams::new("s1").with_dispatch_mode(DispatchMode::Broadcast);
1347        assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
1348    }
1349
1350    #[test]
1351    fn test_params_with_dispatch_buffer_capacity() {
1352        let params = SourceBaseParams::new("s1").with_dispatch_buffer_capacity(50000);
1353        assert_eq!(params.dispatch_buffer_capacity, Some(50000));
1354    }
1355
1356    #[test]
1357    fn test_params_with_auto_start_false() {
1358        let params = SourceBaseParams::new("s1").with_auto_start(false);
1359        assert!(!params.auto_start);
1360    }
1361
1362    #[test]
1363    fn test_params_builder_chaining() {
1364        let params = SourceBaseParams::new("chained")
1365            .with_dispatch_mode(DispatchMode::Broadcast)
1366            .with_dispatch_buffer_capacity(2000)
1367            .with_auto_start(false);
1368
1369        assert_eq!(params.id, "chained");
1370        assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
1371        assert_eq!(params.dispatch_buffer_capacity, Some(2000));
1372        assert!(!params.auto_start);
1373    }
1374
1375    // =========================================================================
1376    // SourceBase tests
1377    // =========================================================================
1378
1379    #[tokio::test]
1380    async fn test_new_defaults() {
1381        let params = SourceBaseParams::new("my-source");
1382        let base = SourceBase::new(params).unwrap();
1383
1384        assert_eq!(base.id, "my-source");
1385        assert!(base.auto_start);
1386        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
1387    }
1388
1389    #[tokio::test]
1390    async fn test_get_id() {
1391        let base = SourceBase::new(SourceBaseParams::new("id-check")).unwrap();
1392        assert_eq!(base.get_id(), "id-check");
1393    }
1394
1395    #[tokio::test]
1396    async fn test_get_auto_start() {
1397        let base_default = SourceBase::new(SourceBaseParams::new("a")).unwrap();
1398        assert!(base_default.get_auto_start());
1399
1400        let base_false =
1401            SourceBase::new(SourceBaseParams::new("b").with_auto_start(false)).unwrap();
1402        assert!(!base_false.get_auto_start());
1403    }
1404
1405    #[tokio::test]
1406    async fn test_get_status_initial() {
1407        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1408        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
1409    }
1410
1411    #[tokio::test]
1412    async fn test_set_status() {
1413        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1414
1415        base.set_status(ComponentStatus::Running, None).await;
1416        assert_eq!(base.get_status().await, ComponentStatus::Running);
1417
1418        base.set_status(ComponentStatus::Error, Some("oops".into()))
1419            .await;
1420        assert_eq!(base.get_status().await, ComponentStatus::Error);
1421    }
1422
1423    #[tokio::test]
1424    async fn test_status_handle_returns_handle() {
1425        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1426        let handle = base.status_handle();
1427
1428        // The handle should reflect the same status as the base
1429        assert_eq!(handle.get_status().await, ComponentStatus::Stopped);
1430
1431        // Mutating through the handle should be visible via SourceBase
1432        handle.set_status(ComponentStatus::Starting, None).await;
1433        assert_eq!(base.get_status().await, ComponentStatus::Starting);
1434    }
1435
1436    // =========================================================================
1437    // Position handle and resume_from tests (issue #366)
1438    // =========================================================================
1439
1440    use crate::bootstrap::{
1441        BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
1442    };
1443    use crate::channels::BootstrapEventSender;
1444    use async_trait::async_trait;
1445
1446    fn make_settings(
1447        query_id: &str,
1448        enable_bootstrap: bool,
1449        resume_from: Option<bytes::Bytes>,
1450        request_position_handle: bool,
1451    ) -> crate::config::SourceSubscriptionSettings {
1452        use std::collections::HashSet;
1453        crate::config::SourceSubscriptionSettings {
1454            source_id: "test-src".to_string(),
1455            enable_bootstrap,
1456            query_id: query_id.to_string(),
1457            nodes: HashSet::new(),
1458            relations: HashSet::new(),
1459            resume_from,
1460            request_position_handle,
1461            last_sequence: None,
1462        }
1463    }
1464
1465    /// Minimal bootstrap provider that closes its sender immediately.
1466    /// Enough to verify a `bootstrap_receiver` was created without sending data.
1467    struct NoopProvider;
1468
1469    #[async_trait]
1470    impl BootstrapProvider for NoopProvider {
1471        async fn bootstrap(
1472            &self,
1473            _request: BootstrapRequest,
1474            _context: &BootstrapContext,
1475            _event_tx: BootstrapEventSender,
1476            _settings: Option<&crate::config::SourceSubscriptionSettings>,
1477        ) -> Result<BootstrapResult> {
1478            Ok(BootstrapResult::default())
1479        }
1480    }
1481
1482    fn make_base_with_bootstrap(id: &str) -> SourceBase {
1483        let mut params = SourceBaseParams::new(id);
1484        params.bootstrap_provider = Some(Box::new(NoopProvider));
1485        SourceBase::new(params).unwrap()
1486    }
1487
1488    #[tokio::test]
1489    async fn test_create_position_handle_initializes_to_u64_max() {
1490        let base = SourceBase::new(SourceBaseParams::new("ph-init")).unwrap();
1491        let handle = base.create_position_handle("q1").await;
1492        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1493    }
1494
1495    #[tokio::test]
1496    async fn test_create_position_handle_idempotent_for_same_query() {
1497        let base = SourceBase::new(SourceBaseParams::new("ph-idem")).unwrap();
1498        let h1 = base.create_position_handle("q1").await;
1499        h1.store(123, Ordering::Relaxed);
1500        let h2 = base.create_position_handle("q1").await;
1501        // Same Arc — preserves the previously reported position.
1502        assert!(Arc::ptr_eq(&h1, &h2));
1503        assert_eq!(h2.load(Ordering::Relaxed), 123);
1504    }
1505
1506    #[tokio::test]
1507    async fn test_remove_position_handle_drops_entry() {
1508        let base = SourceBase::new(SourceBaseParams::new("ph-rm")).unwrap();
1509        let handle = base.create_position_handle("q1").await;
1510        handle.store(42, Ordering::Relaxed);
1511        assert_eq!(base.compute_confirmed_position().await, Some(42));
1512        base.remove_position_handle("q1").await;
1513        assert_eq!(base.compute_confirmed_position().await, None);
1514    }
1515
1516    #[tokio::test]
1517    async fn test_remove_position_handle_noop_when_absent() {
1518        let base = SourceBase::new(SourceBaseParams::new("ph-rm-absent")).unwrap();
1519        // Must not panic.
1520        base.remove_position_handle("never-registered").await;
1521        assert_eq!(base.compute_confirmed_position().await, None);
1522    }
1523
1524    #[tokio::test]
1525    async fn test_compute_confirmed_position_returns_none_when_empty() {
1526        let base = SourceBase::new(SourceBaseParams::new("ph-empty")).unwrap();
1527        assert_eq!(base.compute_confirmed_position().await, None);
1528    }
1529
1530    #[tokio::test]
1531    async fn test_compute_confirmed_position_returns_none_when_all_max() {
1532        let base = SourceBase::new(SourceBaseParams::new("ph-all-max")).unwrap();
1533        let _h1 = base.create_position_handle("q1").await;
1534        let _h2 = base.create_position_handle("q2").await;
1535        assert_eq!(base.compute_confirmed_position().await, None);
1536    }
1537
1538    #[tokio::test]
1539    async fn test_compute_confirmed_position_filters_max_returns_min() {
1540        let base = SourceBase::new(SourceBaseParams::new("ph-min")).unwrap();
1541        let h1 = base.create_position_handle("q1").await;
1542        let _h2 = base.create_position_handle("q2").await; // stays u64::MAX
1543        let h3 = base.create_position_handle("q3").await;
1544        h1.store(100, Ordering::Relaxed);
1545        h3.store(50, Ordering::Relaxed);
1546        assert_eq!(base.compute_confirmed_position().await, Some(50));
1547    }
1548
1549    #[tokio::test]
1550    async fn test_compute_confirmed_position_single_real_value() {
1551        let base = SourceBase::new(SourceBaseParams::new("ph-single")).unwrap();
1552        let h1 = base.create_position_handle("q1").await;
1553        let _h2 = base.create_position_handle("q2").await;
1554        h1.store(7, Ordering::Relaxed);
1555        assert_eq!(base.compute_confirmed_position().await, Some(7));
1556    }
1557
1558    #[tokio::test]
1559    async fn test_cleanup_stale_handles_drops_orphaned_arc() {
1560        let base = SourceBase::new(SourceBaseParams::new("ph-stale")).unwrap();
1561        {
1562            let handle = base.create_position_handle("q1").await;
1563            handle.store(99, Ordering::Relaxed);
1564            // handle (the external clone) drops here.
1565        }
1566        base.cleanup_stale_handles().await;
1567        assert_eq!(base.compute_confirmed_position().await, None);
1568    }
1569
1570    #[tokio::test]
1571    async fn test_cleanup_stale_handles_keeps_held_arc() {
1572        let base = SourceBase::new(SourceBaseParams::new("ph-held")).unwrap();
1573        let handle = base.create_position_handle("q1").await;
1574        handle.store(11, Ordering::Relaxed);
1575        base.cleanup_stale_handles().await;
1576        // External clone still alive, entry retained.
1577        assert_eq!(base.compute_confirmed_position().await, Some(11));
1578        // Keep `handle` alive past the assertion.
1579        drop(handle);
1580    }
1581
1582    #[tokio::test]
1583    async fn test_subscribe_with_request_position_handle_returns_handle() {
1584        let base = SourceBase::new(SourceBaseParams::new("sub-handle")).unwrap();
1585        let response = base
1586            .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1587            .await
1588            .unwrap();
1589        let handle = response.position_handle.expect("expected handle");
1590        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1591        // Internal map should also have the entry (so min-watermark sees it).
1592        assert_eq!(base.compute_confirmed_position().await, None); // u64::MAX → None
1593    }
1594
1595    #[tokio::test]
1596    async fn test_subscribe_without_request_position_handle_returns_none() {
1597        let base = SourceBase::new(SourceBaseParams::new("sub-no-handle")).unwrap();
1598        let response = base
1599            .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1600            .await
1601            .unwrap();
1602        assert!(response.position_handle.is_none());
1603        // Internal map must be empty so this volatile query is excluded from
1604        // the min-watermark.
1605        let handles = base.position_handles.read().await;
1606        assert!(handles.is_empty());
1607    }
1608
1609    #[tokio::test]
1610    async fn test_subscribe_returned_handle_shared_with_internal() {
1611        let base = SourceBase::new(SourceBaseParams::new("sub-shared")).unwrap();
1612        let response = base
1613            .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1614            .await
1615            .unwrap();
1616        let handle = response.position_handle.unwrap();
1617        handle.store(42, Ordering::Relaxed);
1618        assert_eq!(base.compute_confirmed_position().await, Some(42));
1619    }
1620
1621    #[tokio::test]
1622    async fn test_subscribe_with_resume_from_skips_bootstrap() {
1623        let base = make_base_with_bootstrap("sub-resume");
1624        let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1625        let response = base
1626            .subscribe_with_bootstrap(&make_settings("q1", true, Some(position), false), "test")
1627            .await
1628            .unwrap();
1629        assert!(
1630            response.bootstrap_receiver.is_none(),
1631            "resume_from must override enable_bootstrap"
1632        );
1633    }
1634
1635    #[tokio::test]
1636    async fn test_subscribe_resume_without_bootstrap_still_none() {
1637        let base = make_base_with_bootstrap("sub-resume-no-bs");
1638        let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1639        let response = base
1640            .subscribe_with_bootstrap(&make_settings("q1", false, Some(position), false), "test")
1641            .await
1642            .unwrap();
1643        assert!(response.bootstrap_receiver.is_none());
1644    }
1645
1646    #[tokio::test]
1647    async fn test_subscribe_no_resume_with_bootstrap_returns_receiver() {
1648        let base = make_base_with_bootstrap("sub-bs");
1649        let response = base
1650            .subscribe_with_bootstrap(&make_settings("q1", true, None, false), "test")
1651            .await
1652            .unwrap();
1653        assert!(
1654            response.bootstrap_receiver.is_some(),
1655            "regression guard: bootstrap path must still produce a receiver"
1656        );
1657    }
1658
1659    #[tokio::test]
1660    async fn test_subscribe_no_resume_no_bootstrap_returns_none() {
1661        let base = make_base_with_bootstrap("sub-neither");
1662        let response = base
1663            .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1664            .await
1665            .unwrap();
1666        assert!(response.bootstrap_receiver.is_none());
1667        assert!(response.position_handle.is_none());
1668    }
1669
1670    // =========================================================================
1671    // dispatch_events_batch tests
1672    // =========================================================================
1673
1674    fn make_event(source_id: &str, position: Option<&[u8]>) -> SourceEventWrapper {
1675        let change = drasi_core::models::SourceChange::Insert {
1676            element: drasi_core::models::Element::Node {
1677                metadata: drasi_core::models::ElementMetadata {
1678                    reference: drasi_core::models::ElementReference::new(source_id, "n1"),
1679                    labels: Arc::from([Arc::from("Label")]),
1680                    effective_from: 0,
1681                },
1682                properties: drasi_core::models::ElementPropertyMap::new(),
1683            },
1684        };
1685        let mut wrapper = SourceEventWrapper::new(
1686            source_id.to_string(),
1687            SourceEvent::Change(change),
1688            chrono::Utc::now(),
1689        );
1690        wrapper.source_position = position.map(|p| bytes::Bytes::from(p.to_vec()));
1691        wrapper
1692    }
1693
1694    #[tokio::test]
1695    async fn test_dispatch_events_batch_empty_returns_ok() {
1696        let base = SourceBase::new(SourceBaseParams::new("batch-empty")).unwrap();
1697        let result = base.dispatch_events_batch(Vec::new()).await;
1698        assert!(result.is_ok());
1699    }
1700
1701    #[tokio::test]
1702    async fn test_dispatch_events_batch_stamps_monotonic_sequences() {
1703        let params = SourceBaseParams::new("batch-seq").with_dispatch_mode(DispatchMode::Channel);
1704        let base = SourceBase::new(params).unwrap();
1705
1706        // Create a receiver so events are actually captured
1707        let mut receiver = base.create_streaming_receiver().await.unwrap();
1708
1709        let events = vec![
1710            make_event("batch-seq", Some(b"\x01")),
1711            make_event("batch-seq", Some(b"\x02")),
1712            make_event("batch-seq", Some(b"\x03")),
1713        ];
1714
1715        base.dispatch_events_batch(events).await.unwrap();
1716
1717        let e1 = receiver.recv().await.unwrap();
1718        let e2 = receiver.recv().await.unwrap();
1719        let e3 = receiver.recv().await.unwrap();
1720
1721        let s1 = e1.sequence.expect("event 1 must have sequence");
1722        let s2 = e2.sequence.expect("event 2 must have sequence");
1723        let s3 = e3.sequence.expect("event 3 must have sequence");
1724
1725        assert_eq!(s2, s1 + 1, "sequences must be monotonically increasing");
1726        assert_eq!(s3, s2 + 1, "sequences must be monotonically increasing");
1727    }
1728
1729    #[tokio::test]
1730    async fn test_dispatch_events_batch_multi_dispatcher_fanout() {
1731        let params =
1732            SourceBaseParams::new("batch-fanout").with_dispatch_mode(DispatchMode::Channel);
1733        let base = SourceBase::new(params).unwrap();
1734
1735        // Create two receivers (two dispatchers in channel mode)
1736        let mut rx1 = base.create_streaming_receiver().await.unwrap();
1737        let mut rx2 = base.create_streaming_receiver().await.unwrap();
1738
1739        let events = vec![
1740            make_event("batch-fanout", Some(b"\x01")),
1741            make_event("batch-fanout", Some(b"\x02")),
1742        ];
1743
1744        base.dispatch_events_batch(events).await.unwrap();
1745
1746        // Both receivers should get both events
1747        let r1_e1 = rx1.recv().await.unwrap();
1748        let r1_e2 = rx1.recv().await.unwrap();
1749        let r2_e1 = rx2.recv().await.unwrap();
1750        let r2_e2 = rx2.recv().await.unwrap();
1751
1752        // Same sequences across both receivers
1753        assert_eq!(r1_e1.sequence, r2_e1.sequence);
1754        assert_eq!(r1_e2.sequence, r2_e2.sequence);
1755    }
1756
1757    #[tokio::test]
1758    async fn test_dispatch_events_batch_oversized_position_still_dispatches() {
1759        let params =
1760            SourceBaseParams::new("batch-oversize").with_dispatch_mode(DispatchMode::Channel);
1761        let base = SourceBase::new(params).unwrap();
1762        let mut rx = base.create_streaming_receiver().await.unwrap();
1763
1764        // Create an event with a position larger than MAX_SOURCE_POSITION_BYTES
1765        let big_pos = vec![0xAA; SourceBase::MAX_SOURCE_POSITION_BYTES + 1];
1766        let events = vec![make_event("batch-oversize", Some(&big_pos))];
1767
1768        // Should succeed (warn but not error)
1769        base.dispatch_events_batch(events).await.unwrap();
1770
1771        let received = rx.recv().await.unwrap();
1772        assert!(received.sequence.is_some(), "event must still be stamped");
1773        assert_eq!(
1774            received.source_position.as_ref().map(|p| p.len()),
1775            Some(SourceBase::MAX_SOURCE_POSITION_BYTES + 1),
1776            "oversized position must still be delivered (checkpoint layer enforces the limit)"
1777        );
1778    }
1779
1780    // =========================================================================
1781    // Per-subscriber position filtering tests
1782    // =========================================================================
1783
1784    #[tokio::test]
1785    async fn test_position_filter_suppresses_events_before_resume() {
1786        let params = SourceBaseParams::new("pos-filter").with_dispatch_mode(DispatchMode::Channel);
1787        let base = SourceBase::new(params).unwrap();
1788        base.set_position_comparator(ByteLexPositionComparator)
1789            .await;
1790
1791        // Create two subscribers
1792        let mut rx1 = base.create_streaming_receiver().await.unwrap();
1793        let mut rx2 = base.create_streaming_receiver().await.unwrap();
1794
1795        // rx1 (dispatcher 0): resume_from = position [0x00, 0x05]
1796        // rx2 (dispatcher 1): no resume position (gets everything)
1797        base.subscriber_resume_positions
1798            .write()
1799            .await
1800            .insert(0, Bytes::from_static(&[0x00, 0x05]));
1801
1802        // Dispatch event at position [0x00, 0x03] — before rx1's resume
1803        let event = make_event("pos-filter", Some(&[0x00, 0x03]));
1804        base.dispatch_event(event).await.unwrap();
1805
1806        // rx2 should receive it (no filter)
1807        let r2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1808            .await
1809            .unwrap()
1810            .unwrap();
1811        assert_eq!(r2.source_position.as_ref().unwrap().as_ref(), &[0x00, 0x03]);
1812
1813        // rx1 should NOT receive it (position not reached)
1814        let r1 = tokio::time::timeout(std::time::Duration::from_millis(50), rx1.recv()).await;
1815        assert!(r1.is_err(), "rx1 should timeout — event was suppressed");
1816    }
1817
1818    #[tokio::test]
1819    async fn test_position_filter_delivers_events_past_resume() {
1820        let params = SourceBaseParams::new("pos-filter2").with_dispatch_mode(DispatchMode::Channel);
1821        let base = SourceBase::new(params).unwrap();
1822        base.set_position_comparator(ByteLexPositionComparator)
1823            .await;
1824
1825        let mut rx1 = base.create_streaming_receiver().await.unwrap();
1826
1827        // resume_from = [0x00, 0x05]
1828        base.subscriber_resume_positions
1829            .write()
1830            .await
1831            .insert(0, Bytes::from_static(&[0x00, 0x05]));
1832
1833        // Event at [0x00, 0x06] — past resume
1834        let event = make_event("pos-filter2", Some(&[0x00, 0x06]));
1835        base.dispatch_event(event).await.unwrap();
1836
1837        let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1838            .await
1839            .unwrap()
1840            .unwrap();
1841        assert_eq!(
1842            received.source_position.as_ref().unwrap().as_ref(),
1843            &[0x00, 0x06]
1844        );
1845    }
1846
1847    #[tokio::test]
1848    async fn test_position_filter_advances_high_water_mark() {
1849        let params = SourceBaseParams::new("pos-hwm").with_dispatch_mode(DispatchMode::Channel);
1850        let base = SourceBase::new(params).unwrap();
1851        base.set_position_comparator(ByteLexPositionComparator)
1852            .await;
1853
1854        let mut rx = base.create_streaming_receiver().await.unwrap();
1855
1856        // resume_from = [0x00, 0x03]
1857        base.subscriber_resume_positions
1858            .write()
1859            .await
1860            .insert(0, Bytes::from_static(&[0x00, 0x03]));
1861
1862        // First event at [0x00, 0x04] — past resume, advances high-water mark
1863        base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x04])))
1864            .await
1865            .unwrap();
1866        let _ = rx.recv().await.unwrap();
1867
1868        // High-water mark should be updated (not cleared)
1869        {
1870            let positions = base.subscriber_resume_positions.read().await;
1871            assert_eq!(
1872                positions.get(&0).map(|b| b.as_ref()),
1873                Some([0x00, 0x04].as_slice()),
1874                "high-water mark should be advanced to dispatched position"
1875            );
1876        }
1877
1878        // Subsequent event at LOWER position should be suppressed (rewind protection)
1879        base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x01])))
1880            .await
1881            .unwrap();
1882        let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
1883        assert!(
1884            r.is_err(),
1885            "event below high-water mark should be suppressed after rewind"
1886        );
1887
1888        // Event at HIGHER position should flow through
1889        base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x06])))
1890            .await
1891            .unwrap();
1892        let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
1893            .await
1894            .unwrap()
1895            .unwrap();
1896        assert_eq!(
1897            received.source_position.as_ref().unwrap().as_ref(),
1898            &[0x00, 0x06]
1899        );
1900    }
1901
1902    #[tokio::test]
1903    async fn test_position_filter_equal_position_is_suppressed() {
1904        // resume_from is the LAST committed position, so an event at exactly
1905        // that position has already been processed — it should be suppressed.
1906        let params = SourceBaseParams::new("pos-equal").with_dispatch_mode(DispatchMode::Channel);
1907        let base = SourceBase::new(params).unwrap();
1908        base.set_position_comparator(ByteLexPositionComparator)
1909            .await;
1910
1911        let mut rx = base.create_streaming_receiver().await.unwrap();
1912
1913        base.subscriber_resume_positions
1914            .write()
1915            .await
1916            .insert(0, Bytes::from_static(&[0x00, 0x05]));
1917
1918        // Event at exactly [0x00, 0x05] — should be suppressed
1919        base.dispatch_event(make_event("pos-equal", Some(&[0x00, 0x05])))
1920            .await
1921            .unwrap();
1922
1923        let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
1924        assert!(
1925            r.is_err(),
1926            "event at exactly resume position should be suppressed"
1927        );
1928    }
1929
1930    #[tokio::test]
1931    async fn test_position_filter_no_comparator_delivers_all() {
1932        // Without a position comparator set, all events should be delivered
1933        // even if there's a resume position entry.
1934        let params = SourceBaseParams::new("no-cmp").with_dispatch_mode(DispatchMode::Channel);
1935        let base = SourceBase::new(params).unwrap();
1936        // Deliberately NOT setting a position comparator
1937
1938        let mut rx = base.create_streaming_receiver().await.unwrap();
1939
1940        base.subscriber_resume_positions
1941            .write()
1942            .await
1943            .insert(0, Bytes::from_static(&[0x00, 0x05]));
1944
1945        // Event at [0x00, 0x03] — normally suppressed, but no comparator
1946        base.dispatch_event(make_event("no-cmp", Some(&[0x00, 0x03])))
1947            .await
1948            .unwrap();
1949
1950        let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
1951            .await
1952            .unwrap()
1953            .unwrap();
1954        assert_eq!(
1955            received.source_position.as_ref().unwrap().as_ref(),
1956            &[0x00, 0x03]
1957        );
1958    }
1959
1960    #[tokio::test]
1961    async fn test_position_filter_batch_mode() {
1962        let params = SourceBaseParams::new("pos-batch").with_dispatch_mode(DispatchMode::Channel);
1963        let base = SourceBase::new(params).unwrap();
1964        base.set_position_comparator(ByteLexPositionComparator)
1965            .await;
1966
1967        let mut rx1 = base.create_streaming_receiver().await.unwrap();
1968        let mut rx2 = base.create_streaming_receiver().await.unwrap();
1969
1970        // rx1 (idx 0): resume_from = [0x00, 0x05]
1971        // rx2 (idx 1): resume_from = [0x00, 0x02]
1972        {
1973            let mut positions = base.subscriber_resume_positions.write().await;
1974            positions.insert(0, Bytes::from_static(&[0x00, 0x05]));
1975            positions.insert(1, Bytes::from_static(&[0x00, 0x02]));
1976        }
1977
1978        let events = vec![
1979            make_event("pos-batch", Some(&[0x00, 0x01])), // before both
1980            make_event("pos-batch", Some(&[0x00, 0x03])), // past rx2, before rx1
1981            make_event("pos-batch", Some(&[0x00, 0x06])), // past both
1982        ];
1983        base.dispatch_events_batch(events).await.unwrap();
1984
1985        // rx2 should receive events at [0x03] and [0x06] (skipping [0x01])
1986        let r2_1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1987            .await
1988            .unwrap()
1989            .unwrap();
1990        assert_eq!(
1991            r2_1.source_position.as_ref().unwrap().as_ref(),
1992            &[0x00, 0x03]
1993        );
1994
1995        let r2_2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1996            .await
1997            .unwrap()
1998            .unwrap();
1999        assert_eq!(
2000            r2_2.source_position.as_ref().unwrap().as_ref(),
2001            &[0x00, 0x06]
2002        );
2003
2004        // rx1 should only receive event at [0x06] (skipping [0x01] and [0x03])
2005        let r1_1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
2006            .await
2007            .unwrap()
2008            .unwrap();
2009        assert_eq!(
2010            r1_1.source_position.as_ref().unwrap().as_ref(),
2011            &[0x00, 0x06]
2012        );
2013
2014        // No more events for rx1
2015        let r1_extra = tokio::time::timeout(std::time::Duration::from_millis(50), rx1.recv()).await;
2016        assert!(r1_extra.is_err(), "rx1 should have no more events");
2017    }
2018
2019    #[tokio::test]
2020    async fn test_position_filter_events_without_position_delivered() {
2021        // Events with no source_position cannot be filtered — they pass through.
2022        let params = SourceBaseParams::new("pos-none").with_dispatch_mode(DispatchMode::Channel);
2023        let base = SourceBase::new(params).unwrap();
2024        base.set_position_comparator(ByteLexPositionComparator)
2025            .await;
2026
2027        let mut rx = base.create_streaming_receiver().await.unwrap();
2028
2029        base.subscriber_resume_positions
2030            .write()
2031            .await
2032            .insert(0, Bytes::from_static(&[0x00, 0x05]));
2033
2034        // Event with no source_position
2035        base.dispatch_event(make_event("pos-none", None))
2036            .await
2037            .unwrap();
2038
2039        let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
2040            .await
2041            .unwrap()
2042            .unwrap();
2043        assert!(received.source_position.is_none());
2044    }
2045
2046    // =========================================================================
2047    // Sequence → source position mapping tests
2048    // =========================================================================
2049
2050    #[tokio::test]
2051    async fn test_sequence_position_map_populated_on_dispatch() {
2052        let params = SourceBaseParams::new("spm-1").with_dispatch_mode(DispatchMode::Channel);
2053        let base = SourceBase::new(params).unwrap();
2054        let _rx = base.create_streaming_receiver().await.unwrap();
2055
2056        let lsn: u64 = 0x1234;
2057        base.dispatch_event(make_event("spm-1", Some(&lsn.to_be_bytes())))
2058            .await
2059            .unwrap();
2060
2061        let map = base.sequence_position_map.read().await;
2062        assert_eq!(map.len(), 1);
2063        let (seq, pos) = map.iter().next().unwrap();
2064        assert_eq!(*seq, 1); // first sequence
2065        assert_eq!(pos.as_ref(), &lsn.to_be_bytes());
2066    }
2067
2068    #[tokio::test]
2069    async fn test_sequence_position_map_not_populated_without_position() {
2070        let params = SourceBaseParams::new("spm-none").with_dispatch_mode(DispatchMode::Channel);
2071        let base = SourceBase::new(params).unwrap();
2072        let _rx = base.create_streaming_receiver().await.unwrap();
2073
2074        base.dispatch_event(make_event("spm-none", None))
2075            .await
2076            .unwrap();
2077
2078        let map = base.sequence_position_map.read().await;
2079        assert!(map.is_empty());
2080    }
2081
2082    #[tokio::test]
2083    async fn test_compute_confirmed_source_position_basic() {
2084        let params = SourceBaseParams::new("cssp-1").with_dispatch_mode(DispatchMode::Channel);
2085        let base = SourceBase::new(params).unwrap();
2086        let _rx = base.create_streaming_receiver().await.unwrap();
2087
2088        // Create position handle and set confirmed sequence
2089        let handle = base.create_position_handle("q1").await;
2090
2091        // Dispatch 3 events with known LSNs
2092        for lsn in [100u64, 200, 300] {
2093            base.dispatch_event(make_event("cssp-1", Some(&lsn.to_be_bytes())))
2094                .await
2095                .unwrap();
2096        }
2097
2098        // Confirm up to sequence 2 (second event, LSN=200)
2099        handle.store(2, Ordering::Relaxed);
2100
2101        let confirmed = base.compute_confirmed_source_position().await;
2102        assert!(confirmed.is_some());
2103        let lsn_bytes = confirmed.unwrap();
2104        assert_eq!(u64::from_be_bytes(lsn_bytes[..8].try_into().unwrap()), 200);
2105    }
2106
2107    #[tokio::test]
2108    async fn test_compute_confirmed_source_position_returns_none_when_no_handles() {
2109        let base = SourceBase::new(SourceBaseParams::new("cssp-none")).unwrap();
2110        assert!(base.compute_confirmed_source_position().await.is_none());
2111    }
2112
2113    #[tokio::test]
2114    async fn test_compute_confirmed_source_position_returns_none_when_all_max() {
2115        let base = SourceBase::new(SourceBaseParams::new("cssp-max")).unwrap();
2116        let _h = base.create_position_handle("q1").await;
2117        // h stays at u64::MAX
2118        assert!(base.compute_confirmed_source_position().await.is_none());
2119    }
2120
2121    #[tokio::test]
2122    async fn test_compute_confirmed_source_position_min_of_two_queries() {
2123        let params = SourceBaseParams::new("cssp-2q").with_dispatch_mode(DispatchMode::Channel);
2124        let base = SourceBase::new(params).unwrap();
2125        let _rx = base.create_streaming_receiver().await.unwrap();
2126
2127        let h1 = base.create_position_handle("q1").await;
2128        let h2 = base.create_position_handle("q2").await;
2129
2130        // Dispatch 3 events
2131        for lsn in [100u64, 200, 300] {
2132            base.dispatch_event(make_event("cssp-2q", Some(&lsn.to_be_bytes())))
2133                .await
2134                .unwrap();
2135        }
2136
2137        // q1 confirmed seq 3 (LSN=300), q2 confirmed seq 1 (LSN=100)
2138        h1.store(3, Ordering::Relaxed);
2139        h2.store(1, Ordering::Relaxed);
2140
2141        let confirmed = base.compute_confirmed_source_position().await;
2142        assert!(confirmed.is_some());
2143        let lsn_bytes = confirmed.unwrap();
2144        // min is seq 1 → LSN 100
2145        assert_eq!(u64::from_be_bytes(lsn_bytes[..8].try_into().unwrap()), 100);
2146    }
2147
2148    #[tokio::test]
2149    async fn test_prune_position_map() {
2150        let params = SourceBaseParams::new("prune-1").with_dispatch_mode(DispatchMode::Channel);
2151        let base = SourceBase::new(params).unwrap();
2152        let _rx = base.create_streaming_receiver().await.unwrap();
2153
2154        for lsn in [10u64, 20, 30, 40, 50] {
2155            base.dispatch_event(make_event("prune-1", Some(&lsn.to_be_bytes())))
2156                .await
2157                .unwrap();
2158        }
2159        // Sequences are 1..=5
2160        assert_eq!(base.sequence_position_map.read().await.len(), 5);
2161
2162        base.prune_position_map(3).await;
2163        let map = base.sequence_position_map.read().await;
2164        assert_eq!(map.len(), 2); // sequences 4 and 5 remain
2165        assert!(map.contains_key(&4));
2166        assert!(map.contains_key(&5));
2167    }
2168
2169    #[tokio::test]
2170    async fn test_prune_position_map_all() {
2171        let params = SourceBaseParams::new("prune-all").with_dispatch_mode(DispatchMode::Channel);
2172        let base = SourceBase::new(params).unwrap();
2173        let _rx = base.create_streaming_receiver().await.unwrap();
2174
2175        for lsn in [10u64, 20] {
2176            base.dispatch_event(make_event("prune-all", Some(&lsn.to_be_bytes())))
2177                .await
2178                .unwrap();
2179        }
2180        base.prune_position_map(100).await; // prune beyond last seq
2181        assert!(base.sequence_position_map.read().await.is_empty());
2182    }
2183
2184    #[tokio::test]
2185    async fn test_position_handle_initialized_to_last_sequence() {
2186        use crate::config::SourceSubscriptionSettings;
2187
2188        let params = SourceBaseParams::new("ph-init").with_dispatch_mode(DispatchMode::Channel);
2189        let base = SourceBase::new(params).unwrap();
2190
2191        let settings = SourceSubscriptionSettings {
2192            query_id: "q1".to_string(),
2193            source_id: "ph-init".to_string(),
2194            enable_bootstrap: false,
2195            resume_from: Some(Bytes::from_static(&[0x00, 0x01])),
2196            last_sequence: Some(42),
2197            request_position_handle: true,
2198            nodes: Default::default(),
2199            relations: Default::default(),
2200        };
2201
2202        let response = base
2203            .subscribe_with_bootstrap(&settings, "test")
2204            .await
2205            .unwrap();
2206        let handle = response.position_handle.expect("should have handle");
2207        // Handle should be initialized to last_sequence, not u64::MAX
2208        assert_eq!(handle.load(Ordering::Relaxed), 42);
2209    }
2210
2211    #[tokio::test]
2212    async fn test_position_handle_stays_max_without_last_sequence() {
2213        use crate::config::SourceSubscriptionSettings;
2214
2215        let params = SourceBaseParams::new("ph-no-ls").with_dispatch_mode(DispatchMode::Channel);
2216        let base = SourceBase::new(params).unwrap();
2217
2218        let settings = SourceSubscriptionSettings {
2219            query_id: "q1".to_string(),
2220            source_id: "ph-no-ls".to_string(),
2221            enable_bootstrap: true,
2222            resume_from: None,
2223            last_sequence: None,
2224            request_position_handle: true,
2225            nodes: Default::default(),
2226            relations: Default::default(),
2227        };
2228
2229        let response = base
2230            .subscribe_with_bootstrap(&settings, "test")
2231            .await
2232            .unwrap();
2233        let handle = response.position_handle.expect("should have handle");
2234        // No last_sequence → handle stays at u64::MAX
2235        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
2236    }
2237
2238    #[tokio::test]
2239    async fn test_batch_dispatch_populates_sequence_position_map() {
2240        let params = SourceBaseParams::new("spm-batch").with_dispatch_mode(DispatchMode::Channel);
2241        let base = SourceBase::new(params).unwrap();
2242        let _rx = base.create_streaming_receiver().await.unwrap();
2243
2244        let events = vec![
2245            make_event("spm-batch", Some(&100u64.to_be_bytes())),
2246            make_event("spm-batch", Some(&200u64.to_be_bytes())),
2247            make_event("spm-batch", None), // no position
2248        ];
2249
2250        base.dispatch_events_batch(events).await.unwrap();
2251
2252        let map = base.sequence_position_map.read().await;
2253        // Only 2 entries (the one without position is skipped)
2254        assert_eq!(map.len(), 2);
2255        assert!(map.contains_key(&1));
2256        assert!(map.contains_key(&2));
2257    }
2258
2259    #[tokio::test]
2260    async fn test_position_filter_rewind_protection_multi_subscriber() {
2261        // Simulates the Postgres scenario: subscriber A joins, processes events,
2262        // then subscriber B joins and the source rewinds the stream.
2263        // Subscriber A should NOT see replayed events thanks to the
2264        // persistent high-water mark.
2265        let params = SourceBaseParams::new("rewind").with_dispatch_mode(DispatchMode::Channel);
2266        let base = SourceBase::new(params).unwrap();
2267        base.set_position_comparator(ByteLexPositionComparator)
2268            .await;
2269
2270        // Subscriber A joins with resume_from at [0x10]
2271        let mut rx_a = base.create_streaming_receiver().await.unwrap();
2272        base.subscriber_resume_positions
2273            .write()
2274            .await
2275            .insert(0, Bytes::from_static(&[0x10]));
2276
2277        // Source dispatches events at [0x20] and [0x30] — both past A's resume
2278        base.dispatch_event(make_event("rewind", Some(&[0x20])))
2279            .await
2280            .unwrap();
2281        let ev = rx_a.recv().await.unwrap();
2282        assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x20]);
2283
2284        base.dispatch_event(make_event("rewind", Some(&[0x30])))
2285            .await
2286            .unwrap();
2287        let ev = rx_a.recv().await.unwrap();
2288        assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x30]);
2289
2290        // Subscriber B joins with resume_from at [0x10]
2291        let mut rx_b = base.create_streaming_receiver().await.unwrap();
2292        base.subscriber_resume_positions
2293            .write()
2294            .await
2295            .insert(1, Bytes::from_static(&[0x10]));
2296
2297        // Source REWINDS — replays from [0x20] again
2298        // A should NOT see these (high-water is at [0x30])
2299        // B should see [0x20] (past its resume_from [0x10])
2300        base.dispatch_event(make_event("rewind", Some(&[0x20])))
2301            .await
2302            .unwrap();
2303
2304        // A should NOT receive the replayed event
2305        let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx_a.recv()).await;
2306        assert!(
2307            r.is_err(),
2308            "subscriber A should not see replayed event at [0x20]"
2309        );
2310
2311        // B SHOULD receive it (it's past B's resume_from)
2312        let ev = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2313            .await
2314            .unwrap()
2315            .unwrap();
2316        assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x20]);
2317
2318        // Replay [0x30] — A should NOT see it, B should
2319        base.dispatch_event(make_event("rewind", Some(&[0x30])))
2320            .await
2321            .unwrap();
2322
2323        let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx_a.recv()).await;
2324        assert!(
2325            r.is_err(),
2326            "subscriber A should not see replayed event at [0x30]"
2327        );
2328
2329        let ev = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2330            .await
2331            .unwrap()
2332            .unwrap();
2333        assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x30]);
2334
2335        // New event [0x40] — BOTH should see it
2336        base.dispatch_event(make_event("rewind", Some(&[0x40])))
2337            .await
2338            .unwrap();
2339
2340        let ev_a = tokio::time::timeout(std::time::Duration::from_millis(100), rx_a.recv())
2341            .await
2342            .unwrap()
2343            .unwrap();
2344        assert_eq!(ev_a.source_position.as_ref().unwrap().as_ref(), &[0x40]);
2345
2346        let ev_b = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2347            .await
2348            .unwrap()
2349            .unwrap();
2350        assert_eq!(ev_b.source_position.as_ref().unwrap().as_ref(), &[0x40]);
2351    }
2352
2353    #[tokio::test]
2354    async fn test_high_water_mark_set_for_new_subscriber_without_resume() {
2355        // A subscriber without initial resume_from should get a high-water
2356        // mark after its first event, protecting it from future rewinds.
2357        let params = SourceBaseParams::new("hwm-new").with_dispatch_mode(DispatchMode::Channel);
2358        let base = SourceBase::new(params).unwrap();
2359        base.set_position_comparator(ByteLexPositionComparator)
2360            .await;
2361
2362        let mut rx = base.create_streaming_receiver().await.unwrap();
2363        // No resume_from set
2364
2365        // Dispatch event — should be delivered (no filter)
2366        base.dispatch_event(make_event("hwm-new", Some(&[0x10])))
2367            .await
2368            .unwrap();
2369        let _ = rx.recv().await.unwrap();
2370
2371        // Now the high-water mark should be set at [0x10]
2372        {
2373            let positions = base.subscriber_resume_positions.read().await;
2374            assert_eq!(
2375                positions.get(&0).map(|b| b.as_ref()),
2376                Some([0x10].as_slice()),
2377                "high-water mark should be set after first dispatch"
2378            );
2379        }
2380
2381        // Rewind: event at [0x05] should be suppressed
2382        base.dispatch_event(make_event("hwm-new", Some(&[0x05])))
2383            .await
2384            .unwrap();
2385        let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
2386        assert!(
2387            r.is_err(),
2388            "event below high-water mark should be suppressed"
2389        );
2390    }
2391}