// drasi_lib/sources/base.rs
// Copyright 2025 The Drasi Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
14
//! Base implementation for common source functionality.
//!
//! This module provides `SourceBase`, which encapsulates the patterns shared
//! by all source implementations:
//! - Dispatcher setup and management
//! - Bootstrap subscription handling
//! - Event dispatching with profiling
//! - Component lifecycle management
//!
//! # Plugin Architecture
//!
//! `SourceBase` is designed to be used by source plugins. Each plugin:
//! 1. Defines its own typed configuration struct
//! 2. Creates a `SourceBase` from `SourceBaseParams`
//! 3. Optionally provides a bootstrap provider, either through
//!    `SourceBaseParams::with_bootstrap_provider()` or `set_bootstrap_provider()`
//! 4. Implements the `Source` trait by delegating to `SourceBase` methods
31
32use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::collections::{BTreeMap, HashMap};
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37use tokio::sync::{Notify, RwLock};
38use tracing::Instrument;
39
40use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult};
41use crate::channels::*;
42use crate::component_graph::ComponentStatusHandle;
43use crate::context::SourceRuntimeContext;
44use crate::identity::IdentityProvider;
45use crate::profiling;
46use crate::sources::PositionComparator;
47use crate::state_store::StateStoreProvider;
48use bytes::Bytes;
49use drasi_core::models::SourceChange;
50
51/// Parameters for creating a SourceBase instance.
52///
53/// This struct contains only the information that SourceBase needs to function.
54/// Plugin-specific configuration should remain in the plugin crate.
55///
56/// # Example
57///
58/// ```ignore
59/// use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
60///
61/// let params = SourceBaseParams::new("my-source")
62///     .with_dispatch_mode(DispatchMode::Channel)
63///     .with_dispatch_buffer_capacity(2000)
64///     .with_bootstrap_provider(my_provider);
65///
66/// let base = SourceBase::new(params)?;
67/// ```
68pub struct SourceBaseParams {
69    /// Unique identifier for the source
70    pub id: String,
71    /// Dispatch mode (Broadcast or Channel) - defaults to Channel
72    pub dispatch_mode: Option<DispatchMode>,
73    /// Dispatch buffer capacity - defaults to 1000
74    pub dispatch_buffer_capacity: Option<usize>,
75    /// Optional bootstrap provider to set during construction
76    pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
77    /// Optional state store provider to use when running standalone
78    pub state_store: Option<Arc<dyn StateStoreProvider>>,
79    /// Whether this source should auto-start - defaults to true
80    pub auto_start: bool,
81}
82
83impl std::fmt::Debug for SourceBaseParams {
84    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85        f.debug_struct("SourceBaseParams")
86            .field("id", &self.id)
87            .field("dispatch_mode", &self.dispatch_mode)
88            .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
89            .field(
90                "bootstrap_provider",
91                &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
92            )
93            .field(
94                "state_store",
95                &self.state_store.as_ref().map(|_| "<StateStoreProvider>"),
96            )
97            .field("auto_start", &self.auto_start)
98            .finish()
99    }
100}
101
102impl SourceBaseParams {
103    /// Create new params with just an ID, using defaults for everything else
104    pub fn new(id: impl Into<String>) -> Self {
105        Self {
106            id: id.into(),
107            dispatch_mode: None,
108            dispatch_buffer_capacity: None,
109            bootstrap_provider: None,
110            state_store: None,
111            auto_start: true,
112        }
113    }
114
115    /// Set the dispatch mode
116    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
117        self.dispatch_mode = Some(mode);
118        self
119    }
120
121    /// Set the dispatch buffer capacity
122    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
123        self.dispatch_buffer_capacity = Some(capacity);
124        self
125    }
126
127    /// Set the bootstrap provider
128    ///
129    /// This provider will be used during source subscription to deliver
130    /// initial data to queries that request bootstrap.
131    pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
132        self.bootstrap_provider = Some(Box::new(provider));
133        self
134    }
135
136    /// Set the state store provider for standalone usage.
137    pub fn with_state_store(mut self, store: Arc<dyn StateStoreProvider>) -> Self {
138        self.state_store = Some(store);
139        self
140    }
141
142    /// Set whether this source should auto-start
143    ///
144    /// Default is `true`. Set to `false` if this source should only be
145    /// started manually via `start_source()`.
146    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
147        self.auto_start = auto_start;
148        self
149    }
150}
151
152/// Base implementation for common source functionality
153pub struct SourceBase {
154    /// Source identifier
155    pub id: String,
156    /// Dispatch mode setting
157    dispatch_mode: DispatchMode,
158    /// Dispatch buffer capacity
159    dispatch_buffer_capacity: usize,
160    /// Whether this source should auto-start
161    pub auto_start: bool,
162    /// Component status handle — always available, wired to graph during initialize().
163    status_handle: ComponentStatusHandle,
164    /// Dispatchers for sending source events to subscribers
165    ///
166    /// This is a vector of dispatchers that send source events to all registered
167    /// subscribers (queries). When a source produces a change event, it broadcasts
168    /// it to all dispatchers in this vector.
169    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
170    /// Runtime context (set by initialize())
171    context: Arc<RwLock<Option<SourceRuntimeContext>>>,
172    /// State store provider (extracted from context for convenience)
173    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
174    /// Handle to the source's main task
175    pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
176    /// Sender for shutdown signal
177    pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
178    /// Optional bootstrap provider - plugins set this if they support bootstrap
179    bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
180    /// Optional identity provider for credential management.
181    /// Set either programmatically (via `set_identity_provider`) or automatically
182    /// from the runtime context during `initialize()`.
183    identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
184    /// Per-query position handles for replay-capable subscribers.
185    ///
186    /// Keyed by `query_id`. Values are the same `Arc<AtomicU64>` returned in
187    /// `SubscriptionResponse::position_handle`. The query writes its last
188    /// durably-processed sequence; the source reads `compute_confirmed_position()`
189    /// to advance its upstream cursor. Initial value is `u64::MAX`
190    /// ("no position confirmed yet"). Only populated when
191    /// `request_position_handle == true`.
192    position_handles: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
193    /// Monotonically increasing counter for assigning event sequences.
194    /// The framework stamps every dispatched event with this sequence.
195    next_sequence: Arc<AtomicU64>,
196    /// Original raw config JSON from the descriptor, preserving ConfigValue
197    /// envelopes (secrets, env vars) for lossless persistence roundtrips.
198    raw_config: Option<serde_json::Value>,
199    /// Notified whenever a new subscriber registers via `create_streaming_receiver`.
200    /// Sources can await `wait_for_subscribers()` before starting their polling
201    /// loop to avoid dispatching events before any subscriber exists.
202    subscriber_notify: Arc<Notify>,
203    /// Per-subscriber resume positions for replay filtering.
204    ///
205    /// Keyed by dispatcher index in the `dispatchers` Vec. When an event's
206    /// `source_position` has not yet passed the resume position, the event is
207    /// not delivered to that subscriber's dispatcher. Once `position_reached()`
208    /// returns true, the entry is removed and all subsequent events flow through.
209    ///
210    /// Only populated in Channel dispatch mode (Broadcast cannot filter per-subscriber).
211    subscriber_resume_positions: Arc<RwLock<HashMap<usize, Bytes>>>,
212    /// Optional position comparator for per-subscriber replay filtering.
213    ///
214    /// Set by sources that support replay. Without a comparator, position
215    /// filtering is disabled (all events are delivered to all subscribers).
216    position_comparator: Arc<RwLock<Option<Arc<dyn PositionComparator>>>>,
217    /// Maps framework sequence numbers to source positions (`source_position`
218    /// bytes from the dispatched event).
219    ///
220    /// Populated during `dispatch_event()` / `dispatch_events_batch()` for
221    /// events that carry a `source_position`.  Used by
222    /// `compute_confirmed_source_position()` to convert the confirmed
223    /// sequence (from position handles) back to a source-native position
224    /// (e.g. Postgres WAL LSN) for upstream cursor advancement.
225    ///
226    /// Pruned explicitly via `prune_position_map()` after the source has
227    /// successfully acknowledged the confirmed position to its upstream.
228    sequence_position_map: Arc<RwLock<BTreeMap<u64, Bytes>>>,
229}
230
231impl SourceBase {
232    /// Create a new SourceBase with the given parameters
233    ///
234    /// The status channel is not required during construction - it will be
235    /// provided via the `SourceRuntimeContext` when `initialize()` is called.
236    ///
237    /// If a bootstrap provider is specified in params, it will be set during
238    /// construction (no async needed since nothing is shared yet).
239    pub fn new(params: SourceBaseParams) -> Result<Self> {
240        // Determine dispatch mode (default to Channel if not specified)
241        let dispatch_mode = params.dispatch_mode.unwrap_or_default();
242        let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
243
244        // Set up initial dispatchers based on dispatch mode
245        let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
246            Vec::new();
247
248        if dispatch_mode == DispatchMode::Broadcast {
249            // For broadcast mode, create a single broadcast dispatcher
250            let dispatcher =
251                BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
252            dispatchers.push(Box::new(dispatcher));
253        }
254        // For channel mode, dispatchers will be created on-demand when subscribing
255
256        // Initialize bootstrap provider if provided (no async needed at construction time)
257        let bootstrap_provider = params
258            .bootstrap_provider
259            .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
260
261        Ok(Self {
262            id: params.id.clone(),
263            dispatch_mode,
264            dispatch_buffer_capacity,
265            auto_start: params.auto_start,
266            status_handle: ComponentStatusHandle::new(&params.id),
267            dispatchers: Arc::new(RwLock::new(dispatchers)),
268            context: Arc::new(RwLock::new(None)), // Set by initialize()
269            state_store: Arc::new(RwLock::new(params.state_store)), // Extracted from context
270            task_handle: Arc::new(RwLock::new(None)),
271            shutdown_tx: Arc::new(RwLock::new(None)),
272            bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
273            identity_provider: Arc::new(RwLock::new(None)),
274            position_handles: Arc::new(RwLock::new(HashMap::new())),
275            next_sequence: Arc::new(AtomicU64::new(1)),
276            raw_config: None,
277            subscriber_notify: Arc::new(Notify::new()),
278            subscriber_resume_positions: Arc::new(RwLock::new(HashMap::new())),
279            position_comparator: Arc::new(RwLock::new(None)),
280            sequence_position_map: Arc::new(RwLock::new(BTreeMap::new())),
281        })
282    }
283
284    /// Get whether this source should auto-start
285    pub fn get_auto_start(&self) -> bool {
286        self.auto_start
287    }
288
289    /// Set the original raw config JSON for lossless persistence roundtrips.
290    ///
291    /// Called by plugin descriptors to preserve the original config JSON
292    /// (including `ConfigValue` envelopes for secrets and env vars) before
293    /// resolution to plain values.
294    pub fn set_raw_config(&mut self, config: serde_json::Value) {
295        self.raw_config = Some(config);
296    }
297
298    /// Get the original raw config JSON, if set by a descriptor.
299    ///
300    /// Returns `None` for builder-created components that don't have
301    /// a raw config JSON (they use DTO reconstruction in `properties()`).
302    pub fn raw_config(&self) -> Option<&serde_json::Value> {
303        self.raw_config.as_ref()
304    }
305
306    /// Build the properties map for this source.
307    ///
308    /// If `raw_config` was set (descriptor path), returns its top-level keys.
309    /// Otherwise, serializes `fallback_dto` (the DTO reconstructed from typed
310    /// config) to produce camelCase output.
311    ///
312    /// This eliminates the duplicated if-let + serialize pattern from plugins.
313    pub fn properties_or_serialize<D: serde::Serialize>(
314        &self,
315        fallback_dto: &D,
316    ) -> HashMap<String, serde_json::Value> {
317        if let Some(serde_json::Value::Object(map)) = self.raw_config.as_ref() {
318            return map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
319        }
320
321        match serde_json::to_value(fallback_dto) {
322            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
323            _ => HashMap::new(),
324        }
325    }
326
327    /// Initialize the source with runtime context.
328    ///
329    /// This method is called automatically by DrasiLib's `add_source()` method.
330    /// Plugin developers do not need to call this directly.
331    ///
332    /// The context provides access to:
333    /// - `source_id`: The source's unique identifier
334    /// - `update_tx`: mpsc sender for fire-and-forget status updates to the component graph
335    /// - `state_store`: Optional persistent state storage
336    pub async fn initialize(&self, context: SourceRuntimeContext) {
337        // Store context for later use
338        *self.context.write().await = Some(context.clone());
339
340        // Wire the status handle to the graph update channel
341        self.status_handle.wire(context.update_tx.clone()).await;
342
343        if let Some(state_store) = context.state_store.as_ref() {
344            *self.state_store.write().await = Some(state_store.clone());
345        }
346
347        // Store identity provider from context if not already set programmatically
348        if let Some(ip) = context.identity_provider.as_ref() {
349            let mut guard = self.identity_provider.write().await;
350            if guard.is_none() {
351                *guard = Some(ip.clone());
352            }
353        }
354    }
355
356    /// Get the runtime context if initialized.
357    ///
358    /// Returns `None` if `initialize()` has not been called yet.
359    pub async fn context(&self) -> Option<SourceRuntimeContext> {
360        self.context.read().await.clone()
361    }
362
363    /// Get the state store if configured.
364    ///
365    /// Returns `None` if no state store was provided in the context.
366    pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
367        self.state_store.read().await.clone()
368    }
369
370    /// Get the identity provider if set.
371    ///
372    /// Returns the identity provider set either programmatically via
373    /// `set_identity_provider()` or from the runtime context during `initialize()`.
374    /// Programmatically-set providers take precedence over context providers.
375    pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
376        self.identity_provider.read().await.clone()
377    }
378
379    /// Set the identity provider programmatically.
380    ///
381    /// This is typically called during source construction when the provider
382    /// is available from configuration (e.g., `with_identity_provider()` builder).
383    /// Providers set this way take precedence over context-injected providers.
384    pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
385        *self.identity_provider.write().await = Some(provider);
386    }
387
388    /// Create and register a position handle for `query_id`, initialized to `u64::MAX`.
389    ///
390    /// Returns the shared handle; the same `Arc` is placed in
391    /// `SubscriptionResponse::position_handle` so the query and the source share
392    /// one atomic. If a handle already exists for `query_id` (re-subscribe after
393    /// transient disconnect), the existing handle is returned to preserve any
394    /// position the query had previously reported.
395    pub async fn create_position_handle(&self, query_id: &str) -> Arc<AtomicU64> {
396        let mut handles = self.position_handles.write().await;
397        if let Some(existing) = handles.get(query_id) {
398            return existing.clone();
399        }
400        let handle = Arc::new(AtomicU64::new(u64::MAX));
401        handles.insert(query_id.to_string(), handle.clone());
402        handle
403    }
404
405    /// Remove the position handle for `query_id`. No-op if absent.
406    ///
407    /// Called from explicit cleanup paths (`stop_query`/`delete_query` will be
408    /// wired in a follow-up issue). Until then, `cleanup_stale_handles()`
409    /// (invoked inside `compute_confirmed_position`) catches dropped subscribers.
410    pub async fn remove_position_handle(&self, query_id: &str) {
411        let mut handles = self.position_handles.write().await;
412        handles.remove(query_id);
413    }
414
415    /// Compute the minimum confirmed position across all live subscribers.
416    ///
417    /// Returns `None` if no handles are registered, or if every registered
418    /// handle is still `u64::MAX` (no subscriber has confirmed a position yet —
419    /// typically because they are still bootstrapping). Otherwise returns the
420    /// minimum non-`u64::MAX` value, suitable for advancing the source's
421    /// upstream cursor (Postgres `flush_lsn`, Kafka commit, transient WAL prune
422    /// threshold).
423    ///
424    /// Piggy-backs `cleanup_stale_handles()` so dropped subscribers do not pin
425    /// the watermark indefinitely.
426    pub async fn compute_confirmed_position(&self) -> Option<u64> {
427        self.cleanup_stale_handles().await;
428        let handles = self.position_handles.read().await;
429        let mut min: Option<u64> = None;
430        for handle in handles.values() {
431            let v = handle.load(Ordering::Relaxed);
432            if v == u64::MAX {
433                continue;
434            }
435            min = Some(min.map_or(v, |m| m.min(v)));
436        }
437        min
438    }
439
440    /// Drop entries whose `Arc::strong_count == 1` (only `SourceBase` holds a
441    /// reference).
442    ///
443    /// This indicates the subscriber dropped its `SubscriptionResponse` without
444    /// calling `remove_position_handle` — common during query teardown until
445    /// explicit cleanup is wired by the query manager.
446    ///
447    /// Safety constraint: this relies on `SourceBase` being the only long-lived
448    /// holder of the `Arc` besides the subscribing query. If a future periodic
449    /// scan task (or any other component) clones the handle, this method must
450    /// be revisited or replaced with explicit liveness tracking.
451    pub async fn cleanup_stale_handles(&self) {
452        let mut handles = self.position_handles.write().await;
453        handles.retain(|_, handle| Arc::strong_count(handle) > 1);
454    }
455
456    /// Translate the confirmed framework sequence into the corresponding
457    /// source-native position (e.g. Postgres WAL LSN, Kafka offset).
458    ///
459    /// Returns `None` when no confirmed position exists (no subscribers, all
460    /// at `u64::MAX`, or the sequence map has been pruned past the confirmed
461    /// point).
462    ///
463    /// This does **not** prune the internal map — call
464    /// [`prune_position_map()`](Self::prune_position_map) after the source
465    /// has successfully acknowledged the position to its upstream.
466    pub async fn compute_confirmed_source_position(&self) -> Option<Bytes> {
467        let confirmed_seq = self.compute_confirmed_position().await?;
468        let map = self.sequence_position_map.read().await;
469        // Find the entry with the largest sequence ≤ confirmed_seq.
470        map.range(..=confirmed_seq)
471            .next_back()
472            .map(|(_, pos)| pos.clone())
473    }
474
475    /// Prune sequence→position entries that are no longer needed.
476    ///
477    /// Removes all entries with sequence ≤ `up_to_seq`. Call this after the
478    /// source has successfully sent feedback/committed the confirmed position
479    /// to its upstream, so re-send on failure is still possible.
480    pub async fn prune_position_map(&self, up_to_seq: u64) {
481        let mut map = self.sequence_position_map.write().await;
482        // BTreeMap::split_off returns entries >= key; we keep those.
483        let keep = map.split_off(&(up_to_seq.saturating_add(1)));
484        *map = keep;
485    }
486
487    /// Reset the sequence counter, typically after recovering from a checkpoint.
488    /// The next dispatched event will receive `sequence + 1`.
489    pub fn set_next_sequence(&self, sequence: u64) {
490        self.next_sequence
491            .store(sequence.saturating_add(1), Ordering::Relaxed);
492    }
493
494    /// Apply subscription settings that affect the source base.
495    ///
496    /// Should be called at the start of `Source::subscribe()` implementations.
497    /// Handles:
498    /// - Recovering the sequence counter from `last_sequence` to maintain monotonicity
499    ///   across restarts.
500    pub fn apply_subscription_settings(
501        &self,
502        settings: &crate::config::SourceSubscriptionSettings,
503    ) {
504        if let Some(last_seq) = settings.last_sequence {
505            // Atomically advance to last_seq+1, never go backwards.
506            // fetch_max is safe under concurrent subscriptions.
507            let next = last_seq.saturating_add(1);
508            let prev = self.next_sequence.fetch_max(next, Ordering::Relaxed);
509            if next > prev {
510                info!(
511                    "[{}] Sequence counter recovered to {} (from checkpoint last_sequence={})",
512                    self.id, next, last_seq
513                );
514            }
515        }
516    }
517
518    /// Returns a clonable [`ComponentStatusHandle`] for use in spawned tasks.
519    ///
520    /// The handle can both read and write the component's status and automatically
521    /// notifies the graph on every status change (after `initialize()`).
522    pub fn status_handle(&self) -> ComponentStatusHandle {
523        self.status_handle.clone()
524    }
525
526    /// Clone the SourceBase with shared Arc references
527    ///
528    /// This creates a new SourceBase that shares the same underlying
529    /// data through Arc references. Useful for passing to spawned tasks.
530    pub fn clone_shared(&self) -> Self {
531        Self {
532            id: self.id.clone(),
533            dispatch_mode: self.dispatch_mode,
534            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
535            auto_start: self.auto_start,
536            status_handle: self.status_handle.clone(),
537            dispatchers: self.dispatchers.clone(),
538            context: self.context.clone(),
539            state_store: self.state_store.clone(),
540            task_handle: self.task_handle.clone(),
541            shutdown_tx: self.shutdown_tx.clone(),
542            bootstrap_provider: self.bootstrap_provider.clone(),
543            identity_provider: self.identity_provider.clone(),
544            position_handles: self.position_handles.clone(),
545            next_sequence: self.next_sequence.clone(),
546            raw_config: self.raw_config.clone(),
547            subscriber_notify: self.subscriber_notify.clone(),
548            subscriber_resume_positions: self.subscriber_resume_positions.clone(),
549            position_comparator: self.position_comparator.clone(),
550            sequence_position_map: self.sequence_position_map.clone(),
551        }
552    }
553
554    /// Set the bootstrap provider for this source, taking ownership.
555    ///
556    /// Call this after creating the SourceBase if the source plugin supports bootstrapping.
557    /// The bootstrap provider is created by the plugin using its own configuration.
558    ///
559    /// # Example
560    /// ```ignore
561    /// let provider = MyBootstrapProvider::new(config);
562    /// source_base.set_bootstrap_provider(provider).await;  // Ownership transferred
563    /// ```
564    pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
565        *self.bootstrap_provider.write().await = Some(Arc::new(provider));
566    }
567
568    /// Set the position comparator for per-subscriber replay filtering.
569    ///
570    /// Sources that support replay should call this during construction to
571    /// enable per-subscriber position gating. Without a comparator, all events
572    /// are delivered to all subscribers regardless of their `resume_from` position.
573    pub async fn set_position_comparator(&self, comparator: impl PositionComparator + 'static) {
574        *self.position_comparator.write().await = Some(Arc::new(comparator));
575    }
576
577    /// Get the source ID
578    pub fn get_id(&self) -> &str {
579        &self.id
580    }
581
582    /// Create a streaming receiver for a query subscription
583    ///
584    /// This creates the appropriate receiver based on the configured dispatch mode:
585    /// - Broadcast mode: Returns a receiver from the shared broadcast dispatcher
586    /// - Channel mode: Creates a new dedicated dispatcher and returns its receiver
587    ///
588    /// This is a helper method that can be used by sources with custom subscribe logic.
589    pub async fn create_streaming_receiver(
590        &self,
591    ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
592        let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
593            DispatchMode::Broadcast => {
594                // For broadcast mode, use the single dispatcher
595                let dispatchers = self.dispatchers.read().await;
596                if let Some(dispatcher) = dispatchers.first() {
597                    dispatcher.create_receiver().await?
598                } else {
599                    return Err(anyhow::anyhow!("No broadcast dispatcher available"));
600                }
601            }
602            DispatchMode::Channel => {
603                // For channel mode, create a new dispatcher for this subscription
604                let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
605                    self.dispatch_buffer_capacity,
606                );
607                let receiver = dispatcher.create_receiver().await?;
608
609                // Add the new dispatcher to our list
610                let mut dispatchers = self.dispatchers.write().await;
611                dispatchers.push(Box::new(dispatcher));
612
613                receiver
614            }
615        };
616
617        // Wake any task blocked in wait_for_subscribers().
618        // Use notify_one() which stores a permit even if no one is waiting yet,
619        // avoiding a race between the dispatchers check and the await.
620        self.subscriber_notify.notify_one();
621
622        Ok(receiver)
623    }
624
625    /// Wait until at least one subscriber has registered.
626    ///
627    /// Sources that start a background polling loop (e.g. CDC) should call
628    /// this before entering their poll loop on a restart cycle. Without this,
629    /// events dispatched before `subscribe()` creates a new dispatcher would
630    /// be silently dropped, advancing the checkpoint past changes that no
631    /// subscriber ever received.
632    ///
633    /// Returns immediately if at least one dispatcher already exists (fresh
634    /// start with bootstrap, or broadcast mode which always has one).
635    pub async fn wait_for_subscribers(&self) {
636        loop {
637            let dispatchers = self.dispatchers.read().await;
638            if !dispatchers.is_empty() {
639                return;
640            }
641            drop(dispatchers);
642            self.subscriber_notify.notified().await;
643        }
644    }
645
646    /// Subscribe to this source with optional bootstrap
647    ///
648    /// This is the standard subscribe implementation that all sources can use.
649    /// It handles:
650    /// - Creating a receiver for streaming events (based on dispatch mode)
651    /// - Setting up bootstrap if requested and a provider has been set
652    /// - Returning the appropriate SubscriptionResponse
653    pub async fn subscribe_with_bootstrap(
654        &self,
655        settings: &crate::config::SourceSubscriptionSettings,
656        source_type: &str,
657    ) -> Result<SubscriptionResponse> {
658        // Recover sequence counter from checkpoint before anything else
659        self.apply_subscription_settings(settings);
660
661        info!(
662            "Query '{}' subscribing to {} source '{}' (bootstrap: {}, resume_from: {:?}, request_handle: {})",
663            settings.query_id,
664            source_type,
665            self.id,
666            settings.enable_bootstrap,
667            settings.resume_from,
668            settings.request_position_handle
669        );
670
671        // Create streaming receiver using helper method
672        let receiver = self.create_streaming_receiver().await?;
673
674        // Register per-subscriber position filter for replay dedup.
675        // In Channel mode, the new dispatcher is the last entry in the vec.
676        // In Broadcast mode, per-subscriber filtering is not supported.
677        if self.dispatch_mode == DispatchMode::Channel {
678            if let Some(ref resume_pos) = settings.resume_from {
679                let dispatchers = self.dispatchers.read().await;
680                let dispatcher_idx = dispatchers.len().saturating_sub(1);
681                drop(dispatchers);
682                self.subscriber_resume_positions
683                    .write()
684                    .await
685                    .insert(dispatcher_idx, resume_pos.clone());
686                debug!(
687                    "[{}] Registered resume position filter for subscriber '{}' at dispatcher index {}",
688                    self.id, settings.query_id, dispatcher_idx
689                );
690            }
691        }
692
693        let query_id_for_response = settings.query_id.clone();
694
695        // resume_from overrides bootstrap: a resuming query already has base
696        // state in its persistent index and just needs replay from the
697        // requested sequence. Re-bootstrapping would corrupt that state.
698        let (bootstrap_receiver, bootstrap_result_receiver) = if settings.resume_from.is_some() {
699            info!(
700                "Query '{}' resuming from sequence {:?}; skipping bootstrap on {} source '{}'",
701                settings.query_id, settings.resume_from, source_type, self.id
702            );
703            (None, None)
704        } else if settings.enable_bootstrap {
705            match self
706                .handle_bootstrap_subscription(settings, source_type)
707                .await?
708            {
709                Some((event_rx, result_rx)) => (Some(event_rx), Some(result_rx)),
710                None => (None, None),
711            }
712        } else {
713            (None, None)
714        };
715
716        // Only persistent (replay-capable) queries request a handle. Volatile
717        // queries are deliberately excluded from the min-watermark so they
718        // cannot pin upstream advancement.
719        let position_handle = if settings.request_position_handle {
720            let handle = self.create_position_handle(&settings.query_id).await;
721            // Initialize the handle to the query's checkpoint sequence so that
722            // compute_confirmed_position() includes this subscriber from the
723            // start. Without this, a resuming query whose handle stays at
724            // u64::MAX would be invisible to the min-watermark, letting
725            // flush_lsn advance past its checkpoint.
726            if let Some(last_seq) = settings.last_sequence {
727                handle.store(last_seq, Ordering::Release);
728            }
729            Some(handle)
730        } else {
731            None
732        };
733
734        Ok(SubscriptionResponse {
735            query_id: query_id_for_response,
736            source_id: self.id.clone(),
737            receiver,
738            bootstrap_receiver,
739            position_handle,
740            bootstrap_result_receiver,
741        })
742    }
743
744    /// Handle bootstrap subscription logic.
745    ///
746    /// Returns the bootstrap event receiver and a oneshot receiver for the
747    /// `BootstrapResult` handover metadata.
748    async fn handle_bootstrap_subscription(
749        &self,
750        settings: &crate::config::SourceSubscriptionSettings,
751        source_type: &str,
752    ) -> Result<
753        Option<(
754            BootstrapEventReceiver,
755            tokio::sync::oneshot::Receiver<anyhow::Result<BootstrapResult>>,
756        )>,
757    > {
758        let provider_guard = self.bootstrap_provider.read().await;
759        if let Some(provider) = provider_guard.clone() {
760            drop(provider_guard); // Release lock before spawning task
761
762            info!(
763                "Creating bootstrap for query '{}' on {} source '{}'",
764                settings.query_id, source_type, self.id
765            );
766
767            // Create bootstrap context
768            let context = BootstrapContext::new_minimal(
769                self.id.clone(), // server_id
770                self.id.clone(), // source_id
771            );
772
773            // Create bootstrap channel
774            let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
775
776            // Create oneshot for BootstrapResult handover metadata
777            let (result_tx, result_rx) = tokio::sync::oneshot::channel();
778
779            // Convert HashSet to Vec for backward compatibility with BootstrapRequest
780            let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
781            let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
782
783            // Create bootstrap request with request_id
784            let request = BootstrapRequest {
785                query_id: settings.query_id.clone(),
786                node_labels,
787                relation_labels,
788                request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
789            };
790
791            // Clone settings for the async task
792            let settings_clone = settings.clone();
793            let source_id = self.id.clone();
794
795            // Get instance_id from context for log routing isolation
796            let instance_id = self
797                .context()
798                .await
799                .map(|c| c.instance_id.clone())
800                .unwrap_or_default();
801
802            // Spawn bootstrap task with tracing span for proper log routing
803            let span = tracing::info_span!(
804                "source_bootstrap",
805                instance_id = %instance_id,
806                component_id = %source_id,
807                component_type = "source"
808            );
809            tokio::spawn(
810                async move {
811                    let outcome = provider
812                        .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
813                        .await;
814
815                    match &outcome {
816                        Ok(result) => {
817                            info!(
818                                "Bootstrap completed successfully for query '{}', sent {} events \
819                                 (last_sequence={:?}, sequences_aligned={})",
820                                settings_clone.query_id,
821                                result.event_count,
822                                result.last_sequence,
823                                result.sequences_aligned
824                            );
825                        }
826                        Err(e) => {
827                            error!(
828                                "Bootstrap failed for query '{}': {e}",
829                                settings_clone.query_id
830                            );
831                        }
832                    }
833
834                    // Send the result (or error) to the query manager for handover.
835                    // If the receiver was dropped (query stopped), this is a no-op.
836                    let _ = result_tx.send(outcome);
837                }
838                .instrument(span),
839            );
840
841            Ok(Some((bootstrap_rx, result_rx)))
842        } else {
843            info!(
844                "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
845                settings.query_id, source_type, self.id
846            );
847            Ok(None)
848        }
849    }
850
851    /// Dispatch a SourceChange event with profiling metadata
852    ///
853    /// This method handles the common pattern of:
854    /// - Creating profiling metadata with timestamp
855    /// - Wrapping the change in a SourceEventWrapper
856    /// - Dispatching to all subscribers
857    /// - Handling the no-subscriber case gracefully
858    pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
859        // Create profiling metadata
860        let mut profiling = profiling::ProfilingMetadata::new();
861        profiling.source_send_ns = Some(profiling::timestamp_ns());
862
863        // Create event wrapper
864        let wrapper = SourceEventWrapper::with_profiling(
865            self.id.clone(),
866            SourceEvent::Change(change),
867            chrono::Utc::now(),
868            profiling,
869        );
870
871        // Dispatch event
872        self.dispatch_event(wrapper).await
873    }
874
875    /// Maximum allowed size for source position bytes (64KB).
876    /// Positions exceeding this limit are skipped at checkpoint time
877    /// to prevent memory issues, preserving the last good position.
878    pub const MAX_SOURCE_POSITION_BYTES: usize = 65_536;
879
880    /// Dispatch a SourceEventWrapper to all subscribers.
881    ///
882    /// This is a generic method for dispatching any SourceEvent.
883    /// It handles Arc-wrapping for zero-copy sharing and logs
884    /// when there are no subscribers.
885    /// The framework stamps every event with a monotonic sequence number.
886    pub async fn dispatch_event(&self, mut wrapper: SourceEventWrapper) -> Result<()> {
887        // Warn about oversized source positions; the checkpoint layer will
888        // enforce the limit and preserve the last good position.
889        if let Some(ref pos) = wrapper.source_position {
890            if pos.len() > Self::MAX_SOURCE_POSITION_BYTES {
891                warn!(
892                    "[{}] Source position is large ({} bytes > {} limit); \
893                     checkpoint staging will preserve the previous good position",
894                    self.id,
895                    pos.len(),
896                    Self::MAX_SOURCE_POSITION_BYTES
897                );
898            }
899        }
900
901        // Framework assigns the monotonic sequence
902        wrapper.sequence = Some(self.next_sequence.fetch_add(1, Ordering::Relaxed));
903
904        // Record sequence→source_position mapping for confirmed-position lookups.
905        if let (Some(seq), Some(ref pos)) = (wrapper.sequence, &wrapper.source_position) {
906            self.sequence_position_map
907                .write()
908                .await
909                .insert(seq, pos.clone());
910        }
911
912        debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
913
914        // Arc-wrap for zero-copy sharing across dispatchers
915        let arc_wrapper = Arc::new(wrapper);
916
917        // Send to all dispatchers, filtering by per-subscriber resume position
918        let dispatchers = self.dispatchers.read().await;
919        let comparator = self.position_comparator.read().await;
920        let mut cleared_indices: Vec<usize> = Vec::new();
921        // Collect (dispatcher_index, new_high_water) updates for after dispatch.
922        let mut hwm_updates: Vec<(usize, Bytes)> = Vec::new();
923
924        for (idx, dispatcher) in dispatchers.iter().enumerate() {
925            // Check per-subscriber position high-water mark.
926            // Events at or before the subscriber's high-water mark are
927            // suppressed.  When the event passes the mark, we deliver it
928            // and advance the mark (instead of removing it) so that future
929            // rewinds (caused by later subscribers triggering a stream
930            // restart) are still caught.
931            if let Some(ref cmp) = *comparator {
932                let resume_positions = self.subscriber_resume_positions.read().await;
933                if let Some(resume_pos) = resume_positions.get(&idx) {
934                    if let Some(ref event_pos) = arc_wrapper.source_position {
935                        if !cmp.position_reached(event_pos, resume_pos) {
936                            // Event hasn't passed high-water mark — suppress
937                            continue;
938                        }
939                        // Position reached — will update high-water after dispatch
940                        cleared_indices.push(idx);
941                    }
942                    // No source_position on event — cannot filter, deliver it
943                }
944            }
945
946            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
947                debug!("[{}] Failed to dispatch event: {}", self.id, e);
948            } else if let Some(ref event_pos) = arc_wrapper.source_position {
949                // Track high-water mark so future rewinds are caught
950                hwm_updates.push((idx, event_pos.clone()));
951            }
952        }
953        drop(comparator);
954        drop(dispatchers);
955
956        // Advance high-water marks for dispatchers that received this event.
957        // For dispatchers that already had an entry (cleared_indices), this
958        // updates their mark.  For dispatchers that had no entry yet, this
959        // establishes one — protecting them from future rewinds.
960        if !hwm_updates.is_empty() {
961            let mut resume_positions = self.subscriber_resume_positions.write().await;
962            for (idx, pos) in hwm_updates {
963                resume_positions.insert(idx, pos);
964            }
965        }
966
967        Ok(())
968    }
969
970    /// Dispatch a batch of events, acquiring the dispatchers lock once for
971    /// the entire batch. This is more efficient than calling
972    /// [`dispatch_event()`](Self::dispatch_event) per-event when the source
973    /// processes multiple rows per poll cycle.
974    pub async fn dispatch_events_batch(&self, events: Vec<SourceEventWrapper>) -> Result<()> {
975        if events.is_empty() {
976            return Ok(());
977        }
978
979        let dispatchers = self.dispatchers.read().await;
980        let comparator = self.position_comparator.read().await;
981
982        for mut wrapper in events {
983            if let Some(ref pos) = wrapper.source_position {
984                if pos.len() > Self::MAX_SOURCE_POSITION_BYTES {
985                    warn!(
986                        "[{}] Source position is large ({} bytes > {} limit); \
987                         checkpoint staging will preserve the previous good position",
988                        self.id,
989                        pos.len(),
990                        Self::MAX_SOURCE_POSITION_BYTES
991                    );
992                }
993            }
994
995            wrapper.sequence = Some(self.next_sequence.fetch_add(1, Ordering::Relaxed));
996
997            // Record sequence→source_position mapping for confirmed-position lookups.
998            if let (Some(seq), Some(ref pos)) = (wrapper.sequence, &wrapper.source_position) {
999                self.sequence_position_map
1000                    .write()
1001                    .await
1002                    .insert(seq, pos.clone());
1003            }
1004
1005            debug!("[{}] Dispatching event (batch): {:?}", self.id, &wrapper);
1006
1007            let arc_wrapper = Arc::new(wrapper);
1008            let mut cleared_indices: Vec<usize> = Vec::new();
1009            let mut hwm_updates: Vec<(usize, Bytes)> = Vec::new();
1010
1011            for (idx, dispatcher) in dispatchers.iter().enumerate() {
1012                // Check per-subscriber position high-water mark
1013                if let Some(ref cmp) = *comparator {
1014                    let resume_positions = self.subscriber_resume_positions.read().await;
1015                    if let Some(resume_pos) = resume_positions.get(&idx) {
1016                        if let Some(ref event_pos) = arc_wrapper.source_position {
1017                            if !cmp.position_reached(event_pos, resume_pos) {
1018                                debug!(
1019                                    "[{}] Position filter: SKIPPING event for dispatcher {} \
1020                                     (event_pos={:?} <= resume_pos={:?})",
1021                                    self.id,
1022                                    idx,
1023                                    event_pos.as_ref(),
1024                                    resume_pos.as_ref()
1025                                );
1026                                continue;
1027                            }
1028                            debug!(
1029                                "[{}] Position filter: PASSING event for dispatcher {} \
1030                                 (event_pos={:?} > resume_pos={:?})",
1031                                self.id,
1032                                idx,
1033                                event_pos.as_ref(),
1034                                resume_pos.as_ref()
1035                            );
1036                            cleared_indices.push(idx);
1037                        }
1038                    } else {
1039                        debug!(
1040                            "[{}] Position filter: NO resume position for dispatcher {}, passing through",
1041                            self.id, idx
1042                        );
1043                    }
1044                }
1045
1046                if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
1047                    debug!("[{}] Failed to dispatch event: {}", self.id, e);
1048                } else if let Some(ref event_pos) = arc_wrapper.source_position {
1049                    hwm_updates.push((idx, event_pos.clone()));
1050                }
1051            }
1052
1053            // Advance high-water marks for dispatchers that received this event
1054            if !hwm_updates.is_empty() {
1055                let mut resume_positions = self.subscriber_resume_positions.write().await;
1056                for (idx, pos) in hwm_updates {
1057                    resume_positions.insert(idx, pos);
1058                }
1059            }
1060        }
1061
1062        drop(comparator);
1063        drop(dispatchers);
1064
1065        Ok(())
1066    }
1067
1068    /// Broadcast SourceControl events
1069    pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
1070        let wrapper = SourceEventWrapper::new(
1071            self.id.clone(),
1072            SourceEvent::Control(control),
1073            chrono::Utc::now(),
1074        );
1075        self.dispatch_event(wrapper).await
1076    }
1077
1078    /// Create a test subscription to this source (synchronous, fallible)
1079    ///
1080    /// This method is intended for use in tests to receive events from the source.
1081    /// It properly handles both Broadcast and Channel dispatch modes by delegating
1082    /// to `create_streaming_receiver()`, making the dispatch mode transparent to tests.
1083    ///
1084    /// Note: This is a synchronous wrapper that uses `tokio::task::block_in_place` internally.
1085    /// For async contexts, prefer calling `create_streaming_receiver()` directly.
1086    ///
1087    /// # Returns
1088    /// A receiver that will receive all events dispatched by this source,
1089    /// or an error if the receiver cannot be created.
1090    pub fn try_test_subscribe(
1091        &self,
1092    ) -> anyhow::Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
1093        tokio::task::block_in_place(|| {
1094            tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
1095        })
1096    }
1097
1098    /// Create a test subscription to this source (synchronous wrapper)
1099    ///
1100    /// Convenience wrapper around [`try_test_subscribe`](Self::try_test_subscribe)
1101    /// that panics on failure. Prefer `try_test_subscribe()` in new code.
1102    ///
1103    /// # Panics
1104    /// Panics if the receiver cannot be created.
1105    pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
1106        self.try_test_subscribe()
1107            .expect("Failed to create test subscription receiver")
1108    }
1109
1110    /// Helper function to dispatch events from spawned tasks (unstamped).
1111    ///
1112    /// This is a static helper that can be used from spawned async tasks that don't
1113    /// have access to `self`. It manually iterates through dispatchers and sends the event.
1114    ///
1115    /// **Important**: This method does NOT stamp a monotonic sequence number and
1116    /// does NOT validate `source_position` size. Events dispatched through this
1117    /// method will not be checkpoint-tracked. This is acceptable for sources that
1118    /// do not support replay (`supports_replay() == false`).
1119    ///
1120    /// # For recoverable/checkpointed sources
1121    ///
1122    /// Use [`clone_shared()`](Self::clone_shared) to obtain a `SourceBase` that
1123    /// can be moved into spawned tasks, then call [`dispatch_event()`](Self::dispatch_event)
1124    /// which stamps sequences and validates positions:
1125    ///
1126    /// ```ignore
1127    /// let base = self.base.clone_shared();
1128    /// tokio::spawn(async move {
1129    ///     base.dispatch_event(wrapper).await.ok();
1130    /// });
1131    /// ```
1132    ///
1133    /// # Arguments
1134    /// * `dispatchers` - Arc to the dispatchers list (from `self.base.dispatchers.clone()`)
1135    /// * `wrapper` - The event wrapper to dispatch
1136    /// * `source_id` - Source ID for logging
1137    pub async fn dispatch_from_task(
1138        dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
1139        wrapper: SourceEventWrapper,
1140        source_id: &str,
1141    ) -> Result<()> {
1142        debug!(
1143            "[{}] Dispatching event from task: {:?}",
1144            source_id, &wrapper
1145        );
1146
1147        // Arc-wrap for zero-copy sharing across dispatchers
1148        let arc_wrapper = Arc::new(wrapper);
1149
1150        // Send to all dispatchers
1151        let dispatchers_guard = dispatchers.read().await;
1152        for dispatcher in dispatchers_guard.iter() {
1153            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
1154                debug!("[{source_id}] Failed to dispatch event from task: {e}");
1155            }
1156        }
1157
1158        Ok(())
1159    }
1160
1161    /// Handle common stop functionality
1162    pub async fn stop_common(&self) -> Result<()> {
1163        info!("Stopping source '{}'", self.id);
1164
1165        // Send shutdown signal if we have one
1166        if let Some(tx) = self.shutdown_tx.write().await.take() {
1167            let _ = tx.send(());
1168        }
1169
1170        // Wait for task to complete
1171        if let Some(mut handle) = self.task_handle.write().await.take() {
1172            match tokio::time::timeout(std::time::Duration::from_secs(5), &mut handle).await {
1173                Ok(Ok(())) => {
1174                    info!("Source '{}' task completed successfully", self.id);
1175                }
1176                Ok(Err(e)) => {
1177                    error!("Source '{}' task panicked: {}", self.id, e);
1178                }
1179                Err(_) => {
1180                    warn!(
1181                        "Source '{}' task did not complete within timeout, aborting",
1182                        self.id
1183                    );
1184                    handle.abort();
1185                }
1186            }
1187        }
1188
1189        // Clear stale dispatchers so that a subsequent start()+subscribe()
1190        // cycle does not race: without this, the CDC polling loop could
1191        // dispatch events to the old (dead) channel receivers before the
1192        // new subscribe() call creates fresh dispatchers, silently dropping
1193        // events while still advancing the checkpoint LSN.
1194        //
1195        // Broadcast mode keeps a single persistent dispatcher that hands
1196        // out receivers; channel mode creates one dispatcher per subscriber.
1197        if self.dispatch_mode == DispatchMode::Channel {
1198            let mut dispatchers = self.dispatchers.write().await;
1199            dispatchers.clear();
1200        }
1201
1202        self.set_status(
1203            ComponentStatus::Stopped,
1204            Some(format!("Source '{}' stopped", self.id)),
1205        )
1206        .await;
1207        info!("Source '{}' stopped", self.id);
1208        Ok(())
1209    }
1210
1211    /// Clear stale dispatchers from a prior lifecycle.
1212    ///
1213    /// Sources that manage their own stop/start lifecycle (instead of using
1214    /// `stop_common()`) **must** call this at the end of their `stop()`
1215    /// implementation. Without this, a subsequent `start()` + `subscribe()`
1216    /// cycle can race: the polling loop dispatches events to the old (dead)
1217    /// channel receivers before `subscribe()` creates fresh dispatchers,
1218    /// silently dropping events while still advancing the checkpoint.
1219    ///
1220    /// Only channel-mode dispatchers are cleared — broadcast mode keeps a
1221    /// single persistent dispatcher.
1222    pub async fn clear_dispatchers(&self) {
1223        if self.dispatch_mode == DispatchMode::Channel {
1224            let mut dispatchers = self.dispatchers.write().await;
1225            dispatchers.clear();
1226        }
1227    }
1228
1229    /// Clear the source's state store partition.
1230    ///
1231    /// This is called during deprovision to remove all persisted state
1232    /// associated with this source. Sources that override `deprovision()`
1233    /// can call this to clean up their state store.
1234    pub async fn deprovision_common(&self) -> Result<()> {
1235        info!("Deprovisioning source '{}'", self.id);
1236        if let Some(store) = self.state_store().await {
1237            let count = store.clear_store(&self.id).await.map_err(|e| {
1238                anyhow::anyhow!(
1239                    "Failed to clear state store for source '{}': {}",
1240                    self.id,
1241                    e
1242                )
1243            })?;
1244            info!(
1245                "Cleared {} keys from state store for source '{}'",
1246                count, self.id
1247            );
1248        }
1249        Ok(())
1250    }
1251
1252    /// Get the current status.
1253    pub async fn get_status(&self) -> ComponentStatus {
1254        self.status_handle.get_status().await
1255    }
1256
1257    /// Set the component's status — updates local state AND notifies the graph.
1258    ///
1259    /// This is the single canonical way to change a source's status.
1260    pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
1261        self.status_handle.set_status(status, message).await;
1262    }
1263
1264    /// Set the task handle
1265    pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
1266        *self.task_handle.write().await = Some(handle);
1267    }
1268
1269    /// Set the shutdown sender
1270    pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
1271        *self.shutdown_tx.write().await = Some(tx);
1272    }
1273}
1274
1275#[cfg(test)]
1276mod tests {
1277    use super::*;
1278    use crate::sources::ByteLexPositionComparator;
1279
1280    // =========================================================================
1281    // SourceBaseParams tests
1282    // =========================================================================
1283
1284    #[test]
1285    fn test_params_new_defaults() {
1286        let params = SourceBaseParams::new("test-source");
1287        assert_eq!(params.id, "test-source");
1288        assert!(params.dispatch_mode.is_none());
1289        assert!(params.dispatch_buffer_capacity.is_none());
1290        assert!(params.bootstrap_provider.is_none());
1291        assert!(params.auto_start);
1292    }
1293
1294    #[test]
1295    fn test_params_with_dispatch_mode() {
1296        let params = SourceBaseParams::new("s1").with_dispatch_mode(DispatchMode::Broadcast);
1297        assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
1298    }
1299
1300    #[test]
1301    fn test_params_with_dispatch_buffer_capacity() {
1302        let params = SourceBaseParams::new("s1").with_dispatch_buffer_capacity(50000);
1303        assert_eq!(params.dispatch_buffer_capacity, Some(50000));
1304    }
1305
1306    #[test]
1307    fn test_params_with_auto_start_false() {
1308        let params = SourceBaseParams::new("s1").with_auto_start(false);
1309        assert!(!params.auto_start);
1310    }
1311
1312    #[test]
1313    fn test_params_builder_chaining() {
1314        let params = SourceBaseParams::new("chained")
1315            .with_dispatch_mode(DispatchMode::Broadcast)
1316            .with_dispatch_buffer_capacity(2000)
1317            .with_auto_start(false);
1318
1319        assert_eq!(params.id, "chained");
1320        assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
1321        assert_eq!(params.dispatch_buffer_capacity, Some(2000));
1322        assert!(!params.auto_start);
1323    }
1324
1325    // =========================================================================
1326    // SourceBase tests
1327    // =========================================================================
1328
1329    #[tokio::test]
1330    async fn test_new_defaults() {
1331        let params = SourceBaseParams::new("my-source");
1332        let base = SourceBase::new(params).unwrap();
1333
1334        assert_eq!(base.id, "my-source");
1335        assert!(base.auto_start);
1336        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
1337    }
1338
1339    #[tokio::test]
1340    async fn test_get_id() {
1341        let base = SourceBase::new(SourceBaseParams::new("id-check")).unwrap();
1342        assert_eq!(base.get_id(), "id-check");
1343    }
1344
1345    #[tokio::test]
1346    async fn test_get_auto_start() {
1347        let base_default = SourceBase::new(SourceBaseParams::new("a")).unwrap();
1348        assert!(base_default.get_auto_start());
1349
1350        let base_false =
1351            SourceBase::new(SourceBaseParams::new("b").with_auto_start(false)).unwrap();
1352        assert!(!base_false.get_auto_start());
1353    }
1354
1355    #[tokio::test]
1356    async fn test_get_status_initial() {
1357        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1358        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
1359    }
1360
1361    #[tokio::test]
1362    async fn test_set_status() {
1363        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1364
1365        base.set_status(ComponentStatus::Running, None).await;
1366        assert_eq!(base.get_status().await, ComponentStatus::Running);
1367
1368        base.set_status(ComponentStatus::Error, Some("oops".into()))
1369            .await;
1370        assert_eq!(base.get_status().await, ComponentStatus::Error);
1371    }
1372
1373    #[tokio::test]
1374    async fn test_status_handle_returns_handle() {
1375        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1376        let handle = base.status_handle();
1377
1378        // The handle should reflect the same status as the base
1379        assert_eq!(handle.get_status().await, ComponentStatus::Stopped);
1380
1381        // Mutating through the handle should be visible via SourceBase
1382        handle.set_status(ComponentStatus::Starting, None).await;
1383        assert_eq!(base.get_status().await, ComponentStatus::Starting);
1384    }
1385
1386    // =========================================================================
1387    // Position handle and resume_from tests (issue #366)
1388    // =========================================================================
1389
1390    use crate::bootstrap::{
1391        BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
1392    };
1393    use crate::channels::BootstrapEventSender;
1394    use async_trait::async_trait;
1395
1396    fn make_settings(
1397        query_id: &str,
1398        enable_bootstrap: bool,
1399        resume_from: Option<bytes::Bytes>,
1400        request_position_handle: bool,
1401    ) -> crate::config::SourceSubscriptionSettings {
1402        use std::collections::HashSet;
1403        crate::config::SourceSubscriptionSettings {
1404            source_id: "test-src".to_string(),
1405            enable_bootstrap,
1406            query_id: query_id.to_string(),
1407            nodes: HashSet::new(),
1408            relations: HashSet::new(),
1409            resume_from,
1410            request_position_handle,
1411            last_sequence: None,
1412        }
1413    }
1414
1415    /// Minimal bootstrap provider that closes its sender immediately.
1416    /// Enough to verify a `bootstrap_receiver` was created without sending data.
1417    struct NoopProvider;
1418
1419    #[async_trait]
1420    impl BootstrapProvider for NoopProvider {
1421        async fn bootstrap(
1422            &self,
1423            _request: BootstrapRequest,
1424            _context: &BootstrapContext,
1425            _event_tx: BootstrapEventSender,
1426            _settings: Option<&crate::config::SourceSubscriptionSettings>,
1427        ) -> Result<BootstrapResult> {
1428            Ok(BootstrapResult::default())
1429        }
1430    }
1431
1432    fn make_base_with_bootstrap(id: &str) -> SourceBase {
1433        let mut params = SourceBaseParams::new(id);
1434        params.bootstrap_provider = Some(Box::new(NoopProvider));
1435        SourceBase::new(params).unwrap()
1436    }
1437
1438    #[tokio::test]
1439    async fn test_create_position_handle_initializes_to_u64_max() {
1440        let base = SourceBase::new(SourceBaseParams::new("ph-init")).unwrap();
1441        let handle = base.create_position_handle("q1").await;
1442        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1443    }
1444
1445    #[tokio::test]
1446    async fn test_create_position_handle_idempotent_for_same_query() {
1447        let base = SourceBase::new(SourceBaseParams::new("ph-idem")).unwrap();
1448        let h1 = base.create_position_handle("q1").await;
1449        h1.store(123, Ordering::Relaxed);
1450        let h2 = base.create_position_handle("q1").await;
1451        // Same Arc — preserves the previously reported position.
1452        assert!(Arc::ptr_eq(&h1, &h2));
1453        assert_eq!(h2.load(Ordering::Relaxed), 123);
1454    }
1455
1456    #[tokio::test]
1457    async fn test_remove_position_handle_drops_entry() {
1458        let base = SourceBase::new(SourceBaseParams::new("ph-rm")).unwrap();
1459        let handle = base.create_position_handle("q1").await;
1460        handle.store(42, Ordering::Relaxed);
1461        assert_eq!(base.compute_confirmed_position().await, Some(42));
1462        base.remove_position_handle("q1").await;
1463        assert_eq!(base.compute_confirmed_position().await, None);
1464    }
1465
1466    #[tokio::test]
1467    async fn test_remove_position_handle_noop_when_absent() {
1468        let base = SourceBase::new(SourceBaseParams::new("ph-rm-absent")).unwrap();
1469        // Must not panic.
1470        base.remove_position_handle("never-registered").await;
1471        assert_eq!(base.compute_confirmed_position().await, None);
1472    }
1473
1474    #[tokio::test]
1475    async fn test_compute_confirmed_position_returns_none_when_empty() {
1476        let base = SourceBase::new(SourceBaseParams::new("ph-empty")).unwrap();
1477        assert_eq!(base.compute_confirmed_position().await, None);
1478    }
1479
1480    #[tokio::test]
1481    async fn test_compute_confirmed_position_returns_none_when_all_max() {
1482        let base = SourceBase::new(SourceBaseParams::new("ph-all-max")).unwrap();
1483        let _h1 = base.create_position_handle("q1").await;
1484        let _h2 = base.create_position_handle("q2").await;
1485        assert_eq!(base.compute_confirmed_position().await, None);
1486    }
1487
1488    #[tokio::test]
1489    async fn test_compute_confirmed_position_filters_max_returns_min() {
1490        let base = SourceBase::new(SourceBaseParams::new("ph-min")).unwrap();
1491        let h1 = base.create_position_handle("q1").await;
1492        let _h2 = base.create_position_handle("q2").await; // stays u64::MAX
1493        let h3 = base.create_position_handle("q3").await;
1494        h1.store(100, Ordering::Relaxed);
1495        h3.store(50, Ordering::Relaxed);
1496        assert_eq!(base.compute_confirmed_position().await, Some(50));
1497    }
1498
1499    #[tokio::test]
1500    async fn test_compute_confirmed_position_single_real_value() {
1501        let base = SourceBase::new(SourceBaseParams::new("ph-single")).unwrap();
1502        let h1 = base.create_position_handle("q1").await;
1503        let _h2 = base.create_position_handle("q2").await;
1504        h1.store(7, Ordering::Relaxed);
1505        assert_eq!(base.compute_confirmed_position().await, Some(7));
1506    }
1507
1508    #[tokio::test]
1509    async fn test_cleanup_stale_handles_drops_orphaned_arc() {
1510        let base = SourceBase::new(SourceBaseParams::new("ph-stale")).unwrap();
1511        {
1512            let handle = base.create_position_handle("q1").await;
1513            handle.store(99, Ordering::Relaxed);
1514            // handle (the external clone) drops here.
1515        }
1516        base.cleanup_stale_handles().await;
1517        assert_eq!(base.compute_confirmed_position().await, None);
1518    }
1519
1520    #[tokio::test]
1521    async fn test_cleanup_stale_handles_keeps_held_arc() {
1522        let base = SourceBase::new(SourceBaseParams::new("ph-held")).unwrap();
1523        let handle = base.create_position_handle("q1").await;
1524        handle.store(11, Ordering::Relaxed);
1525        base.cleanup_stale_handles().await;
1526        // External clone still alive, entry retained.
1527        assert_eq!(base.compute_confirmed_position().await, Some(11));
1528        // Keep `handle` alive past the assertion.
1529        drop(handle);
1530    }
1531
1532    #[tokio::test]
1533    async fn test_subscribe_with_request_position_handle_returns_handle() {
1534        let base = SourceBase::new(SourceBaseParams::new("sub-handle")).unwrap();
1535        let response = base
1536            .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1537            .await
1538            .unwrap();
1539        let handle = response.position_handle.expect("expected handle");
1540        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1541        // Internal map should also have the entry (so min-watermark sees it).
1542        assert_eq!(base.compute_confirmed_position().await, None); // u64::MAX → None
1543    }
1544
1545    #[tokio::test]
1546    async fn test_subscribe_without_request_position_handle_returns_none() {
1547        let base = SourceBase::new(SourceBaseParams::new("sub-no-handle")).unwrap();
1548        let response = base
1549            .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1550            .await
1551            .unwrap();
1552        assert!(response.position_handle.is_none());
1553        // Internal map must be empty so this volatile query is excluded from
1554        // the min-watermark.
1555        let handles = base.position_handles.read().await;
1556        assert!(handles.is_empty());
1557    }
1558
1559    #[tokio::test]
1560    async fn test_subscribe_returned_handle_shared_with_internal() {
1561        let base = SourceBase::new(SourceBaseParams::new("sub-shared")).unwrap();
1562        let response = base
1563            .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1564            .await
1565            .unwrap();
1566        let handle = response.position_handle.unwrap();
1567        handle.store(42, Ordering::Relaxed);
1568        assert_eq!(base.compute_confirmed_position().await, Some(42));
1569    }
1570
1571    #[tokio::test]
1572    async fn test_subscribe_with_resume_from_skips_bootstrap() {
1573        let base = make_base_with_bootstrap("sub-resume");
1574        let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1575        let response = base
1576            .subscribe_with_bootstrap(&make_settings("q1", true, Some(position), false), "test")
1577            .await
1578            .unwrap();
1579        assert!(
1580            response.bootstrap_receiver.is_none(),
1581            "resume_from must override enable_bootstrap"
1582        );
1583    }
1584
1585    #[tokio::test]
1586    async fn test_subscribe_resume_without_bootstrap_still_none() {
1587        let base = make_base_with_bootstrap("sub-resume-no-bs");
1588        let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1589        let response = base
1590            .subscribe_with_bootstrap(&make_settings("q1", false, Some(position), false), "test")
1591            .await
1592            .unwrap();
1593        assert!(response.bootstrap_receiver.is_none());
1594    }
1595
1596    #[tokio::test]
1597    async fn test_subscribe_no_resume_with_bootstrap_returns_receiver() {
1598        let base = make_base_with_bootstrap("sub-bs");
1599        let response = base
1600            .subscribe_with_bootstrap(&make_settings("q1", true, None, false), "test")
1601            .await
1602            .unwrap();
1603        assert!(
1604            response.bootstrap_receiver.is_some(),
1605            "regression guard: bootstrap path must still produce a receiver"
1606        );
1607    }
1608
1609    #[tokio::test]
1610    async fn test_subscribe_no_resume_no_bootstrap_returns_none() {
1611        let base = make_base_with_bootstrap("sub-neither");
1612        let response = base
1613            .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1614            .await
1615            .unwrap();
1616        assert!(response.bootstrap_receiver.is_none());
1617        assert!(response.position_handle.is_none());
1618    }
1619
1620    // =========================================================================
1621    // dispatch_events_batch tests
1622    // =========================================================================
1623
1624    fn make_event(source_id: &str, position: Option<&[u8]>) -> SourceEventWrapper {
1625        let change = drasi_core::models::SourceChange::Insert {
1626            element: drasi_core::models::Element::Node {
1627                metadata: drasi_core::models::ElementMetadata {
1628                    reference: drasi_core::models::ElementReference::new(source_id, "n1"),
1629                    labels: Arc::from([Arc::from("Label")]),
1630                    effective_from: 0,
1631                },
1632                properties: drasi_core::models::ElementPropertyMap::new(),
1633            },
1634        };
1635        let mut wrapper = SourceEventWrapper::new(
1636            source_id.to_string(),
1637            SourceEvent::Change(change),
1638            chrono::Utc::now(),
1639        );
1640        wrapper.source_position = position.map(|p| bytes::Bytes::from(p.to_vec()));
1641        wrapper
1642    }
1643
1644    #[tokio::test]
1645    async fn test_dispatch_events_batch_empty_returns_ok() {
1646        let base = SourceBase::new(SourceBaseParams::new("batch-empty")).unwrap();
1647        let result = base.dispatch_events_batch(Vec::new()).await;
1648        assert!(result.is_ok());
1649    }
1650
1651    #[tokio::test]
1652    async fn test_dispatch_events_batch_stamps_monotonic_sequences() {
1653        let params = SourceBaseParams::new("batch-seq").with_dispatch_mode(DispatchMode::Channel);
1654        let base = SourceBase::new(params).unwrap();
1655
1656        // Create a receiver so events are actually captured
1657        let mut receiver = base.create_streaming_receiver().await.unwrap();
1658
1659        let events = vec![
1660            make_event("batch-seq", Some(b"\x01")),
1661            make_event("batch-seq", Some(b"\x02")),
1662            make_event("batch-seq", Some(b"\x03")),
1663        ];
1664
1665        base.dispatch_events_batch(events).await.unwrap();
1666
1667        let e1 = receiver.recv().await.unwrap();
1668        let e2 = receiver.recv().await.unwrap();
1669        let e3 = receiver.recv().await.unwrap();
1670
1671        let s1 = e1.sequence.expect("event 1 must have sequence");
1672        let s2 = e2.sequence.expect("event 2 must have sequence");
1673        let s3 = e3.sequence.expect("event 3 must have sequence");
1674
1675        assert_eq!(s2, s1 + 1, "sequences must be monotonically increasing");
1676        assert_eq!(s3, s2 + 1, "sequences must be monotonically increasing");
1677    }
1678
1679    #[tokio::test]
1680    async fn test_dispatch_events_batch_multi_dispatcher_fanout() {
1681        let params =
1682            SourceBaseParams::new("batch-fanout").with_dispatch_mode(DispatchMode::Channel);
1683        let base = SourceBase::new(params).unwrap();
1684
1685        // Create two receivers (two dispatchers in channel mode)
1686        let mut rx1 = base.create_streaming_receiver().await.unwrap();
1687        let mut rx2 = base.create_streaming_receiver().await.unwrap();
1688
1689        let events = vec![
1690            make_event("batch-fanout", Some(b"\x01")),
1691            make_event("batch-fanout", Some(b"\x02")),
1692        ];
1693
1694        base.dispatch_events_batch(events).await.unwrap();
1695
1696        // Both receivers should get both events
1697        let r1_e1 = rx1.recv().await.unwrap();
1698        let r1_e2 = rx1.recv().await.unwrap();
1699        let r2_e1 = rx2.recv().await.unwrap();
1700        let r2_e2 = rx2.recv().await.unwrap();
1701
1702        // Same sequences across both receivers
1703        assert_eq!(r1_e1.sequence, r2_e1.sequence);
1704        assert_eq!(r1_e2.sequence, r2_e2.sequence);
1705    }
1706
1707    #[tokio::test]
1708    async fn test_dispatch_events_batch_oversized_position_still_dispatches() {
1709        let params =
1710            SourceBaseParams::new("batch-oversize").with_dispatch_mode(DispatchMode::Channel);
1711        let base = SourceBase::new(params).unwrap();
1712        let mut rx = base.create_streaming_receiver().await.unwrap();
1713
1714        // Create an event with a position larger than MAX_SOURCE_POSITION_BYTES
1715        let big_pos = vec![0xAA; SourceBase::MAX_SOURCE_POSITION_BYTES + 1];
1716        let events = vec![make_event("batch-oversize", Some(&big_pos))];
1717
1718        // Should succeed (warn but not error)
1719        base.dispatch_events_batch(events).await.unwrap();
1720
1721        let received = rx.recv().await.unwrap();
1722        assert!(received.sequence.is_some(), "event must still be stamped");
1723        assert_eq!(
1724            received.source_position.as_ref().map(|p| p.len()),
1725            Some(SourceBase::MAX_SOURCE_POSITION_BYTES + 1),
1726            "oversized position must still be delivered (checkpoint layer enforces the limit)"
1727        );
1728    }
1729
1730    // =========================================================================
1731    // Per-subscriber position filtering tests
1732    // =========================================================================
1733
1734    #[tokio::test]
1735    async fn test_position_filter_suppresses_events_before_resume() {
1736        let params = SourceBaseParams::new("pos-filter").with_dispatch_mode(DispatchMode::Channel);
1737        let base = SourceBase::new(params).unwrap();
1738        base.set_position_comparator(ByteLexPositionComparator)
1739            .await;
1740
1741        // Create two subscribers
1742        let mut rx1 = base.create_streaming_receiver().await.unwrap();
1743        let mut rx2 = base.create_streaming_receiver().await.unwrap();
1744
1745        // rx1 (dispatcher 0): resume_from = position [0x00, 0x05]
1746        // rx2 (dispatcher 1): no resume position (gets everything)
1747        base.subscriber_resume_positions
1748            .write()
1749            .await
1750            .insert(0, Bytes::from_static(&[0x00, 0x05]));
1751
1752        // Dispatch event at position [0x00, 0x03] — before rx1's resume
1753        let event = make_event("pos-filter", Some(&[0x00, 0x03]));
1754        base.dispatch_event(event).await.unwrap();
1755
1756        // rx2 should receive it (no filter)
1757        let r2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1758            .await
1759            .unwrap()
1760            .unwrap();
1761        assert_eq!(r2.source_position.as_ref().unwrap().as_ref(), &[0x00, 0x03]);
1762
1763        // rx1 should NOT receive it (position not reached)
1764        let r1 = tokio::time::timeout(std::time::Duration::from_millis(50), rx1.recv()).await;
1765        assert!(r1.is_err(), "rx1 should timeout — event was suppressed");
1766    }
1767
1768    #[tokio::test]
1769    async fn test_position_filter_delivers_events_past_resume() {
1770        let params = SourceBaseParams::new("pos-filter2").with_dispatch_mode(DispatchMode::Channel);
1771        let base = SourceBase::new(params).unwrap();
1772        base.set_position_comparator(ByteLexPositionComparator)
1773            .await;
1774
1775        let mut rx1 = base.create_streaming_receiver().await.unwrap();
1776
1777        // resume_from = [0x00, 0x05]
1778        base.subscriber_resume_positions
1779            .write()
1780            .await
1781            .insert(0, Bytes::from_static(&[0x00, 0x05]));
1782
1783        // Event at [0x00, 0x06] — past resume
1784        let event = make_event("pos-filter2", Some(&[0x00, 0x06]));
1785        base.dispatch_event(event).await.unwrap();
1786
1787        let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1788            .await
1789            .unwrap()
1790            .unwrap();
1791        assert_eq!(
1792            received.source_position.as_ref().unwrap().as_ref(),
1793            &[0x00, 0x06]
1794        );
1795    }
1796
1797    #[tokio::test]
1798    async fn test_position_filter_advances_high_water_mark() {
1799        let params = SourceBaseParams::new("pos-hwm").with_dispatch_mode(DispatchMode::Channel);
1800        let base = SourceBase::new(params).unwrap();
1801        base.set_position_comparator(ByteLexPositionComparator)
1802            .await;
1803
1804        let mut rx = base.create_streaming_receiver().await.unwrap();
1805
1806        // resume_from = [0x00, 0x03]
1807        base.subscriber_resume_positions
1808            .write()
1809            .await
1810            .insert(0, Bytes::from_static(&[0x00, 0x03]));
1811
1812        // First event at [0x00, 0x04] — past resume, advances high-water mark
1813        base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x04])))
1814            .await
1815            .unwrap();
1816        let _ = rx.recv().await.unwrap();
1817
1818        // High-water mark should be updated (not cleared)
1819        {
1820            let positions = base.subscriber_resume_positions.read().await;
1821            assert_eq!(
1822                positions.get(&0).map(|b| b.as_ref()),
1823                Some([0x00, 0x04].as_slice()),
1824                "high-water mark should be advanced to dispatched position"
1825            );
1826        }
1827
1828        // Subsequent event at LOWER position should be suppressed (rewind protection)
1829        base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x01])))
1830            .await
1831            .unwrap();
1832        let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
1833        assert!(
1834            r.is_err(),
1835            "event below high-water mark should be suppressed after rewind"
1836        );
1837
1838        // Event at HIGHER position should flow through
1839        base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x06])))
1840            .await
1841            .unwrap();
1842        let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
1843            .await
1844            .unwrap()
1845            .unwrap();
1846        assert_eq!(
1847            received.source_position.as_ref().unwrap().as_ref(),
1848            &[0x00, 0x06]
1849        );
1850    }
1851
1852    #[tokio::test]
1853    async fn test_position_filter_equal_position_is_suppressed() {
1854        // resume_from is the LAST committed position, so an event at exactly
1855        // that position has already been processed — it should be suppressed.
1856        let params = SourceBaseParams::new("pos-equal").with_dispatch_mode(DispatchMode::Channel);
1857        let base = SourceBase::new(params).unwrap();
1858        base.set_position_comparator(ByteLexPositionComparator)
1859            .await;
1860
1861        let mut rx = base.create_streaming_receiver().await.unwrap();
1862
1863        base.subscriber_resume_positions
1864            .write()
1865            .await
1866            .insert(0, Bytes::from_static(&[0x00, 0x05]));
1867
1868        // Event at exactly [0x00, 0x05] — should be suppressed
1869        base.dispatch_event(make_event("pos-equal", Some(&[0x00, 0x05])))
1870            .await
1871            .unwrap();
1872
1873        let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
1874        assert!(
1875            r.is_err(),
1876            "event at exactly resume position should be suppressed"
1877        );
1878    }
1879
1880    #[tokio::test]
1881    async fn test_position_filter_no_comparator_delivers_all() {
1882        // Without a position comparator set, all events should be delivered
1883        // even if there's a resume position entry.
1884        let params = SourceBaseParams::new("no-cmp").with_dispatch_mode(DispatchMode::Channel);
1885        let base = SourceBase::new(params).unwrap();
1886        // Deliberately NOT setting a position comparator
1887
1888        let mut rx = base.create_streaming_receiver().await.unwrap();
1889
1890        base.subscriber_resume_positions
1891            .write()
1892            .await
1893            .insert(0, Bytes::from_static(&[0x00, 0x05]));
1894
1895        // Event at [0x00, 0x03] — normally suppressed, but no comparator
1896        base.dispatch_event(make_event("no-cmp", Some(&[0x00, 0x03])))
1897            .await
1898            .unwrap();
1899
1900        let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
1901            .await
1902            .unwrap()
1903            .unwrap();
1904        assert_eq!(
1905            received.source_position.as_ref().unwrap().as_ref(),
1906            &[0x00, 0x03]
1907        );
1908    }
1909
1910    #[tokio::test]
1911    async fn test_position_filter_batch_mode() {
1912        let params = SourceBaseParams::new("pos-batch").with_dispatch_mode(DispatchMode::Channel);
1913        let base = SourceBase::new(params).unwrap();
1914        base.set_position_comparator(ByteLexPositionComparator)
1915            .await;
1916
1917        let mut rx1 = base.create_streaming_receiver().await.unwrap();
1918        let mut rx2 = base.create_streaming_receiver().await.unwrap();
1919
1920        // rx1 (idx 0): resume_from = [0x00, 0x05]
1921        // rx2 (idx 1): resume_from = [0x00, 0x02]
1922        {
1923            let mut positions = base.subscriber_resume_positions.write().await;
1924            positions.insert(0, Bytes::from_static(&[0x00, 0x05]));
1925            positions.insert(1, Bytes::from_static(&[0x00, 0x02]));
1926        }
1927
1928        let events = vec![
1929            make_event("pos-batch", Some(&[0x00, 0x01])), // before both
1930            make_event("pos-batch", Some(&[0x00, 0x03])), // past rx2, before rx1
1931            make_event("pos-batch", Some(&[0x00, 0x06])), // past both
1932        ];
1933        base.dispatch_events_batch(events).await.unwrap();
1934
1935        // rx2 should receive events at [0x03] and [0x06] (skipping [0x01])
1936        let r2_1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1937            .await
1938            .unwrap()
1939            .unwrap();
1940        assert_eq!(
1941            r2_1.source_position.as_ref().unwrap().as_ref(),
1942            &[0x00, 0x03]
1943        );
1944
1945        let r2_2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1946            .await
1947            .unwrap()
1948            .unwrap();
1949        assert_eq!(
1950            r2_2.source_position.as_ref().unwrap().as_ref(),
1951            &[0x00, 0x06]
1952        );
1953
1954        // rx1 should only receive event at [0x06] (skipping [0x01] and [0x03])
1955        let r1_1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1956            .await
1957            .unwrap()
1958            .unwrap();
1959        assert_eq!(
1960            r1_1.source_position.as_ref().unwrap().as_ref(),
1961            &[0x00, 0x06]
1962        );
1963
1964        // No more events for rx1
1965        let r1_extra = tokio::time::timeout(std::time::Duration::from_millis(50), rx1.recv()).await;
1966        assert!(r1_extra.is_err(), "rx1 should have no more events");
1967    }
1968
1969    #[tokio::test]
1970    async fn test_position_filter_events_without_position_delivered() {
1971        // Events with no source_position cannot be filtered — they pass through.
1972        let params = SourceBaseParams::new("pos-none").with_dispatch_mode(DispatchMode::Channel);
1973        let base = SourceBase::new(params).unwrap();
1974        base.set_position_comparator(ByteLexPositionComparator)
1975            .await;
1976
1977        let mut rx = base.create_streaming_receiver().await.unwrap();
1978
1979        base.subscriber_resume_positions
1980            .write()
1981            .await
1982            .insert(0, Bytes::from_static(&[0x00, 0x05]));
1983
1984        // Event with no source_position
1985        base.dispatch_event(make_event("pos-none", None))
1986            .await
1987            .unwrap();
1988
1989        let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
1990            .await
1991            .unwrap()
1992            .unwrap();
1993        assert!(received.source_position.is_none());
1994    }
1995
1996    // =========================================================================
1997    // Sequence → source position mapping tests
1998    // =========================================================================
1999
2000    #[tokio::test]
2001    async fn test_sequence_position_map_populated_on_dispatch() {
2002        let params = SourceBaseParams::new("spm-1").with_dispatch_mode(DispatchMode::Channel);
2003        let base = SourceBase::new(params).unwrap();
2004        let _rx = base.create_streaming_receiver().await.unwrap();
2005
2006        let lsn: u64 = 0x1234;
2007        base.dispatch_event(make_event("spm-1", Some(&lsn.to_be_bytes())))
2008            .await
2009            .unwrap();
2010
2011        let map = base.sequence_position_map.read().await;
2012        assert_eq!(map.len(), 1);
2013        let (seq, pos) = map.iter().next().unwrap();
2014        assert_eq!(*seq, 1); // first sequence
2015        assert_eq!(pos.as_ref(), &lsn.to_be_bytes());
2016    }
2017
2018    #[tokio::test]
2019    async fn test_sequence_position_map_not_populated_without_position() {
2020        let params = SourceBaseParams::new("spm-none").with_dispatch_mode(DispatchMode::Channel);
2021        let base = SourceBase::new(params).unwrap();
2022        let _rx = base.create_streaming_receiver().await.unwrap();
2023
2024        base.dispatch_event(make_event("spm-none", None))
2025            .await
2026            .unwrap();
2027
2028        let map = base.sequence_position_map.read().await;
2029        assert!(map.is_empty());
2030    }
2031
2032    #[tokio::test]
2033    async fn test_compute_confirmed_source_position_basic() {
2034        let params = SourceBaseParams::new("cssp-1").with_dispatch_mode(DispatchMode::Channel);
2035        let base = SourceBase::new(params).unwrap();
2036        let _rx = base.create_streaming_receiver().await.unwrap();
2037
2038        // Create position handle and set confirmed sequence
2039        let handle = base.create_position_handle("q1").await;
2040
2041        // Dispatch 3 events with known LSNs
2042        for lsn in [100u64, 200, 300] {
2043            base.dispatch_event(make_event("cssp-1", Some(&lsn.to_be_bytes())))
2044                .await
2045                .unwrap();
2046        }
2047
2048        // Confirm up to sequence 2 (second event, LSN=200)
2049        handle.store(2, Ordering::Relaxed);
2050
2051        let confirmed = base.compute_confirmed_source_position().await;
2052        assert!(confirmed.is_some());
2053        let lsn_bytes = confirmed.unwrap();
2054        assert_eq!(u64::from_be_bytes(lsn_bytes[..8].try_into().unwrap()), 200);
2055    }
2056
2057    #[tokio::test]
2058    async fn test_compute_confirmed_source_position_returns_none_when_no_handles() {
2059        let base = SourceBase::new(SourceBaseParams::new("cssp-none")).unwrap();
2060        assert!(base.compute_confirmed_source_position().await.is_none());
2061    }
2062
2063    #[tokio::test]
2064    async fn test_compute_confirmed_source_position_returns_none_when_all_max() {
2065        let base = SourceBase::new(SourceBaseParams::new("cssp-max")).unwrap();
2066        let _h = base.create_position_handle("q1").await;
2067        // h stays at u64::MAX
2068        assert!(base.compute_confirmed_source_position().await.is_none());
2069    }
2070
2071    #[tokio::test]
2072    async fn test_compute_confirmed_source_position_min_of_two_queries() {
2073        let params = SourceBaseParams::new("cssp-2q").with_dispatch_mode(DispatchMode::Channel);
2074        let base = SourceBase::new(params).unwrap();
2075        let _rx = base.create_streaming_receiver().await.unwrap();
2076
2077        let h1 = base.create_position_handle("q1").await;
2078        let h2 = base.create_position_handle("q2").await;
2079
2080        // Dispatch 3 events
2081        for lsn in [100u64, 200, 300] {
2082            base.dispatch_event(make_event("cssp-2q", Some(&lsn.to_be_bytes())))
2083                .await
2084                .unwrap();
2085        }
2086
2087        // q1 confirmed seq 3 (LSN=300), q2 confirmed seq 1 (LSN=100)
2088        h1.store(3, Ordering::Relaxed);
2089        h2.store(1, Ordering::Relaxed);
2090
2091        let confirmed = base.compute_confirmed_source_position().await;
2092        assert!(confirmed.is_some());
2093        let lsn_bytes = confirmed.unwrap();
2094        // min is seq 1 → LSN 100
2095        assert_eq!(u64::from_be_bytes(lsn_bytes[..8].try_into().unwrap()), 100);
2096    }
2097
2098    #[tokio::test]
2099    async fn test_prune_position_map() {
2100        let params = SourceBaseParams::new("prune-1").with_dispatch_mode(DispatchMode::Channel);
2101        let base = SourceBase::new(params).unwrap();
2102        let _rx = base.create_streaming_receiver().await.unwrap();
2103
2104        for lsn in [10u64, 20, 30, 40, 50] {
2105            base.dispatch_event(make_event("prune-1", Some(&lsn.to_be_bytes())))
2106                .await
2107                .unwrap();
2108        }
2109        // Sequences are 1..=5
2110        assert_eq!(base.sequence_position_map.read().await.len(), 5);
2111
2112        base.prune_position_map(3).await;
2113        let map = base.sequence_position_map.read().await;
2114        assert_eq!(map.len(), 2); // sequences 4 and 5 remain
2115        assert!(map.contains_key(&4));
2116        assert!(map.contains_key(&5));
2117    }
2118
2119    #[tokio::test]
2120    async fn test_prune_position_map_all() {
2121        let params = SourceBaseParams::new("prune-all").with_dispatch_mode(DispatchMode::Channel);
2122        let base = SourceBase::new(params).unwrap();
2123        let _rx = base.create_streaming_receiver().await.unwrap();
2124
2125        for lsn in [10u64, 20] {
2126            base.dispatch_event(make_event("prune-all", Some(&lsn.to_be_bytes())))
2127                .await
2128                .unwrap();
2129        }
2130        base.prune_position_map(100).await; // prune beyond last seq
2131        assert!(base.sequence_position_map.read().await.is_empty());
2132    }
2133
2134    #[tokio::test]
2135    async fn test_position_handle_initialized_to_last_sequence() {
2136        use crate::config::SourceSubscriptionSettings;
2137
2138        let params = SourceBaseParams::new("ph-init").with_dispatch_mode(DispatchMode::Channel);
2139        let base = SourceBase::new(params).unwrap();
2140
2141        let settings = SourceSubscriptionSettings {
2142            query_id: "q1".to_string(),
2143            source_id: "ph-init".to_string(),
2144            enable_bootstrap: false,
2145            resume_from: Some(Bytes::from_static(&[0x00, 0x01])),
2146            last_sequence: Some(42),
2147            request_position_handle: true,
2148            nodes: Default::default(),
2149            relations: Default::default(),
2150        };
2151
2152        let response = base
2153            .subscribe_with_bootstrap(&settings, "test")
2154            .await
2155            .unwrap();
2156        let handle = response.position_handle.expect("should have handle");
2157        // Handle should be initialized to last_sequence, not u64::MAX
2158        assert_eq!(handle.load(Ordering::Relaxed), 42);
2159    }
2160
2161    #[tokio::test]
2162    async fn test_position_handle_stays_max_without_last_sequence() {
2163        use crate::config::SourceSubscriptionSettings;
2164
2165        let params = SourceBaseParams::new("ph-no-ls").with_dispatch_mode(DispatchMode::Channel);
2166        let base = SourceBase::new(params).unwrap();
2167
2168        let settings = SourceSubscriptionSettings {
2169            query_id: "q1".to_string(),
2170            source_id: "ph-no-ls".to_string(),
2171            enable_bootstrap: true,
2172            resume_from: None,
2173            last_sequence: None,
2174            request_position_handle: true,
2175            nodes: Default::default(),
2176            relations: Default::default(),
2177        };
2178
2179        let response = base
2180            .subscribe_with_bootstrap(&settings, "test")
2181            .await
2182            .unwrap();
2183        let handle = response.position_handle.expect("should have handle");
2184        // No last_sequence → handle stays at u64::MAX
2185        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
2186    }
2187
2188    #[tokio::test]
2189    async fn test_batch_dispatch_populates_sequence_position_map() {
2190        let params = SourceBaseParams::new("spm-batch").with_dispatch_mode(DispatchMode::Channel);
2191        let base = SourceBase::new(params).unwrap();
2192        let _rx = base.create_streaming_receiver().await.unwrap();
2193
2194        let events = vec![
2195            make_event("spm-batch", Some(&100u64.to_be_bytes())),
2196            make_event("spm-batch", Some(&200u64.to_be_bytes())),
2197            make_event("spm-batch", None), // no position
2198        ];
2199
2200        base.dispatch_events_batch(events).await.unwrap();
2201
2202        let map = base.sequence_position_map.read().await;
2203        // Only 2 entries (the one without position is skipped)
2204        assert_eq!(map.len(), 2);
2205        assert!(map.contains_key(&1));
2206        assert!(map.contains_key(&2));
2207    }
2208
2209    #[tokio::test]
2210    async fn test_position_filter_rewind_protection_multi_subscriber() {
2211        // Simulates the Postgres scenario: subscriber A joins, processes events,
2212        // then subscriber B joins and the source rewinds the stream.
2213        // Subscriber A should NOT see replayed events thanks to the
2214        // persistent high-water mark.
2215        let params = SourceBaseParams::new("rewind").with_dispatch_mode(DispatchMode::Channel);
2216        let base = SourceBase::new(params).unwrap();
2217        base.set_position_comparator(ByteLexPositionComparator)
2218            .await;
2219
2220        // Subscriber A joins with resume_from at [0x10]
2221        let mut rx_a = base.create_streaming_receiver().await.unwrap();
2222        base.subscriber_resume_positions
2223            .write()
2224            .await
2225            .insert(0, Bytes::from_static(&[0x10]));
2226
2227        // Source dispatches events at [0x20] and [0x30] — both past A's resume
2228        base.dispatch_event(make_event("rewind", Some(&[0x20])))
2229            .await
2230            .unwrap();
2231        let ev = rx_a.recv().await.unwrap();
2232        assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x20]);
2233
2234        base.dispatch_event(make_event("rewind", Some(&[0x30])))
2235            .await
2236            .unwrap();
2237        let ev = rx_a.recv().await.unwrap();
2238        assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x30]);
2239
2240        // Subscriber B joins with resume_from at [0x10]
2241        let mut rx_b = base.create_streaming_receiver().await.unwrap();
2242        base.subscriber_resume_positions
2243            .write()
2244            .await
2245            .insert(1, Bytes::from_static(&[0x10]));
2246
2247        // Source REWINDS — replays from [0x20] again
2248        // A should NOT see these (high-water is at [0x30])
2249        // B should see [0x20] (past its resume_from [0x10])
2250        base.dispatch_event(make_event("rewind", Some(&[0x20])))
2251            .await
2252            .unwrap();
2253
2254        // A should NOT receive the replayed event
2255        let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx_a.recv()).await;
2256        assert!(
2257            r.is_err(),
2258            "subscriber A should not see replayed event at [0x20]"
2259        );
2260
2261        // B SHOULD receive it (it's past B's resume_from)
2262        let ev = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2263            .await
2264            .unwrap()
2265            .unwrap();
2266        assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x20]);
2267
2268        // Replay [0x30] — A should NOT see it, B should
2269        base.dispatch_event(make_event("rewind", Some(&[0x30])))
2270            .await
2271            .unwrap();
2272
2273        let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx_a.recv()).await;
2274        assert!(
2275            r.is_err(),
2276            "subscriber A should not see replayed event at [0x30]"
2277        );
2278
2279        let ev = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2280            .await
2281            .unwrap()
2282            .unwrap();
2283        assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x30]);
2284
2285        // New event [0x40] — BOTH should see it
2286        base.dispatch_event(make_event("rewind", Some(&[0x40])))
2287            .await
2288            .unwrap();
2289
2290        let ev_a = tokio::time::timeout(std::time::Duration::from_millis(100), rx_a.recv())
2291            .await
2292            .unwrap()
2293            .unwrap();
2294        assert_eq!(ev_a.source_position.as_ref().unwrap().as_ref(), &[0x40]);
2295
2296        let ev_b = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2297            .await
2298            .unwrap()
2299            .unwrap();
2300        assert_eq!(ev_b.source_position.as_ref().unwrap().as_ref(), &[0x40]);
2301    }
2302
2303    #[tokio::test]
2304    async fn test_high_water_mark_set_for_new_subscriber_without_resume() {
2305        // A subscriber without initial resume_from should get a high-water
2306        // mark after its first event, protecting it from future rewinds.
2307        let params = SourceBaseParams::new("hwm-new").with_dispatch_mode(DispatchMode::Channel);
2308        let base = SourceBase::new(params).unwrap();
2309        base.set_position_comparator(ByteLexPositionComparator)
2310            .await;
2311
2312        let mut rx = base.create_streaming_receiver().await.unwrap();
2313        // No resume_from set
2314
2315        // Dispatch event — should be delivered (no filter)
2316        base.dispatch_event(make_event("hwm-new", Some(&[0x10])))
2317            .await
2318            .unwrap();
2319        let _ = rx.recv().await.unwrap();
2320
2321        // Now the high-water mark should be set at [0x10]
2322        {
2323            let positions = base.subscriber_resume_positions.read().await;
2324            assert_eq!(
2325                positions.get(&0).map(|b| b.as_ref()),
2326                Some([0x10].as_slice()),
2327                "high-water mark should be set after first dispatch"
2328            );
2329        }
2330
2331        // Rewind: event at [0x05] should be suppressed
2332        base.dispatch_event(make_event("hwm-new", Some(&[0x05])))
2333            .await
2334            .unwrap();
2335        let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
2336        assert!(
2337            r.is_err(),
2338            "event below high-water mark should be suppressed"
2339        );
2340    }
2341}