Skip to main content

drasi_lib/sources/
base.rs

1// Copyright 2025 The Drasi Authors.
2//
3// Licensed under the Apache License, Version 2.0 (the "License");
4// you may not use this file except in compliance with the License.
5// You may obtain a copy of the License at
6//
7//     http://www.apache.org/licenses/LICENSE-2.0
8//
9// Unless required by applicable law or agreed to in writing, software
10// distributed under the License is distributed on an "AS IS" BASIS,
11// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12// See the License for the specific language governing permissions and
13// limitations under the License.
14
15//! Base implementation for common source functionality.
16//!
17//! This module provides `SourceBase` which encapsulates common patterns
18//! used across all source implementations:
19//! - Dispatcher setup and management
20//! - Bootstrap subscription handling
21//! - Event dispatching with profiling
22//! - Component lifecycle management
23//!
24//! # Plugin Architecture
25//!
26//! SourceBase is designed to be used by source plugins. Each plugin:
27//! 1. Defines its own typed configuration struct
28//! 2. Creates a SourceBase with SourceBaseParams
29//! 3. Optionally provides a bootstrap provider via `set_bootstrap_provider()`
30//! 4. Implements the Source trait delegating to SourceBase methods
31
32use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::collections::HashMap;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37use tokio::sync::RwLock;
38use tracing::Instrument;
39
40use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
41use crate::channels::*;
42use crate::component_graph::ComponentStatusHandle;
43use crate::context::SourceRuntimeContext;
44use crate::identity::IdentityProvider;
45use crate::profiling;
46use crate::state_store::StateStoreProvider;
47use drasi_core::models::SourceChange;
48
49/// Parameters for creating a SourceBase instance.
50///
51/// This struct contains only the information that SourceBase needs to function.
52/// Plugin-specific configuration should remain in the plugin crate.
53///
54/// # Example
55///
56/// ```ignore
57/// use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
58///
59/// let params = SourceBaseParams::new("my-source")
60///     .with_dispatch_mode(DispatchMode::Channel)
61///     .with_dispatch_buffer_capacity(2000)
62///     .with_bootstrap_provider(my_provider);
63///
64/// let base = SourceBase::new(params)?;
65/// ```
66pub struct SourceBaseParams {
67    /// Unique identifier for the source
68    pub id: String,
69    /// Dispatch mode (Broadcast or Channel) - defaults to Channel
70    pub dispatch_mode: Option<DispatchMode>,
71    /// Dispatch buffer capacity - defaults to 1000
72    pub dispatch_buffer_capacity: Option<usize>,
73    /// Optional bootstrap provider to set during construction
74    pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
75    /// Whether this source should auto-start - defaults to true
76    pub auto_start: bool,
77}
78
79impl std::fmt::Debug for SourceBaseParams {
80    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81        f.debug_struct("SourceBaseParams")
82            .field("id", &self.id)
83            .field("dispatch_mode", &self.dispatch_mode)
84            .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
85            .field(
86                "bootstrap_provider",
87                &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
88            )
89            .field("auto_start", &self.auto_start)
90            .finish()
91    }
92}
93
94impl SourceBaseParams {
95    /// Create new params with just an ID, using defaults for everything else
96    pub fn new(id: impl Into<String>) -> Self {
97        Self {
98            id: id.into(),
99            dispatch_mode: None,
100            dispatch_buffer_capacity: None,
101            bootstrap_provider: None,
102            auto_start: true,
103        }
104    }
105
106    /// Set the dispatch mode
107    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
108        self.dispatch_mode = Some(mode);
109        self
110    }
111
112    /// Set the dispatch buffer capacity
113    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
114        self.dispatch_buffer_capacity = Some(capacity);
115        self
116    }
117
118    /// Set the bootstrap provider
119    ///
120    /// This provider will be used during source subscription to deliver
121    /// initial data to queries that request bootstrap.
122    pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
123        self.bootstrap_provider = Some(Box::new(provider));
124        self
125    }
126
127    /// Set whether this source should auto-start
128    ///
129    /// Default is `true`. Set to `false` if this source should only be
130    /// started manually via `start_source()`.
131    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
132        self.auto_start = auto_start;
133        self
134    }
135}
136
137/// Base implementation for common source functionality
138pub struct SourceBase {
139    /// Source identifier
140    pub id: String,
141    /// Dispatch mode setting
142    dispatch_mode: DispatchMode,
143    /// Dispatch buffer capacity
144    dispatch_buffer_capacity: usize,
145    /// Whether this source should auto-start
146    pub auto_start: bool,
147    /// Component status handle — always available, wired to graph during initialize().
148    status_handle: ComponentStatusHandle,
149    /// Dispatchers for sending source events to subscribers
150    ///
151    /// This is a vector of dispatchers that send source events to all registered
152    /// subscribers (queries). When a source produces a change event, it broadcasts
153    /// it to all dispatchers in this vector.
154    pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
155    /// Runtime context (set by initialize())
156    context: Arc<RwLock<Option<SourceRuntimeContext>>>,
157    /// State store provider (extracted from context for convenience)
158    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
159    /// Handle to the source's main task
160    pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
161    /// Sender for shutdown signal
162    pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
163    /// Optional bootstrap provider - plugins set this if they support bootstrap
164    bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
165    /// Optional identity provider for credential management.
166    /// Set either programmatically (via `set_identity_provider`) or automatically
167    /// from the runtime context during `initialize()`.
168    identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
169    /// Per-query position handles for replay-capable subscribers.
170    ///
171    /// Keyed by `query_id`. Values are the same `Arc<AtomicU64>` returned in
172    /// `SubscriptionResponse::position_handle`. The query writes its last
173    /// durably-processed sequence; the source reads `compute_confirmed_position()`
174    /// to advance its upstream cursor. Initial value is `u64::MAX`
175    /// ("no position confirmed yet"). Only populated when
176    /// `request_position_handle == true`.
177    position_handles: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
178    /// Monotonically increasing counter for assigning event sequences.
179    /// The framework stamps every dispatched event with this sequence.
180    next_sequence: Arc<AtomicU64>,
181    /// Original raw config JSON from the descriptor, preserving ConfigValue
182    /// envelopes (secrets, env vars) for lossless persistence roundtrips.
183    raw_config: Option<serde_json::Value>,
184}
185
186impl SourceBase {
187    /// Create a new SourceBase with the given parameters
188    ///
189    /// The status channel is not required during construction - it will be
190    /// provided via the `SourceRuntimeContext` when `initialize()` is called.
191    ///
192    /// If a bootstrap provider is specified in params, it will be set during
193    /// construction (no async needed since nothing is shared yet).
194    pub fn new(params: SourceBaseParams) -> Result<Self> {
195        // Determine dispatch mode (default to Channel if not specified)
196        let dispatch_mode = params.dispatch_mode.unwrap_or_default();
197        let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
198
199        // Set up initial dispatchers based on dispatch mode
200        let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
201            Vec::new();
202
203        if dispatch_mode == DispatchMode::Broadcast {
204            // For broadcast mode, create a single broadcast dispatcher
205            let dispatcher =
206                BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
207            dispatchers.push(Box::new(dispatcher));
208        }
209        // For channel mode, dispatchers will be created on-demand when subscribing
210
211        // Initialize bootstrap provider if provided (no async needed at construction time)
212        let bootstrap_provider = params
213            .bootstrap_provider
214            .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
215
216        Ok(Self {
217            id: params.id.clone(),
218            dispatch_mode,
219            dispatch_buffer_capacity,
220            auto_start: params.auto_start,
221            status_handle: ComponentStatusHandle::new(&params.id),
222            dispatchers: Arc::new(RwLock::new(dispatchers)),
223            context: Arc::new(RwLock::new(None)), // Set by initialize()
224            state_store: Arc::new(RwLock::new(None)), // Extracted from context
225            task_handle: Arc::new(RwLock::new(None)),
226            shutdown_tx: Arc::new(RwLock::new(None)),
227            bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
228            identity_provider: Arc::new(RwLock::new(None)),
229            position_handles: Arc::new(RwLock::new(HashMap::new())),
230            next_sequence: Arc::new(AtomicU64::new(1)),
231            raw_config: None,
232        })
233    }
234
235    /// Get whether this source should auto-start
236    pub fn get_auto_start(&self) -> bool {
237        self.auto_start
238    }
239
240    /// Set the original raw config JSON for lossless persistence roundtrips.
241    ///
242    /// Called by plugin descriptors to preserve the original config JSON
243    /// (including `ConfigValue` envelopes for secrets and env vars) before
244    /// resolution to plain values.
245    pub fn set_raw_config(&mut self, config: serde_json::Value) {
246        self.raw_config = Some(config);
247    }
248
249    /// Get the original raw config JSON, if set by a descriptor.
250    ///
251    /// Returns `None` for builder-created components that don't have
252    /// a raw config JSON (they use DTO reconstruction in `properties()`).
253    pub fn raw_config(&self) -> Option<&serde_json::Value> {
254        self.raw_config.as_ref()
255    }
256
257    /// Build the properties map for this source.
258    ///
259    /// If `raw_config` was set (descriptor path), returns its top-level keys.
260    /// Otherwise, serializes `fallback_dto` (the DTO reconstructed from typed
261    /// config) to produce camelCase output.
262    ///
263    /// This eliminates the duplicated if-let + serialize pattern from plugins.
264    pub fn properties_or_serialize<D: serde::Serialize>(
265        &self,
266        fallback_dto: &D,
267    ) -> HashMap<String, serde_json::Value> {
268        if let Some(serde_json::Value::Object(map)) = self.raw_config.as_ref() {
269            return map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
270        }
271
272        match serde_json::to_value(fallback_dto) {
273            Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
274            _ => HashMap::new(),
275        }
276    }
277
278    /// Initialize the source with runtime context.
279    ///
280    /// This method is called automatically by DrasiLib's `add_source()` method.
281    /// Plugin developers do not need to call this directly.
282    ///
283    /// The context provides access to:
284    /// - `source_id`: The source's unique identifier
285    /// - `update_tx`: mpsc sender for fire-and-forget status updates to the component graph
286    /// - `state_store`: Optional persistent state storage
287    pub async fn initialize(&self, context: SourceRuntimeContext) {
288        // Store context for later use
289        *self.context.write().await = Some(context.clone());
290
291        // Wire the status handle to the graph update channel
292        self.status_handle.wire(context.update_tx.clone()).await;
293
294        if let Some(state_store) = context.state_store.as_ref() {
295            *self.state_store.write().await = Some(state_store.clone());
296        }
297
298        // Store identity provider from context if not already set programmatically
299        if let Some(ip) = context.identity_provider.as_ref() {
300            let mut guard = self.identity_provider.write().await;
301            if guard.is_none() {
302                *guard = Some(ip.clone());
303            }
304        }
305    }
306
307    /// Get the runtime context if initialized.
308    ///
309    /// Returns `None` if `initialize()` has not been called yet.
310    pub async fn context(&self) -> Option<SourceRuntimeContext> {
311        self.context.read().await.clone()
312    }
313
314    /// Get the state store if configured.
315    ///
316    /// Returns `None` if no state store was provided in the context.
317    pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
318        self.state_store.read().await.clone()
319    }
320
321    /// Get the identity provider if set.
322    ///
323    /// Returns the identity provider set either programmatically via
324    /// `set_identity_provider()` or from the runtime context during `initialize()`.
325    /// Programmatically-set providers take precedence over context providers.
326    pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
327        self.identity_provider.read().await.clone()
328    }
329
330    /// Set the identity provider programmatically.
331    ///
332    /// This is typically called during source construction when the provider
333    /// is available from configuration (e.g., `with_identity_provider()` builder).
334    /// Providers set this way take precedence over context-injected providers.
335    pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
336        *self.identity_provider.write().await = Some(provider);
337    }
338
339    /// Create and register a position handle for `query_id`, initialized to `u64::MAX`.
340    ///
341    /// Returns the shared handle; the same `Arc` is placed in
342    /// `SubscriptionResponse::position_handle` so the query and the source share
343    /// one atomic. If a handle already exists for `query_id` (re-subscribe after
344    /// transient disconnect), the existing handle is returned to preserve any
345    /// position the query had previously reported.
346    pub async fn create_position_handle(&self, query_id: &str) -> Arc<AtomicU64> {
347        let mut handles = self.position_handles.write().await;
348        if let Some(existing) = handles.get(query_id) {
349            return existing.clone();
350        }
351        let handle = Arc::new(AtomicU64::new(u64::MAX));
352        handles.insert(query_id.to_string(), handle.clone());
353        handle
354    }
355
356    /// Remove the position handle for `query_id`. No-op if absent.
357    ///
358    /// Called from explicit cleanup paths (`stop_query`/`delete_query` will be
359    /// wired in a follow-up issue). Until then, `cleanup_stale_handles()`
360    /// (invoked inside `compute_confirmed_position`) catches dropped subscribers.
361    pub async fn remove_position_handle(&self, query_id: &str) {
362        let mut handles = self.position_handles.write().await;
363        handles.remove(query_id);
364    }
365
366    /// Compute the minimum confirmed position across all live subscribers.
367    ///
368    /// Returns `None` if no handles are registered, or if every registered
369    /// handle is still `u64::MAX` (no subscriber has confirmed a position yet —
370    /// typically because they are still bootstrapping). Otherwise returns the
371    /// minimum non-`u64::MAX` value, suitable for advancing the source's
372    /// upstream cursor (Postgres `flush_lsn`, Kafka commit, transient WAL prune
373    /// threshold).
374    ///
375    /// Piggy-backs `cleanup_stale_handles()` so dropped subscribers do not pin
376    /// the watermark indefinitely.
377    pub async fn compute_confirmed_position(&self) -> Option<u64> {
378        self.cleanup_stale_handles().await;
379        let handles = self.position_handles.read().await;
380        let mut min: Option<u64> = None;
381        for handle in handles.values() {
382            let v = handle.load(Ordering::Relaxed);
383            if v == u64::MAX {
384                continue;
385            }
386            min = Some(min.map_or(v, |m| m.min(v)));
387        }
388        min
389    }
390
391    /// Drop entries whose `Arc::strong_count == 1` (only `SourceBase` holds a
392    /// reference).
393    ///
394    /// This indicates the subscriber dropped its `SubscriptionResponse` without
395    /// calling `remove_position_handle` — common during query teardown until
396    /// explicit cleanup is wired by the query manager.
397    ///
398    /// Safety constraint: this relies on `SourceBase` being the only long-lived
399    /// holder of the `Arc` besides the subscribing query. If a future periodic
400    /// scan task (or any other component) clones the handle, this method must
401    /// be revisited or replaced with explicit liveness tracking.
402    pub async fn cleanup_stale_handles(&self) {
403        let mut handles = self.position_handles.write().await;
404        handles.retain(|_, handle| Arc::strong_count(handle) > 1);
405    }
406
407    /// Reset the sequence counter, typically after recovering from a checkpoint.
408    /// The next dispatched event will receive `sequence + 1`.
409    pub fn set_next_sequence(&self, sequence: u64) {
410        self.next_sequence
411            .store(sequence.saturating_add(1), Ordering::Relaxed);
412    }
413
414    /// Apply subscription settings that affect the source base.
415    ///
416    /// Should be called at the start of `Source::subscribe()` implementations.
417    /// Handles:
418    /// - Recovering the sequence counter from `last_sequence` to maintain monotonicity
419    ///   across restarts.
420    pub fn apply_subscription_settings(
421        &self,
422        settings: &crate::config::SourceSubscriptionSettings,
423    ) {
424        if let Some(last_seq) = settings.last_sequence {
425            // Atomically advance to last_seq+1, never go backwards.
426            // fetch_max is safe under concurrent subscriptions.
427            let next = last_seq.saturating_add(1);
428            let prev = self.next_sequence.fetch_max(next, Ordering::Relaxed);
429            if next > prev {
430                info!(
431                    "[{}] Sequence counter recovered to {} (from checkpoint last_sequence={})",
432                    self.id, next, last_seq
433                );
434            }
435        }
436    }
437
438    /// Returns a clonable [`ComponentStatusHandle`] for use in spawned tasks.
439    ///
440    /// The handle can both read and write the component's status and automatically
441    /// notifies the graph on every status change (after `initialize()`).
442    pub fn status_handle(&self) -> ComponentStatusHandle {
443        self.status_handle.clone()
444    }
445
446    /// Clone the SourceBase with shared Arc references
447    ///
448    /// This creates a new SourceBase that shares the same underlying
449    /// data through Arc references. Useful for passing to spawned tasks.
450    pub fn clone_shared(&self) -> Self {
451        Self {
452            id: self.id.clone(),
453            dispatch_mode: self.dispatch_mode,
454            dispatch_buffer_capacity: self.dispatch_buffer_capacity,
455            auto_start: self.auto_start,
456            status_handle: self.status_handle.clone(),
457            dispatchers: self.dispatchers.clone(),
458            context: self.context.clone(),
459            state_store: self.state_store.clone(),
460            task_handle: self.task_handle.clone(),
461            shutdown_tx: self.shutdown_tx.clone(),
462            bootstrap_provider: self.bootstrap_provider.clone(),
463            identity_provider: self.identity_provider.clone(),
464            position_handles: self.position_handles.clone(),
465            next_sequence: self.next_sequence.clone(),
466            raw_config: self.raw_config.clone(),
467        }
468    }
469
470    /// Set the bootstrap provider for this source, taking ownership.
471    ///
472    /// Call this after creating the SourceBase if the source plugin supports bootstrapping.
473    /// The bootstrap provider is created by the plugin using its own configuration.
474    ///
475    /// # Example
476    /// ```ignore
477    /// let provider = MyBootstrapProvider::new(config);
478    /// source_base.set_bootstrap_provider(provider).await;  // Ownership transferred
479    /// ```
480    pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
481        *self.bootstrap_provider.write().await = Some(Arc::new(provider));
482    }
483
484    /// Get the source ID
485    pub fn get_id(&self) -> &str {
486        &self.id
487    }
488
489    /// Create a streaming receiver for a query subscription
490    ///
491    /// This creates the appropriate receiver based on the configured dispatch mode:
492    /// - Broadcast mode: Returns a receiver from the shared broadcast dispatcher
493    /// - Channel mode: Creates a new dedicated dispatcher and returns its receiver
494    ///
495    /// This is a helper method that can be used by sources with custom subscribe logic.
496    pub async fn create_streaming_receiver(
497        &self,
498    ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
499        let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
500            DispatchMode::Broadcast => {
501                // For broadcast mode, use the single dispatcher
502                let dispatchers = self.dispatchers.read().await;
503                if let Some(dispatcher) = dispatchers.first() {
504                    dispatcher.create_receiver().await?
505                } else {
506                    return Err(anyhow::anyhow!("No broadcast dispatcher available"));
507                }
508            }
509            DispatchMode::Channel => {
510                // For channel mode, create a new dispatcher for this subscription
511                let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
512                    self.dispatch_buffer_capacity,
513                );
514                let receiver = dispatcher.create_receiver().await?;
515
516                // Add the new dispatcher to our list
517                let mut dispatchers = self.dispatchers.write().await;
518                dispatchers.push(Box::new(dispatcher));
519
520                receiver
521            }
522        };
523
524        Ok(receiver)
525    }
526
527    /// Subscribe to this source with optional bootstrap
528    ///
529    /// This is the standard subscribe implementation that all sources can use.
530    /// It handles:
531    /// - Creating a receiver for streaming events (based on dispatch mode)
532    /// - Setting up bootstrap if requested and a provider has been set
533    /// - Returning the appropriate SubscriptionResponse
534    pub async fn subscribe_with_bootstrap(
535        &self,
536        settings: &crate::config::SourceSubscriptionSettings,
537        source_type: &str,
538    ) -> Result<SubscriptionResponse> {
539        // Recover sequence counter from checkpoint before anything else
540        self.apply_subscription_settings(settings);
541
542        info!(
543            "Query '{}' subscribing to {} source '{}' (bootstrap: {}, resume_from: {:?}, request_handle: {})",
544            settings.query_id,
545            source_type,
546            self.id,
547            settings.enable_bootstrap,
548            settings.resume_from,
549            settings.request_position_handle
550        );
551
552        // Create streaming receiver using helper method
553        let receiver = self.create_streaming_receiver().await?;
554
555        let query_id_for_response = settings.query_id.clone();
556
557        // resume_from overrides bootstrap: a resuming query already has base
558        // state in its persistent index and just needs replay from the
559        // requested sequence. Re-bootstrapping would corrupt that state.
560        let bootstrap_receiver = if settings.resume_from.is_some() {
561            info!(
562                "Query '{}' resuming from sequence {:?}; skipping bootstrap on {} source '{}'",
563                settings.query_id, settings.resume_from, source_type, self.id
564            );
565            None
566        } else if settings.enable_bootstrap {
567            self.handle_bootstrap_subscription(settings, source_type)
568                .await?
569        } else {
570            None
571        };
572
573        // Only persistent (replay-capable) queries request a handle. Volatile
574        // queries are deliberately excluded from the min-watermark so they
575        // cannot pin upstream advancement.
576        let position_handle = if settings.request_position_handle {
577            Some(self.create_position_handle(&settings.query_id).await)
578        } else {
579            None
580        };
581
582        Ok(SubscriptionResponse {
583            query_id: query_id_for_response,
584            source_id: self.id.clone(),
585            receiver,
586            bootstrap_receiver,
587            position_handle,
588        })
589    }
590
591    /// Handle bootstrap subscription logic
592    async fn handle_bootstrap_subscription(
593        &self,
594        settings: &crate::config::SourceSubscriptionSettings,
595        source_type: &str,
596    ) -> Result<Option<BootstrapEventReceiver>> {
597        let provider_guard = self.bootstrap_provider.read().await;
598        if let Some(provider) = provider_guard.clone() {
599            drop(provider_guard); // Release lock before spawning task
600
601            info!(
602                "Creating bootstrap for query '{}' on {} source '{}'",
603                settings.query_id, source_type, self.id
604            );
605
606            // Create bootstrap context
607            let context = BootstrapContext::new_minimal(
608                self.id.clone(), // server_id
609                self.id.clone(), // source_id
610            );
611
612            // Create bootstrap channel
613            let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
614
615            // Convert HashSet to Vec for backward compatibility with BootstrapRequest
616            let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
617            let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
618
619            // Create bootstrap request with request_id
620            let request = BootstrapRequest {
621                query_id: settings.query_id.clone(),
622                node_labels,
623                relation_labels,
624                request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
625            };
626
627            // Clone settings for the async task
628            let settings_clone = settings.clone();
629            let source_id = self.id.clone();
630
631            // Get instance_id from context for log routing isolation
632            let instance_id = self
633                .context()
634                .await
635                .map(|c| c.instance_id.clone())
636                .unwrap_or_default();
637
638            // Spawn bootstrap task with tracing span for proper log routing
639            let span = tracing::info_span!(
640                "source_bootstrap",
641                instance_id = %instance_id,
642                component_id = %source_id,
643                component_type = "source"
644            );
645            tokio::spawn(
646                async move {
647                    match provider
648                        .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
649                        .await
650                    {
651                        Ok(result) => {
652                            info!(
653                                "Bootstrap completed successfully for query '{}', sent {} events",
654                                settings_clone.query_id, result.event_count
655                            );
656                            // `result.last_sequence` / `result.sequences_aligned`
657                            // are intentionally unused at this call site — a
658                            // future query-processor integration issue will
659                            // plumb them through to the handover protocol.
660                        }
661                        Err(e) => {
662                            error!(
663                                "Bootstrap failed for query '{}': {e}",
664                                settings_clone.query_id
665                            );
666                        }
667                    }
668                }
669                .instrument(span),
670            );
671
672            Ok(Some(bootstrap_rx))
673        } else {
674            info!(
675                "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
676                settings.query_id, source_type, self.id
677            );
678            Ok(None)
679        }
680    }
681
682    /// Dispatch a SourceChange event with profiling metadata
683    ///
684    /// This method handles the common pattern of:
685    /// - Creating profiling metadata with timestamp
686    /// - Wrapping the change in a SourceEventWrapper
687    /// - Dispatching to all subscribers
688    /// - Handling the no-subscriber case gracefully
689    pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
690        // Create profiling metadata
691        let mut profiling = profiling::ProfilingMetadata::new();
692        profiling.source_send_ns = Some(profiling::timestamp_ns());
693
694        // Create event wrapper
695        let wrapper = SourceEventWrapper::with_profiling(
696            self.id.clone(),
697            SourceEvent::Change(change),
698            chrono::Utc::now(),
699            profiling,
700        );
701
702        // Dispatch event
703        self.dispatch_event(wrapper).await
704    }
705
706    /// Maximum allowed size for source position bytes (64KB).
707    /// Positions exceeding this limit are skipped at checkpoint time
708    /// to prevent memory issues, preserving the last good position.
709    pub const MAX_SOURCE_POSITION_BYTES: usize = 65_536;
710
711    /// Dispatch a SourceEventWrapper to all subscribers.
712    ///
713    /// This is a generic method for dispatching any SourceEvent.
714    /// It handles Arc-wrapping for zero-copy sharing and logs
715    /// when there are no subscribers.
716    /// The framework stamps every event with a monotonic sequence number.
717    pub async fn dispatch_event(&self, mut wrapper: SourceEventWrapper) -> Result<()> {
718        // Warn about oversized source positions; the checkpoint layer will
719        // enforce the limit and preserve the last good position.
720        if let Some(ref pos) = wrapper.source_position {
721            if pos.len() > Self::MAX_SOURCE_POSITION_BYTES {
722                warn!(
723                    "[{}] Source position is large ({} bytes > {} limit); \
724                     checkpoint staging will preserve the previous good position",
725                    self.id,
726                    pos.len(),
727                    Self::MAX_SOURCE_POSITION_BYTES
728                );
729            }
730        }
731
732        // Framework assigns the monotonic sequence
733        wrapper.sequence = Some(self.next_sequence.fetch_add(1, Ordering::Relaxed));
734
735        debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
736
737        // Arc-wrap for zero-copy sharing across dispatchers
738        let arc_wrapper = Arc::new(wrapper);
739
740        // Send to all dispatchers
741        let dispatchers = self.dispatchers.read().await;
742        for dispatcher in dispatchers.iter() {
743            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
744                debug!("[{}] Failed to dispatch event: {}", self.id, e);
745            }
746        }
747
748        Ok(())
749    }
750
751    /// Broadcast SourceControl events
752    pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
753        let wrapper = SourceEventWrapper::new(
754            self.id.clone(),
755            SourceEvent::Control(control),
756            chrono::Utc::now(),
757        );
758        self.dispatch_event(wrapper).await
759    }
760
761    /// Create a test subscription to this source (synchronous, fallible)
762    ///
763    /// This method is intended for use in tests to receive events from the source.
764    /// It properly handles both Broadcast and Channel dispatch modes by delegating
765    /// to `create_streaming_receiver()`, making the dispatch mode transparent to tests.
766    ///
767    /// Note: This is a synchronous wrapper that uses `tokio::task::block_in_place` internally.
768    /// For async contexts, prefer calling `create_streaming_receiver()` directly.
769    ///
770    /// # Returns
771    /// A receiver that will receive all events dispatched by this source,
772    /// or an error if the receiver cannot be created.
773    pub fn try_test_subscribe(
774        &self,
775    ) -> anyhow::Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
776        tokio::task::block_in_place(|| {
777            tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
778        })
779    }
780
781    /// Create a test subscription to this source (synchronous wrapper)
782    ///
783    /// Convenience wrapper around [`try_test_subscribe`](Self::try_test_subscribe)
784    /// that panics on failure. Prefer `try_test_subscribe()` in new code.
785    ///
786    /// # Panics
787    /// Panics if the receiver cannot be created.
788    pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
789        self.try_test_subscribe()
790            .expect("Failed to create test subscription receiver")
791    }
792
793    /// Helper function to dispatch events from spawned tasks
794    ///
795    /// This is a static helper that can be used from spawned async tasks that don't
796    /// have access to `self`. It manually iterates through dispatchers and sends the event.
797    ///
798    /// For code that has access to `&self`, prefer using `dispatch_event()` instead.
799    ///
800    /// # Arguments
801    /// * `dispatchers` - Arc to the dispatchers list (from `self.base.dispatchers.clone()`)
802    /// * `wrapper` - The event wrapper to dispatch
803    /// * `source_id` - Source ID for logging
804    pub async fn dispatch_from_task(
805        dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
806        wrapper: SourceEventWrapper,
807        source_id: &str,
808    ) -> Result<()> {
809        debug!(
810            "[{}] Dispatching event from task: {:?}",
811            source_id, &wrapper
812        );
813
814        // Arc-wrap for zero-copy sharing across dispatchers
815        let arc_wrapper = Arc::new(wrapper);
816
817        // Send to all dispatchers
818        let dispatchers_guard = dispatchers.read().await;
819        for dispatcher in dispatchers_guard.iter() {
820            if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
821                debug!("[{source_id}] Failed to dispatch event from task: {e}");
822            }
823        }
824
825        Ok(())
826    }
827
828    /// Handle common stop functionality
829    pub async fn stop_common(&self) -> Result<()> {
830        info!("Stopping source '{}'", self.id);
831
832        // Send shutdown signal if we have one
833        if let Some(tx) = self.shutdown_tx.write().await.take() {
834            let _ = tx.send(());
835        }
836
837        // Wait for task to complete
838        if let Some(mut handle) = self.task_handle.write().await.take() {
839            match tokio::time::timeout(std::time::Duration::from_secs(5), &mut handle).await {
840                Ok(Ok(())) => {
841                    info!("Source '{}' task completed successfully", self.id);
842                }
843                Ok(Err(e)) => {
844                    error!("Source '{}' task panicked: {}", self.id, e);
845                }
846                Err(_) => {
847                    warn!(
848                        "Source '{}' task did not complete within timeout, aborting",
849                        self.id
850                    );
851                    handle.abort();
852                }
853            }
854        }
855
856        self.set_status(
857            ComponentStatus::Stopped,
858            Some(format!("Source '{}' stopped", self.id)),
859        )
860        .await;
861        info!("Source '{}' stopped", self.id);
862        Ok(())
863    }
864
865    /// Clear the source's state store partition.
866    ///
867    /// This is called during deprovision to remove all persisted state
868    /// associated with this source. Sources that override `deprovision()`
869    /// can call this to clean up their state store.
870    pub async fn deprovision_common(&self) -> Result<()> {
871        info!("Deprovisioning source '{}'", self.id);
872        if let Some(store) = self.state_store().await {
873            let count = store.clear_store(&self.id).await.map_err(|e| {
874                anyhow::anyhow!(
875                    "Failed to clear state store for source '{}': {}",
876                    self.id,
877                    e
878                )
879            })?;
880            info!(
881                "Cleared {} keys from state store for source '{}'",
882                count, self.id
883            );
884        }
885        Ok(())
886    }
887
888    /// Get the current status.
889    pub async fn get_status(&self) -> ComponentStatus {
890        self.status_handle.get_status().await
891    }
892
893    /// Set the component's status — updates local state AND notifies the graph.
894    ///
895    /// This is the single canonical way to change a source's status.
896    pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
897        self.status_handle.set_status(status, message).await;
898    }
899
900    /// Set the task handle
901    pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
902        *self.task_handle.write().await = Some(handle);
903    }
904
905    /// Set the shutdown sender
906    pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
907        *self.shutdown_tx.write().await = Some(tx);
908    }
909}
910
911#[cfg(test)]
912mod tests {
913    use super::*;
914
915    // =========================================================================
916    // SourceBaseParams tests
917    // =========================================================================
918
919    #[test]
920    fn test_params_new_defaults() {
921        let params = SourceBaseParams::new("test-source");
922        assert_eq!(params.id, "test-source");
923        assert!(params.dispatch_mode.is_none());
924        assert!(params.dispatch_buffer_capacity.is_none());
925        assert!(params.bootstrap_provider.is_none());
926        assert!(params.auto_start);
927    }
928
929    #[test]
930    fn test_params_with_dispatch_mode() {
931        let params = SourceBaseParams::new("s1").with_dispatch_mode(DispatchMode::Broadcast);
932        assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
933    }
934
935    #[test]
936    fn test_params_with_dispatch_buffer_capacity() {
937        let params = SourceBaseParams::new("s1").with_dispatch_buffer_capacity(50000);
938        assert_eq!(params.dispatch_buffer_capacity, Some(50000));
939    }
940
941    #[test]
942    fn test_params_with_auto_start_false() {
943        let params = SourceBaseParams::new("s1").with_auto_start(false);
944        assert!(!params.auto_start);
945    }
946
947    #[test]
948    fn test_params_builder_chaining() {
949        let params = SourceBaseParams::new("chained")
950            .with_dispatch_mode(DispatchMode::Broadcast)
951            .with_dispatch_buffer_capacity(2000)
952            .with_auto_start(false);
953
954        assert_eq!(params.id, "chained");
955        assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
956        assert_eq!(params.dispatch_buffer_capacity, Some(2000));
957        assert!(!params.auto_start);
958    }
959
960    // =========================================================================
961    // SourceBase tests
962    // =========================================================================
963
964    #[tokio::test]
965    async fn test_new_defaults() {
966        let params = SourceBaseParams::new("my-source");
967        let base = SourceBase::new(params).unwrap();
968
969        assert_eq!(base.id, "my-source");
970        assert!(base.auto_start);
971        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
972    }
973
974    #[tokio::test]
975    async fn test_get_id() {
976        let base = SourceBase::new(SourceBaseParams::new("id-check")).unwrap();
977        assert_eq!(base.get_id(), "id-check");
978    }
979
980    #[tokio::test]
981    async fn test_get_auto_start() {
982        let base_default = SourceBase::new(SourceBaseParams::new("a")).unwrap();
983        assert!(base_default.get_auto_start());
984
985        let base_false =
986            SourceBase::new(SourceBaseParams::new("b").with_auto_start(false)).unwrap();
987        assert!(!base_false.get_auto_start());
988    }
989
990    #[tokio::test]
991    async fn test_get_status_initial() {
992        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
993        assert_eq!(base.get_status().await, ComponentStatus::Stopped);
994    }
995
996    #[tokio::test]
997    async fn test_set_status() {
998        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
999
1000        base.set_status(ComponentStatus::Running, None).await;
1001        assert_eq!(base.get_status().await, ComponentStatus::Running);
1002
1003        base.set_status(ComponentStatus::Error, Some("oops".into()))
1004            .await;
1005        assert_eq!(base.get_status().await, ComponentStatus::Error);
1006    }
1007
1008    #[tokio::test]
1009    async fn test_status_handle_returns_handle() {
1010        let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1011        let handle = base.status_handle();
1012
1013        // The handle should reflect the same status as the base
1014        assert_eq!(handle.get_status().await, ComponentStatus::Stopped);
1015
1016        // Mutating through the handle should be visible via SourceBase
1017        handle.set_status(ComponentStatus::Starting, None).await;
1018        assert_eq!(base.get_status().await, ComponentStatus::Starting);
1019    }
1020
1021    // =========================================================================
1022    // Position handle and resume_from tests (issue #366)
1023    // =========================================================================
1024
1025    use crate::bootstrap::{
1026        BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
1027    };
1028    use crate::channels::BootstrapEventSender;
1029    use async_trait::async_trait;
1030
1031    fn make_settings(
1032        query_id: &str,
1033        enable_bootstrap: bool,
1034        resume_from: Option<bytes::Bytes>,
1035        request_position_handle: bool,
1036    ) -> crate::config::SourceSubscriptionSettings {
1037        use std::collections::HashSet;
1038        crate::config::SourceSubscriptionSettings {
1039            source_id: "test-src".to_string(),
1040            enable_bootstrap,
1041            query_id: query_id.to_string(),
1042            nodes: HashSet::new(),
1043            relations: HashSet::new(),
1044            resume_from,
1045            request_position_handle,
1046            last_sequence: None,
1047        }
1048    }
1049
1050    /// Minimal bootstrap provider that closes its sender immediately.
1051    /// Enough to verify a `bootstrap_receiver` was created without sending data.
1052    struct NoopProvider;
1053
1054    #[async_trait]
1055    impl BootstrapProvider for NoopProvider {
1056        async fn bootstrap(
1057            &self,
1058            _request: BootstrapRequest,
1059            _context: &BootstrapContext,
1060            _event_tx: BootstrapEventSender,
1061            _settings: Option<&crate::config::SourceSubscriptionSettings>,
1062        ) -> Result<BootstrapResult> {
1063            Ok(BootstrapResult::default())
1064        }
1065    }
1066
1067    fn make_base_with_bootstrap(id: &str) -> SourceBase {
1068        let mut params = SourceBaseParams::new(id);
1069        params.bootstrap_provider = Some(Box::new(NoopProvider));
1070        SourceBase::new(params).unwrap()
1071    }
1072
1073    #[tokio::test]
1074    async fn test_create_position_handle_initializes_to_u64_max() {
1075        let base = SourceBase::new(SourceBaseParams::new("ph-init")).unwrap();
1076        let handle = base.create_position_handle("q1").await;
1077        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1078    }
1079
1080    #[tokio::test]
1081    async fn test_create_position_handle_idempotent_for_same_query() {
1082        let base = SourceBase::new(SourceBaseParams::new("ph-idem")).unwrap();
1083        let h1 = base.create_position_handle("q1").await;
1084        h1.store(123, Ordering::Relaxed);
1085        let h2 = base.create_position_handle("q1").await;
1086        // Same Arc — preserves the previously reported position.
1087        assert!(Arc::ptr_eq(&h1, &h2));
1088        assert_eq!(h2.load(Ordering::Relaxed), 123);
1089    }
1090
1091    #[tokio::test]
1092    async fn test_remove_position_handle_drops_entry() {
1093        let base = SourceBase::new(SourceBaseParams::new("ph-rm")).unwrap();
1094        let handle = base.create_position_handle("q1").await;
1095        handle.store(42, Ordering::Relaxed);
1096        assert_eq!(base.compute_confirmed_position().await, Some(42));
1097        base.remove_position_handle("q1").await;
1098        assert_eq!(base.compute_confirmed_position().await, None);
1099    }
1100
1101    #[tokio::test]
1102    async fn test_remove_position_handle_noop_when_absent() {
1103        let base = SourceBase::new(SourceBaseParams::new("ph-rm-absent")).unwrap();
1104        // Must not panic.
1105        base.remove_position_handle("never-registered").await;
1106        assert_eq!(base.compute_confirmed_position().await, None);
1107    }
1108
1109    #[tokio::test]
1110    async fn test_compute_confirmed_position_returns_none_when_empty() {
1111        let base = SourceBase::new(SourceBaseParams::new("ph-empty")).unwrap();
1112        assert_eq!(base.compute_confirmed_position().await, None);
1113    }
1114
1115    #[tokio::test]
1116    async fn test_compute_confirmed_position_returns_none_when_all_max() {
1117        let base = SourceBase::new(SourceBaseParams::new("ph-all-max")).unwrap();
1118        let _h1 = base.create_position_handle("q1").await;
1119        let _h2 = base.create_position_handle("q2").await;
1120        assert_eq!(base.compute_confirmed_position().await, None);
1121    }
1122
1123    #[tokio::test]
1124    async fn test_compute_confirmed_position_filters_max_returns_min() {
1125        let base = SourceBase::new(SourceBaseParams::new("ph-min")).unwrap();
1126        let h1 = base.create_position_handle("q1").await;
1127        let _h2 = base.create_position_handle("q2").await; // stays u64::MAX
1128        let h3 = base.create_position_handle("q3").await;
1129        h1.store(100, Ordering::Relaxed);
1130        h3.store(50, Ordering::Relaxed);
1131        assert_eq!(base.compute_confirmed_position().await, Some(50));
1132    }
1133
1134    #[tokio::test]
1135    async fn test_compute_confirmed_position_single_real_value() {
1136        let base = SourceBase::new(SourceBaseParams::new("ph-single")).unwrap();
1137        let h1 = base.create_position_handle("q1").await;
1138        let _h2 = base.create_position_handle("q2").await;
1139        h1.store(7, Ordering::Relaxed);
1140        assert_eq!(base.compute_confirmed_position().await, Some(7));
1141    }
1142
1143    #[tokio::test]
1144    async fn test_cleanup_stale_handles_drops_orphaned_arc() {
1145        let base = SourceBase::new(SourceBaseParams::new("ph-stale")).unwrap();
1146        {
1147            let handle = base.create_position_handle("q1").await;
1148            handle.store(99, Ordering::Relaxed);
1149            // handle (the external clone) drops here.
1150        }
1151        base.cleanup_stale_handles().await;
1152        assert_eq!(base.compute_confirmed_position().await, None);
1153    }
1154
1155    #[tokio::test]
1156    async fn test_cleanup_stale_handles_keeps_held_arc() {
1157        let base = SourceBase::new(SourceBaseParams::new("ph-held")).unwrap();
1158        let handle = base.create_position_handle("q1").await;
1159        handle.store(11, Ordering::Relaxed);
1160        base.cleanup_stale_handles().await;
1161        // External clone still alive, entry retained.
1162        assert_eq!(base.compute_confirmed_position().await, Some(11));
1163        // Keep `handle` alive past the assertion.
1164        drop(handle);
1165    }
1166
1167    #[tokio::test]
1168    async fn test_subscribe_with_request_position_handle_returns_handle() {
1169        let base = SourceBase::new(SourceBaseParams::new("sub-handle")).unwrap();
1170        let response = base
1171            .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1172            .await
1173            .unwrap();
1174        let handle = response.position_handle.expect("expected handle");
1175        assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1176        // Internal map should also have the entry (so min-watermark sees it).
1177        assert_eq!(base.compute_confirmed_position().await, None); // u64::MAX → None
1178    }
1179
1180    #[tokio::test]
1181    async fn test_subscribe_without_request_position_handle_returns_none() {
1182        let base = SourceBase::new(SourceBaseParams::new("sub-no-handle")).unwrap();
1183        let response = base
1184            .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1185            .await
1186            .unwrap();
1187        assert!(response.position_handle.is_none());
1188        // Internal map must be empty so this volatile query is excluded from
1189        // the min-watermark.
1190        let handles = base.position_handles.read().await;
1191        assert!(handles.is_empty());
1192    }
1193
1194    #[tokio::test]
1195    async fn test_subscribe_returned_handle_shared_with_internal() {
1196        let base = SourceBase::new(SourceBaseParams::new("sub-shared")).unwrap();
1197        let response = base
1198            .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1199            .await
1200            .unwrap();
1201        let handle = response.position_handle.unwrap();
1202        handle.store(42, Ordering::Relaxed);
1203        assert_eq!(base.compute_confirmed_position().await, Some(42));
1204    }
1205
1206    #[tokio::test]
1207    async fn test_subscribe_with_resume_from_skips_bootstrap() {
1208        let base = make_base_with_bootstrap("sub-resume");
1209        let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1210        let response = base
1211            .subscribe_with_bootstrap(&make_settings("q1", true, Some(position), false), "test")
1212            .await
1213            .unwrap();
1214        assert!(
1215            response.bootstrap_receiver.is_none(),
1216            "resume_from must override enable_bootstrap"
1217        );
1218    }
1219
1220    #[tokio::test]
1221    async fn test_subscribe_resume_without_bootstrap_still_none() {
1222        let base = make_base_with_bootstrap("sub-resume-no-bs");
1223        let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1224        let response = base
1225            .subscribe_with_bootstrap(&make_settings("q1", false, Some(position), false), "test")
1226            .await
1227            .unwrap();
1228        assert!(response.bootstrap_receiver.is_none());
1229    }
1230
1231    #[tokio::test]
1232    async fn test_subscribe_no_resume_with_bootstrap_returns_receiver() {
1233        let base = make_base_with_bootstrap("sub-bs");
1234        let response = base
1235            .subscribe_with_bootstrap(&make_settings("q1", true, None, false), "test")
1236            .await
1237            .unwrap();
1238        assert!(
1239            response.bootstrap_receiver.is_some(),
1240            "regression guard: bootstrap path must still produce a receiver"
1241        );
1242    }
1243
1244    #[tokio::test]
1245    async fn test_subscribe_no_resume_no_bootstrap_returns_none() {
1246        let base = make_base_with_bootstrap("sub-neither");
1247        let response = base
1248            .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1249            .await
1250            .unwrap();
1251        assert!(response.bootstrap_receiver.is_none());
1252        assert!(response.position_handle.is_none());
1253    }
1254}