1use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::collections::HashMap;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37use tokio::sync::RwLock;
38use tracing::Instrument;
39
40use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
41use crate::channels::*;
42use crate::component_graph::ComponentStatusHandle;
43use crate::context::SourceRuntimeContext;
44use crate::identity::IdentityProvider;
45use crate::profiling;
46use crate::state_store::StateStoreProvider;
47use drasi_core::models::SourceChange;
48
49pub struct SourceBaseParams {
67 pub id: String,
69 pub dispatch_mode: Option<DispatchMode>,
71 pub dispatch_buffer_capacity: Option<usize>,
73 pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
75 pub auto_start: bool,
77}
78
79impl std::fmt::Debug for SourceBaseParams {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("SourceBaseParams")
82 .field("id", &self.id)
83 .field("dispatch_mode", &self.dispatch_mode)
84 .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
85 .field(
86 "bootstrap_provider",
87 &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
88 )
89 .field("auto_start", &self.auto_start)
90 .finish()
91 }
92}
93
94impl SourceBaseParams {
95 pub fn new(id: impl Into<String>) -> Self {
97 Self {
98 id: id.into(),
99 dispatch_mode: None,
100 dispatch_buffer_capacity: None,
101 bootstrap_provider: None,
102 auto_start: true,
103 }
104 }
105
106 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
108 self.dispatch_mode = Some(mode);
109 self
110 }
111
112 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
114 self.dispatch_buffer_capacity = Some(capacity);
115 self
116 }
117
118 pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
123 self.bootstrap_provider = Some(Box::new(provider));
124 self
125 }
126
127 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
132 self.auto_start = auto_start;
133 self
134 }
135}
136
137pub struct SourceBase {
139 pub id: String,
141 dispatch_mode: DispatchMode,
143 dispatch_buffer_capacity: usize,
145 pub auto_start: bool,
147 status_handle: ComponentStatusHandle,
149 pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
155 context: Arc<RwLock<Option<SourceRuntimeContext>>>,
157 state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
159 pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
161 pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
163 bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
165 identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
169 position_handles: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
178 next_sequence: Arc<AtomicU64>,
181 raw_config: Option<serde_json::Value>,
184}
185
186impl SourceBase {
187 pub fn new(params: SourceBaseParams) -> Result<Self> {
195 let dispatch_mode = params.dispatch_mode.unwrap_or_default();
197 let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
198
199 let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
201 Vec::new();
202
203 if dispatch_mode == DispatchMode::Broadcast {
204 let dispatcher =
206 BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
207 dispatchers.push(Box::new(dispatcher));
208 }
209 let bootstrap_provider = params
213 .bootstrap_provider
214 .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
215
216 Ok(Self {
217 id: params.id.clone(),
218 dispatch_mode,
219 dispatch_buffer_capacity,
220 auto_start: params.auto_start,
221 status_handle: ComponentStatusHandle::new(¶ms.id),
222 dispatchers: Arc::new(RwLock::new(dispatchers)),
223 context: Arc::new(RwLock::new(None)), state_store: Arc::new(RwLock::new(None)), task_handle: Arc::new(RwLock::new(None)),
226 shutdown_tx: Arc::new(RwLock::new(None)),
227 bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
228 identity_provider: Arc::new(RwLock::new(None)),
229 position_handles: Arc::new(RwLock::new(HashMap::new())),
230 next_sequence: Arc::new(AtomicU64::new(1)),
231 raw_config: None,
232 })
233 }
234
235 pub fn get_auto_start(&self) -> bool {
237 self.auto_start
238 }
239
240 pub fn set_raw_config(&mut self, config: serde_json::Value) {
246 self.raw_config = Some(config);
247 }
248
249 pub fn raw_config(&self) -> Option<&serde_json::Value> {
254 self.raw_config.as_ref()
255 }
256
257 pub fn properties_or_serialize<D: serde::Serialize>(
265 &self,
266 fallback_dto: &D,
267 ) -> HashMap<String, serde_json::Value> {
268 if let Some(serde_json::Value::Object(map)) = self.raw_config.as_ref() {
269 return map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
270 }
271
272 match serde_json::to_value(fallback_dto) {
273 Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
274 _ => HashMap::new(),
275 }
276 }
277
278 pub async fn initialize(&self, context: SourceRuntimeContext) {
288 *self.context.write().await = Some(context.clone());
290
291 self.status_handle.wire(context.update_tx.clone()).await;
293
294 if let Some(state_store) = context.state_store.as_ref() {
295 *self.state_store.write().await = Some(state_store.clone());
296 }
297
298 if let Some(ip) = context.identity_provider.as_ref() {
300 let mut guard = self.identity_provider.write().await;
301 if guard.is_none() {
302 *guard = Some(ip.clone());
303 }
304 }
305 }
306
307 pub async fn context(&self) -> Option<SourceRuntimeContext> {
311 self.context.read().await.clone()
312 }
313
314 pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
318 self.state_store.read().await.clone()
319 }
320
321 pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
327 self.identity_provider.read().await.clone()
328 }
329
330 pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
336 *self.identity_provider.write().await = Some(provider);
337 }
338
339 pub async fn create_position_handle(&self, query_id: &str) -> Arc<AtomicU64> {
347 let mut handles = self.position_handles.write().await;
348 if let Some(existing) = handles.get(query_id) {
349 return existing.clone();
350 }
351 let handle = Arc::new(AtomicU64::new(u64::MAX));
352 handles.insert(query_id.to_string(), handle.clone());
353 handle
354 }
355
356 pub async fn remove_position_handle(&self, query_id: &str) {
362 let mut handles = self.position_handles.write().await;
363 handles.remove(query_id);
364 }
365
366 pub async fn compute_confirmed_position(&self) -> Option<u64> {
378 self.cleanup_stale_handles().await;
379 let handles = self.position_handles.read().await;
380 let mut min: Option<u64> = None;
381 for handle in handles.values() {
382 let v = handle.load(Ordering::Relaxed);
383 if v == u64::MAX {
384 continue;
385 }
386 min = Some(min.map_or(v, |m| m.min(v)));
387 }
388 min
389 }
390
391 pub async fn cleanup_stale_handles(&self) {
403 let mut handles = self.position_handles.write().await;
404 handles.retain(|_, handle| Arc::strong_count(handle) > 1);
405 }
406
407 pub fn set_next_sequence(&self, sequence: u64) {
410 self.next_sequence
411 .store(sequence.saturating_add(1), Ordering::Relaxed);
412 }
413
414 pub fn apply_subscription_settings(
421 &self,
422 settings: &crate::config::SourceSubscriptionSettings,
423 ) {
424 if let Some(last_seq) = settings.last_sequence {
425 let next = last_seq.saturating_add(1);
428 let prev = self.next_sequence.fetch_max(next, Ordering::Relaxed);
429 if next > prev {
430 info!(
431 "[{}] Sequence counter recovered to {} (from checkpoint last_sequence={})",
432 self.id, next, last_seq
433 );
434 }
435 }
436 }
437
438 pub fn status_handle(&self) -> ComponentStatusHandle {
443 self.status_handle.clone()
444 }
445
446 pub fn clone_shared(&self) -> Self {
451 Self {
452 id: self.id.clone(),
453 dispatch_mode: self.dispatch_mode,
454 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
455 auto_start: self.auto_start,
456 status_handle: self.status_handle.clone(),
457 dispatchers: self.dispatchers.clone(),
458 context: self.context.clone(),
459 state_store: self.state_store.clone(),
460 task_handle: self.task_handle.clone(),
461 shutdown_tx: self.shutdown_tx.clone(),
462 bootstrap_provider: self.bootstrap_provider.clone(),
463 identity_provider: self.identity_provider.clone(),
464 position_handles: self.position_handles.clone(),
465 next_sequence: self.next_sequence.clone(),
466 raw_config: self.raw_config.clone(),
467 }
468 }
469
470 pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
481 *self.bootstrap_provider.write().await = Some(Arc::new(provider));
482 }
483
484 pub fn get_id(&self) -> &str {
486 &self.id
487 }
488
489 pub async fn create_streaming_receiver(
497 &self,
498 ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
499 let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
500 DispatchMode::Broadcast => {
501 let dispatchers = self.dispatchers.read().await;
503 if let Some(dispatcher) = dispatchers.first() {
504 dispatcher.create_receiver().await?
505 } else {
506 return Err(anyhow::anyhow!("No broadcast dispatcher available"));
507 }
508 }
509 DispatchMode::Channel => {
510 let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
512 self.dispatch_buffer_capacity,
513 );
514 let receiver = dispatcher.create_receiver().await?;
515
516 let mut dispatchers = self.dispatchers.write().await;
518 dispatchers.push(Box::new(dispatcher));
519
520 receiver
521 }
522 };
523
524 Ok(receiver)
525 }
526
527 pub async fn subscribe_with_bootstrap(
535 &self,
536 settings: &crate::config::SourceSubscriptionSettings,
537 source_type: &str,
538 ) -> Result<SubscriptionResponse> {
539 self.apply_subscription_settings(settings);
541
542 info!(
543 "Query '{}' subscribing to {} source '{}' (bootstrap: {}, resume_from: {:?}, request_handle: {})",
544 settings.query_id,
545 source_type,
546 self.id,
547 settings.enable_bootstrap,
548 settings.resume_from,
549 settings.request_position_handle
550 );
551
552 let receiver = self.create_streaming_receiver().await?;
554
555 let query_id_for_response = settings.query_id.clone();
556
557 let bootstrap_receiver = if settings.resume_from.is_some() {
561 info!(
562 "Query '{}' resuming from sequence {:?}; skipping bootstrap on {} source '{}'",
563 settings.query_id, settings.resume_from, source_type, self.id
564 );
565 None
566 } else if settings.enable_bootstrap {
567 self.handle_bootstrap_subscription(settings, source_type)
568 .await?
569 } else {
570 None
571 };
572
573 let position_handle = if settings.request_position_handle {
577 Some(self.create_position_handle(&settings.query_id).await)
578 } else {
579 None
580 };
581
582 Ok(SubscriptionResponse {
583 query_id: query_id_for_response,
584 source_id: self.id.clone(),
585 receiver,
586 bootstrap_receiver,
587 position_handle,
588 })
589 }
590
591 async fn handle_bootstrap_subscription(
593 &self,
594 settings: &crate::config::SourceSubscriptionSettings,
595 source_type: &str,
596 ) -> Result<Option<BootstrapEventReceiver>> {
597 let provider_guard = self.bootstrap_provider.read().await;
598 if let Some(provider) = provider_guard.clone() {
599 drop(provider_guard); info!(
602 "Creating bootstrap for query '{}' on {} source '{}'",
603 settings.query_id, source_type, self.id
604 );
605
606 let context = BootstrapContext::new_minimal(
608 self.id.clone(), self.id.clone(), );
611
612 let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
614
615 let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
617 let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
618
619 let request = BootstrapRequest {
621 query_id: settings.query_id.clone(),
622 node_labels,
623 relation_labels,
624 request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
625 };
626
627 let settings_clone = settings.clone();
629 let source_id = self.id.clone();
630
631 let instance_id = self
633 .context()
634 .await
635 .map(|c| c.instance_id.clone())
636 .unwrap_or_default();
637
638 let span = tracing::info_span!(
640 "source_bootstrap",
641 instance_id = %instance_id,
642 component_id = %source_id,
643 component_type = "source"
644 );
645 tokio::spawn(
646 async move {
647 match provider
648 .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
649 .await
650 {
651 Ok(result) => {
652 info!(
653 "Bootstrap completed successfully for query '{}', sent {} events",
654 settings_clone.query_id, result.event_count
655 );
656 }
661 Err(e) => {
662 error!(
663 "Bootstrap failed for query '{}': {e}",
664 settings_clone.query_id
665 );
666 }
667 }
668 }
669 .instrument(span),
670 );
671
672 Ok(Some(bootstrap_rx))
673 } else {
674 info!(
675 "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
676 settings.query_id, source_type, self.id
677 );
678 Ok(None)
679 }
680 }
681
682 pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
690 let mut profiling = profiling::ProfilingMetadata::new();
692 profiling.source_send_ns = Some(profiling::timestamp_ns());
693
694 let wrapper = SourceEventWrapper::with_profiling(
696 self.id.clone(),
697 SourceEvent::Change(change),
698 chrono::Utc::now(),
699 profiling,
700 );
701
702 self.dispatch_event(wrapper).await
704 }
705
706 pub const MAX_SOURCE_POSITION_BYTES: usize = 65_536;
710
711 pub async fn dispatch_event(&self, mut wrapper: SourceEventWrapper) -> Result<()> {
718 if let Some(ref pos) = wrapper.source_position {
721 if pos.len() > Self::MAX_SOURCE_POSITION_BYTES {
722 warn!(
723 "[{}] Source position is large ({} bytes > {} limit); \
724 checkpoint staging will preserve the previous good position",
725 self.id,
726 pos.len(),
727 Self::MAX_SOURCE_POSITION_BYTES
728 );
729 }
730 }
731
732 wrapper.sequence = Some(self.next_sequence.fetch_add(1, Ordering::Relaxed));
734
735 debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
736
737 let arc_wrapper = Arc::new(wrapper);
739
740 let dispatchers = self.dispatchers.read().await;
742 for dispatcher in dispatchers.iter() {
743 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
744 debug!("[{}] Failed to dispatch event: {}", self.id, e);
745 }
746 }
747
748 Ok(())
749 }
750
751 pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
753 let wrapper = SourceEventWrapper::new(
754 self.id.clone(),
755 SourceEvent::Control(control),
756 chrono::Utc::now(),
757 );
758 self.dispatch_event(wrapper).await
759 }
760
761 pub fn try_test_subscribe(
774 &self,
775 ) -> anyhow::Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
776 tokio::task::block_in_place(|| {
777 tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
778 })
779 }
780
781 pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
789 self.try_test_subscribe()
790 .expect("Failed to create test subscription receiver")
791 }
792
793 pub async fn dispatch_from_task(
805 dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
806 wrapper: SourceEventWrapper,
807 source_id: &str,
808 ) -> Result<()> {
809 debug!(
810 "[{}] Dispatching event from task: {:?}",
811 source_id, &wrapper
812 );
813
814 let arc_wrapper = Arc::new(wrapper);
816
817 let dispatchers_guard = dispatchers.read().await;
819 for dispatcher in dispatchers_guard.iter() {
820 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
821 debug!("[{source_id}] Failed to dispatch event from task: {e}");
822 }
823 }
824
825 Ok(())
826 }
827
828 pub async fn stop_common(&self) -> Result<()> {
830 info!("Stopping source '{}'", self.id);
831
832 if let Some(tx) = self.shutdown_tx.write().await.take() {
834 let _ = tx.send(());
835 }
836
837 if let Some(mut handle) = self.task_handle.write().await.take() {
839 match tokio::time::timeout(std::time::Duration::from_secs(5), &mut handle).await {
840 Ok(Ok(())) => {
841 info!("Source '{}' task completed successfully", self.id);
842 }
843 Ok(Err(e)) => {
844 error!("Source '{}' task panicked: {}", self.id, e);
845 }
846 Err(_) => {
847 warn!(
848 "Source '{}' task did not complete within timeout, aborting",
849 self.id
850 );
851 handle.abort();
852 }
853 }
854 }
855
856 self.set_status(
857 ComponentStatus::Stopped,
858 Some(format!("Source '{}' stopped", self.id)),
859 )
860 .await;
861 info!("Source '{}' stopped", self.id);
862 Ok(())
863 }
864
865 pub async fn deprovision_common(&self) -> Result<()> {
871 info!("Deprovisioning source '{}'", self.id);
872 if let Some(store) = self.state_store().await {
873 let count = store.clear_store(&self.id).await.map_err(|e| {
874 anyhow::anyhow!(
875 "Failed to clear state store for source '{}': {}",
876 self.id,
877 e
878 )
879 })?;
880 info!(
881 "Cleared {} keys from state store for source '{}'",
882 count, self.id
883 );
884 }
885 Ok(())
886 }
887
888 pub async fn get_status(&self) -> ComponentStatus {
890 self.status_handle.get_status().await
891 }
892
893 pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
897 self.status_handle.set_status(status, message).await;
898 }
899
900 pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
902 *self.task_handle.write().await = Some(handle);
903 }
904
905 pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
907 *self.shutdown_tx.write().await = Some(tx);
908 }
909}
910
911#[cfg(test)]
912mod tests {
913 use super::*;
914
915 #[test]
920 fn test_params_new_defaults() {
921 let params = SourceBaseParams::new("test-source");
922 assert_eq!(params.id, "test-source");
923 assert!(params.dispatch_mode.is_none());
924 assert!(params.dispatch_buffer_capacity.is_none());
925 assert!(params.bootstrap_provider.is_none());
926 assert!(params.auto_start);
927 }
928
929 #[test]
930 fn test_params_with_dispatch_mode() {
931 let params = SourceBaseParams::new("s1").with_dispatch_mode(DispatchMode::Broadcast);
932 assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
933 }
934
935 #[test]
936 fn test_params_with_dispatch_buffer_capacity() {
937 let params = SourceBaseParams::new("s1").with_dispatch_buffer_capacity(50000);
938 assert_eq!(params.dispatch_buffer_capacity, Some(50000));
939 }
940
941 #[test]
942 fn test_params_with_auto_start_false() {
943 let params = SourceBaseParams::new("s1").with_auto_start(false);
944 assert!(!params.auto_start);
945 }
946
947 #[test]
948 fn test_params_builder_chaining() {
949 let params = SourceBaseParams::new("chained")
950 .with_dispatch_mode(DispatchMode::Broadcast)
951 .with_dispatch_buffer_capacity(2000)
952 .with_auto_start(false);
953
954 assert_eq!(params.id, "chained");
955 assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
956 assert_eq!(params.dispatch_buffer_capacity, Some(2000));
957 assert!(!params.auto_start);
958 }
959
960 #[tokio::test]
965 async fn test_new_defaults() {
966 let params = SourceBaseParams::new("my-source");
967 let base = SourceBase::new(params).unwrap();
968
969 assert_eq!(base.id, "my-source");
970 assert!(base.auto_start);
971 assert_eq!(base.get_status().await, ComponentStatus::Stopped);
972 }
973
974 #[tokio::test]
975 async fn test_get_id() {
976 let base = SourceBase::new(SourceBaseParams::new("id-check")).unwrap();
977 assert_eq!(base.get_id(), "id-check");
978 }
979
980 #[tokio::test]
981 async fn test_get_auto_start() {
982 let base_default = SourceBase::new(SourceBaseParams::new("a")).unwrap();
983 assert!(base_default.get_auto_start());
984
985 let base_false =
986 SourceBase::new(SourceBaseParams::new("b").with_auto_start(false)).unwrap();
987 assert!(!base_false.get_auto_start());
988 }
989
990 #[tokio::test]
991 async fn test_get_status_initial() {
992 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
993 assert_eq!(base.get_status().await, ComponentStatus::Stopped);
994 }
995
996 #[tokio::test]
997 async fn test_set_status() {
998 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
999
1000 base.set_status(ComponentStatus::Running, None).await;
1001 assert_eq!(base.get_status().await, ComponentStatus::Running);
1002
1003 base.set_status(ComponentStatus::Error, Some("oops".into()))
1004 .await;
1005 assert_eq!(base.get_status().await, ComponentStatus::Error);
1006 }
1007
1008 #[tokio::test]
1009 async fn test_status_handle_returns_handle() {
1010 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1011 let handle = base.status_handle();
1012
1013 assert_eq!(handle.get_status().await, ComponentStatus::Stopped);
1015
1016 handle.set_status(ComponentStatus::Starting, None).await;
1018 assert_eq!(base.get_status().await, ComponentStatus::Starting);
1019 }
1020
1021 use crate::bootstrap::{
1026 BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
1027 };
1028 use crate::channels::BootstrapEventSender;
1029 use async_trait::async_trait;
1030
1031 fn make_settings(
1032 query_id: &str,
1033 enable_bootstrap: bool,
1034 resume_from: Option<bytes::Bytes>,
1035 request_position_handle: bool,
1036 ) -> crate::config::SourceSubscriptionSettings {
1037 use std::collections::HashSet;
1038 crate::config::SourceSubscriptionSettings {
1039 source_id: "test-src".to_string(),
1040 enable_bootstrap,
1041 query_id: query_id.to_string(),
1042 nodes: HashSet::new(),
1043 relations: HashSet::new(),
1044 resume_from,
1045 request_position_handle,
1046 last_sequence: None,
1047 }
1048 }
1049
1050 struct NoopProvider;
1053
1054 #[async_trait]
1055 impl BootstrapProvider for NoopProvider {
1056 async fn bootstrap(
1057 &self,
1058 _request: BootstrapRequest,
1059 _context: &BootstrapContext,
1060 _event_tx: BootstrapEventSender,
1061 _settings: Option<&crate::config::SourceSubscriptionSettings>,
1062 ) -> Result<BootstrapResult> {
1063 Ok(BootstrapResult::default())
1064 }
1065 }
1066
1067 fn make_base_with_bootstrap(id: &str) -> SourceBase {
1068 let mut params = SourceBaseParams::new(id);
1069 params.bootstrap_provider = Some(Box::new(NoopProvider));
1070 SourceBase::new(params).unwrap()
1071 }
1072
1073 #[tokio::test]
1074 async fn test_create_position_handle_initializes_to_u64_max() {
1075 let base = SourceBase::new(SourceBaseParams::new("ph-init")).unwrap();
1076 let handle = base.create_position_handle("q1").await;
1077 assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1078 }
1079
1080 #[tokio::test]
1081 async fn test_create_position_handle_idempotent_for_same_query() {
1082 let base = SourceBase::new(SourceBaseParams::new("ph-idem")).unwrap();
1083 let h1 = base.create_position_handle("q1").await;
1084 h1.store(123, Ordering::Relaxed);
1085 let h2 = base.create_position_handle("q1").await;
1086 assert!(Arc::ptr_eq(&h1, &h2));
1088 assert_eq!(h2.load(Ordering::Relaxed), 123);
1089 }
1090
1091 #[tokio::test]
1092 async fn test_remove_position_handle_drops_entry() {
1093 let base = SourceBase::new(SourceBaseParams::new("ph-rm")).unwrap();
1094 let handle = base.create_position_handle("q1").await;
1095 handle.store(42, Ordering::Relaxed);
1096 assert_eq!(base.compute_confirmed_position().await, Some(42));
1097 base.remove_position_handle("q1").await;
1098 assert_eq!(base.compute_confirmed_position().await, None);
1099 }
1100
1101 #[tokio::test]
1102 async fn test_remove_position_handle_noop_when_absent() {
1103 let base = SourceBase::new(SourceBaseParams::new("ph-rm-absent")).unwrap();
1104 base.remove_position_handle("never-registered").await;
1106 assert_eq!(base.compute_confirmed_position().await, None);
1107 }
1108
1109 #[tokio::test]
1110 async fn test_compute_confirmed_position_returns_none_when_empty() {
1111 let base = SourceBase::new(SourceBaseParams::new("ph-empty")).unwrap();
1112 assert_eq!(base.compute_confirmed_position().await, None);
1113 }
1114
1115 #[tokio::test]
1116 async fn test_compute_confirmed_position_returns_none_when_all_max() {
1117 let base = SourceBase::new(SourceBaseParams::new("ph-all-max")).unwrap();
1118 let _h1 = base.create_position_handle("q1").await;
1119 let _h2 = base.create_position_handle("q2").await;
1120 assert_eq!(base.compute_confirmed_position().await, None);
1121 }
1122
1123 #[tokio::test]
1124 async fn test_compute_confirmed_position_filters_max_returns_min() {
1125 let base = SourceBase::new(SourceBaseParams::new("ph-min")).unwrap();
1126 let h1 = base.create_position_handle("q1").await;
1127 let _h2 = base.create_position_handle("q2").await; let h3 = base.create_position_handle("q3").await;
1129 h1.store(100, Ordering::Relaxed);
1130 h3.store(50, Ordering::Relaxed);
1131 assert_eq!(base.compute_confirmed_position().await, Some(50));
1132 }
1133
1134 #[tokio::test]
1135 async fn test_compute_confirmed_position_single_real_value() {
1136 let base = SourceBase::new(SourceBaseParams::new("ph-single")).unwrap();
1137 let h1 = base.create_position_handle("q1").await;
1138 let _h2 = base.create_position_handle("q2").await;
1139 h1.store(7, Ordering::Relaxed);
1140 assert_eq!(base.compute_confirmed_position().await, Some(7));
1141 }
1142
1143 #[tokio::test]
1144 async fn test_cleanup_stale_handles_drops_orphaned_arc() {
1145 let base = SourceBase::new(SourceBaseParams::new("ph-stale")).unwrap();
1146 {
1147 let handle = base.create_position_handle("q1").await;
1148 handle.store(99, Ordering::Relaxed);
1149 }
1151 base.cleanup_stale_handles().await;
1152 assert_eq!(base.compute_confirmed_position().await, None);
1153 }
1154
1155 #[tokio::test]
1156 async fn test_cleanup_stale_handles_keeps_held_arc() {
1157 let base = SourceBase::new(SourceBaseParams::new("ph-held")).unwrap();
1158 let handle = base.create_position_handle("q1").await;
1159 handle.store(11, Ordering::Relaxed);
1160 base.cleanup_stale_handles().await;
1161 assert_eq!(base.compute_confirmed_position().await, Some(11));
1163 drop(handle);
1165 }
1166
1167 #[tokio::test]
1168 async fn test_subscribe_with_request_position_handle_returns_handle() {
1169 let base = SourceBase::new(SourceBaseParams::new("sub-handle")).unwrap();
1170 let response = base
1171 .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1172 .await
1173 .unwrap();
1174 let handle = response.position_handle.expect("expected handle");
1175 assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1176 assert_eq!(base.compute_confirmed_position().await, None); }
1179
1180 #[tokio::test]
1181 async fn test_subscribe_without_request_position_handle_returns_none() {
1182 let base = SourceBase::new(SourceBaseParams::new("sub-no-handle")).unwrap();
1183 let response = base
1184 .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1185 .await
1186 .unwrap();
1187 assert!(response.position_handle.is_none());
1188 let handles = base.position_handles.read().await;
1191 assert!(handles.is_empty());
1192 }
1193
1194 #[tokio::test]
1195 async fn test_subscribe_returned_handle_shared_with_internal() {
1196 let base = SourceBase::new(SourceBaseParams::new("sub-shared")).unwrap();
1197 let response = base
1198 .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1199 .await
1200 .unwrap();
1201 let handle = response.position_handle.unwrap();
1202 handle.store(42, Ordering::Relaxed);
1203 assert_eq!(base.compute_confirmed_position().await, Some(42));
1204 }
1205
1206 #[tokio::test]
1207 async fn test_subscribe_with_resume_from_skips_bootstrap() {
1208 let base = make_base_with_bootstrap("sub-resume");
1209 let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1210 let response = base
1211 .subscribe_with_bootstrap(&make_settings("q1", true, Some(position), false), "test")
1212 .await
1213 .unwrap();
1214 assert!(
1215 response.bootstrap_receiver.is_none(),
1216 "resume_from must override enable_bootstrap"
1217 );
1218 }
1219
1220 #[tokio::test]
1221 async fn test_subscribe_resume_without_bootstrap_still_none() {
1222 let base = make_base_with_bootstrap("sub-resume-no-bs");
1223 let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1224 let response = base
1225 .subscribe_with_bootstrap(&make_settings("q1", false, Some(position), false), "test")
1226 .await
1227 .unwrap();
1228 assert!(response.bootstrap_receiver.is_none());
1229 }
1230
1231 #[tokio::test]
1232 async fn test_subscribe_no_resume_with_bootstrap_returns_receiver() {
1233 let base = make_base_with_bootstrap("sub-bs");
1234 let response = base
1235 .subscribe_with_bootstrap(&make_settings("q1", true, None, false), "test")
1236 .await
1237 .unwrap();
1238 assert!(
1239 response.bootstrap_receiver.is_some(),
1240 "regression guard: bootstrap path must still produce a receiver"
1241 );
1242 }
1243
1244 #[tokio::test]
1245 async fn test_subscribe_no_resume_no_bootstrap_returns_none() {
1246 let base = make_base_with_bootstrap("sub-neither");
1247 let response = base
1248 .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1249 .await
1250 .unwrap();
1251 assert!(response.bootstrap_receiver.is_none());
1252 assert!(response.position_handle.is_none());
1253 }
1254}