1use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::collections::HashMap;
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37use tokio::sync::RwLock;
38use tracing::Instrument;
39
40use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest};
41use crate::channels::*;
42use crate::component_graph::ComponentStatusHandle;
43use crate::context::SourceRuntimeContext;
44use crate::identity::IdentityProvider;
45use crate::profiling;
46use crate::state_store::StateStoreProvider;
47use drasi_core::models::SourceChange;
48
49pub struct SourceBaseParams {
67 pub id: String,
69 pub dispatch_mode: Option<DispatchMode>,
71 pub dispatch_buffer_capacity: Option<usize>,
73 pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
75 pub auto_start: bool,
77}
78
79impl std::fmt::Debug for SourceBaseParams {
80 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
81 f.debug_struct("SourceBaseParams")
82 .field("id", &self.id)
83 .field("dispatch_mode", &self.dispatch_mode)
84 .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
85 .field(
86 "bootstrap_provider",
87 &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
88 )
89 .field("auto_start", &self.auto_start)
90 .finish()
91 }
92}
93
94impl SourceBaseParams {
95 pub fn new(id: impl Into<String>) -> Self {
97 Self {
98 id: id.into(),
99 dispatch_mode: None,
100 dispatch_buffer_capacity: None,
101 bootstrap_provider: None,
102 auto_start: true,
103 }
104 }
105
106 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
108 self.dispatch_mode = Some(mode);
109 self
110 }
111
112 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
114 self.dispatch_buffer_capacity = Some(capacity);
115 self
116 }
117
118 pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
123 self.bootstrap_provider = Some(Box::new(provider));
124 self
125 }
126
127 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
132 self.auto_start = auto_start;
133 self
134 }
135}
136
137pub struct SourceBase {
139 pub id: String,
141 dispatch_mode: DispatchMode,
143 dispatch_buffer_capacity: usize,
145 pub auto_start: bool,
147 status_handle: ComponentStatusHandle,
149 pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
155 context: Arc<RwLock<Option<SourceRuntimeContext>>>,
157 state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
159 pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
161 pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
163 bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
165 identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
169 position_handles: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
178}
179
180impl SourceBase {
181 pub fn new(params: SourceBaseParams) -> Result<Self> {
189 let dispatch_mode = params.dispatch_mode.unwrap_or_default();
191 let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
192
193 let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
195 Vec::new();
196
197 if dispatch_mode == DispatchMode::Broadcast {
198 let dispatcher =
200 BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
201 dispatchers.push(Box::new(dispatcher));
202 }
203 let bootstrap_provider = params
207 .bootstrap_provider
208 .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
209
210 Ok(Self {
211 id: params.id.clone(),
212 dispatch_mode,
213 dispatch_buffer_capacity,
214 auto_start: params.auto_start,
215 status_handle: ComponentStatusHandle::new(¶ms.id),
216 dispatchers: Arc::new(RwLock::new(dispatchers)),
217 context: Arc::new(RwLock::new(None)), state_store: Arc::new(RwLock::new(None)), task_handle: Arc::new(RwLock::new(None)),
220 shutdown_tx: Arc::new(RwLock::new(None)),
221 bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
222 identity_provider: Arc::new(RwLock::new(None)),
223 position_handles: Arc::new(RwLock::new(HashMap::new())),
224 })
225 }
226
227 pub fn get_auto_start(&self) -> bool {
229 self.auto_start
230 }
231
232 pub async fn initialize(&self, context: SourceRuntimeContext) {
242 *self.context.write().await = Some(context.clone());
244
245 self.status_handle.wire(context.update_tx.clone()).await;
247
248 if let Some(state_store) = context.state_store.as_ref() {
249 *self.state_store.write().await = Some(state_store.clone());
250 }
251
252 if let Some(ip) = context.identity_provider.as_ref() {
254 let mut guard = self.identity_provider.write().await;
255 if guard.is_none() {
256 *guard = Some(ip.clone());
257 }
258 }
259 }
260
261 pub async fn context(&self) -> Option<SourceRuntimeContext> {
265 self.context.read().await.clone()
266 }
267
268 pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
272 self.state_store.read().await.clone()
273 }
274
275 pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
281 self.identity_provider.read().await.clone()
282 }
283
284 pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
290 *self.identity_provider.write().await = Some(provider);
291 }
292
293 pub async fn create_position_handle(&self, query_id: &str) -> Arc<AtomicU64> {
301 let mut handles = self.position_handles.write().await;
302 if let Some(existing) = handles.get(query_id) {
303 return existing.clone();
304 }
305 let handle = Arc::new(AtomicU64::new(u64::MAX));
306 handles.insert(query_id.to_string(), handle.clone());
307 handle
308 }
309
310 pub async fn remove_position_handle(&self, query_id: &str) {
316 let mut handles = self.position_handles.write().await;
317 handles.remove(query_id);
318 }
319
320 pub async fn compute_confirmed_position(&self) -> Option<u64> {
332 self.cleanup_stale_handles().await;
333 let handles = self.position_handles.read().await;
334 let mut min: Option<u64> = None;
335 for handle in handles.values() {
336 let v = handle.load(Ordering::Relaxed);
337 if v == u64::MAX {
338 continue;
339 }
340 min = Some(min.map_or(v, |m| m.min(v)));
341 }
342 min
343 }
344
345 pub async fn cleanup_stale_handles(&self) {
357 let mut handles = self.position_handles.write().await;
358 handles.retain(|_, handle| Arc::strong_count(handle) > 1);
359 }
360
361 pub fn status_handle(&self) -> ComponentStatusHandle {
366 self.status_handle.clone()
367 }
368
369 pub fn clone_shared(&self) -> Self {
374 Self {
375 id: self.id.clone(),
376 dispatch_mode: self.dispatch_mode,
377 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
378 auto_start: self.auto_start,
379 status_handle: self.status_handle.clone(),
380 dispatchers: self.dispatchers.clone(),
381 context: self.context.clone(),
382 state_store: self.state_store.clone(),
383 task_handle: self.task_handle.clone(),
384 shutdown_tx: self.shutdown_tx.clone(),
385 bootstrap_provider: self.bootstrap_provider.clone(),
386 identity_provider: self.identity_provider.clone(),
387 position_handles: self.position_handles.clone(),
388 }
389 }
390
391 pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
402 *self.bootstrap_provider.write().await = Some(Arc::new(provider));
403 }
404
405 pub fn get_id(&self) -> &str {
407 &self.id
408 }
409
410 pub async fn create_streaming_receiver(
418 &self,
419 ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
420 let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
421 DispatchMode::Broadcast => {
422 let dispatchers = self.dispatchers.read().await;
424 if let Some(dispatcher) = dispatchers.first() {
425 dispatcher.create_receiver().await?
426 } else {
427 return Err(anyhow::anyhow!("No broadcast dispatcher available"));
428 }
429 }
430 DispatchMode::Channel => {
431 let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
433 self.dispatch_buffer_capacity,
434 );
435 let receiver = dispatcher.create_receiver().await?;
436
437 let mut dispatchers = self.dispatchers.write().await;
439 dispatchers.push(Box::new(dispatcher));
440
441 receiver
442 }
443 };
444
445 Ok(receiver)
446 }
447
448 pub async fn subscribe_with_bootstrap(
456 &self,
457 settings: &crate::config::SourceSubscriptionSettings,
458 source_type: &str,
459 ) -> Result<SubscriptionResponse> {
460 info!(
461 "Query '{}' subscribing to {} source '{}' (bootstrap: {}, resume_from: {:?}, request_handle: {})",
462 settings.query_id,
463 source_type,
464 self.id,
465 settings.enable_bootstrap,
466 settings.resume_from,
467 settings.request_position_handle
468 );
469
470 let receiver = self.create_streaming_receiver().await?;
472
473 let query_id_for_response = settings.query_id.clone();
474
475 let bootstrap_receiver = if settings.resume_from.is_some() {
479 info!(
480 "Query '{}' resuming from sequence {:?}; skipping bootstrap on {} source '{}'",
481 settings.query_id, settings.resume_from, source_type, self.id
482 );
483 None
484 } else if settings.enable_bootstrap {
485 self.handle_bootstrap_subscription(settings, source_type)
486 .await?
487 } else {
488 None
489 };
490
491 let position_handle = if settings.request_position_handle {
495 Some(self.create_position_handle(&settings.query_id).await)
496 } else {
497 None
498 };
499
500 Ok(SubscriptionResponse {
501 query_id: query_id_for_response,
502 source_id: self.id.clone(),
503 receiver,
504 bootstrap_receiver,
505 position_handle,
506 })
507 }
508
509 async fn handle_bootstrap_subscription(
511 &self,
512 settings: &crate::config::SourceSubscriptionSettings,
513 source_type: &str,
514 ) -> Result<Option<BootstrapEventReceiver>> {
515 let provider_guard = self.bootstrap_provider.read().await;
516 if let Some(provider) = provider_guard.clone() {
517 drop(provider_guard); info!(
520 "Creating bootstrap for query '{}' on {} source '{}'",
521 settings.query_id, source_type, self.id
522 );
523
524 let context = BootstrapContext::new_minimal(
526 self.id.clone(), self.id.clone(), );
529
530 let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
532
533 let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
535 let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
536
537 let request = BootstrapRequest {
539 query_id: settings.query_id.clone(),
540 node_labels,
541 relation_labels,
542 request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
543 };
544
545 let settings_clone = settings.clone();
547 let source_id = self.id.clone();
548
549 let instance_id = self
551 .context()
552 .await
553 .map(|c| c.instance_id.clone())
554 .unwrap_or_default();
555
556 let span = tracing::info_span!(
558 "source_bootstrap",
559 instance_id = %instance_id,
560 component_id = %source_id,
561 component_type = "source"
562 );
563 tokio::spawn(
564 async move {
565 match provider
566 .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
567 .await
568 {
569 Ok(result) => {
570 info!(
571 "Bootstrap completed successfully for query '{}', sent {} events",
572 settings_clone.query_id, result.event_count
573 );
574 }
579 Err(e) => {
580 error!(
581 "Bootstrap failed for query '{}': {e}",
582 settings_clone.query_id
583 );
584 }
585 }
586 }
587 .instrument(span),
588 );
589
590 Ok(Some(bootstrap_rx))
591 } else {
592 info!(
593 "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
594 settings.query_id, source_type, self.id
595 );
596 Ok(None)
597 }
598 }
599
600 pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
608 let mut profiling = profiling::ProfilingMetadata::new();
610 profiling.source_send_ns = Some(profiling::timestamp_ns());
611
612 let wrapper = SourceEventWrapper::with_profiling(
614 self.id.clone(),
615 SourceEvent::Change(change),
616 chrono::Utc::now(),
617 profiling,
618 );
619
620 self.dispatch_event(wrapper).await
622 }
623
624 pub async fn dispatch_event(&self, wrapper: SourceEventWrapper) -> Result<()> {
630 debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
631
632 let arc_wrapper = Arc::new(wrapper);
634
635 let dispatchers = self.dispatchers.read().await;
637 for dispatcher in dispatchers.iter() {
638 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
639 debug!("[{}] Failed to dispatch event: {}", self.id, e);
640 }
641 }
642
643 Ok(())
644 }
645
646 pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
648 let wrapper = SourceEventWrapper::new(
649 self.id.clone(),
650 SourceEvent::Control(control),
651 chrono::Utc::now(),
652 );
653 self.dispatch_event(wrapper).await
654 }
655
656 pub fn try_test_subscribe(
669 &self,
670 ) -> anyhow::Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
671 tokio::task::block_in_place(|| {
672 tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
673 })
674 }
675
676 pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
684 self.try_test_subscribe()
685 .expect("Failed to create test subscription receiver")
686 }
687
688 pub async fn dispatch_from_task(
700 dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
701 wrapper: SourceEventWrapper,
702 source_id: &str,
703 ) -> Result<()> {
704 debug!(
705 "[{}] Dispatching event from task: {:?}",
706 source_id, &wrapper
707 );
708
709 let arc_wrapper = Arc::new(wrapper);
711
712 let dispatchers_guard = dispatchers.read().await;
714 for dispatcher in dispatchers_guard.iter() {
715 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
716 debug!("[{source_id}] Failed to dispatch event from task: {e}");
717 }
718 }
719
720 Ok(())
721 }
722
723 pub async fn stop_common(&self) -> Result<()> {
725 info!("Stopping source '{}'", self.id);
726
727 if let Some(tx) = self.shutdown_tx.write().await.take() {
729 let _ = tx.send(());
730 }
731
732 if let Some(mut handle) = self.task_handle.write().await.take() {
734 match tokio::time::timeout(std::time::Duration::from_secs(5), &mut handle).await {
735 Ok(Ok(())) => {
736 info!("Source '{}' task completed successfully", self.id);
737 }
738 Ok(Err(e)) => {
739 error!("Source '{}' task panicked: {}", self.id, e);
740 }
741 Err(_) => {
742 warn!(
743 "Source '{}' task did not complete within timeout, aborting",
744 self.id
745 );
746 handle.abort();
747 }
748 }
749 }
750
751 self.set_status(
752 ComponentStatus::Stopped,
753 Some(format!("Source '{}' stopped", self.id)),
754 )
755 .await;
756 info!("Source '{}' stopped", self.id);
757 Ok(())
758 }
759
760 pub async fn deprovision_common(&self) -> Result<()> {
766 info!("Deprovisioning source '{}'", self.id);
767 if let Some(store) = self.state_store().await {
768 let count = store.clear_store(&self.id).await.map_err(|e| {
769 anyhow::anyhow!(
770 "Failed to clear state store for source '{}': {}",
771 self.id,
772 e
773 )
774 })?;
775 info!(
776 "Cleared {} keys from state store for source '{}'",
777 count, self.id
778 );
779 }
780 Ok(())
781 }
782
783 pub async fn get_status(&self) -> ComponentStatus {
785 self.status_handle.get_status().await
786 }
787
788 pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
792 self.status_handle.set_status(status, message).await;
793 }
794
795 pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
797 *self.task_handle.write().await = Some(handle);
798 }
799
800 pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
802 *self.shutdown_tx.write().await = Some(tx);
803 }
804}
805
806#[cfg(test)]
807mod tests {
808 use super::*;
809
810 #[test]
815 fn test_params_new_defaults() {
816 let params = SourceBaseParams::new("test-source");
817 assert_eq!(params.id, "test-source");
818 assert!(params.dispatch_mode.is_none());
819 assert!(params.dispatch_buffer_capacity.is_none());
820 assert!(params.bootstrap_provider.is_none());
821 assert!(params.auto_start);
822 }
823
824 #[test]
825 fn test_params_with_dispatch_mode() {
826 let params = SourceBaseParams::new("s1").with_dispatch_mode(DispatchMode::Broadcast);
827 assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
828 }
829
830 #[test]
831 fn test_params_with_dispatch_buffer_capacity() {
832 let params = SourceBaseParams::new("s1").with_dispatch_buffer_capacity(50000);
833 assert_eq!(params.dispatch_buffer_capacity, Some(50000));
834 }
835
836 #[test]
837 fn test_params_with_auto_start_false() {
838 let params = SourceBaseParams::new("s1").with_auto_start(false);
839 assert!(!params.auto_start);
840 }
841
842 #[test]
843 fn test_params_builder_chaining() {
844 let params = SourceBaseParams::new("chained")
845 .with_dispatch_mode(DispatchMode::Broadcast)
846 .with_dispatch_buffer_capacity(2000)
847 .with_auto_start(false);
848
849 assert_eq!(params.id, "chained");
850 assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
851 assert_eq!(params.dispatch_buffer_capacity, Some(2000));
852 assert!(!params.auto_start);
853 }
854
855 #[tokio::test]
860 async fn test_new_defaults() {
861 let params = SourceBaseParams::new("my-source");
862 let base = SourceBase::new(params).unwrap();
863
864 assert_eq!(base.id, "my-source");
865 assert!(base.auto_start);
866 assert_eq!(base.get_status().await, ComponentStatus::Stopped);
867 }
868
869 #[tokio::test]
870 async fn test_get_id() {
871 let base = SourceBase::new(SourceBaseParams::new("id-check")).unwrap();
872 assert_eq!(base.get_id(), "id-check");
873 }
874
875 #[tokio::test]
876 async fn test_get_auto_start() {
877 let base_default = SourceBase::new(SourceBaseParams::new("a")).unwrap();
878 assert!(base_default.get_auto_start());
879
880 let base_false =
881 SourceBase::new(SourceBaseParams::new("b").with_auto_start(false)).unwrap();
882 assert!(!base_false.get_auto_start());
883 }
884
885 #[tokio::test]
886 async fn test_get_status_initial() {
887 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
888 assert_eq!(base.get_status().await, ComponentStatus::Stopped);
889 }
890
891 #[tokio::test]
892 async fn test_set_status() {
893 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
894
895 base.set_status(ComponentStatus::Running, None).await;
896 assert_eq!(base.get_status().await, ComponentStatus::Running);
897
898 base.set_status(ComponentStatus::Error, Some("oops".into()))
899 .await;
900 assert_eq!(base.get_status().await, ComponentStatus::Error);
901 }
902
903 #[tokio::test]
904 async fn test_status_handle_returns_handle() {
905 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
906 let handle = base.status_handle();
907
908 assert_eq!(handle.get_status().await, ComponentStatus::Stopped);
910
911 handle.set_status(ComponentStatus::Starting, None).await;
913 assert_eq!(base.get_status().await, ComponentStatus::Starting);
914 }
915
916 use crate::bootstrap::{
921 BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
922 };
923 use crate::channels::BootstrapEventSender;
924 use async_trait::async_trait;
925
926 fn make_settings(
927 query_id: &str,
928 enable_bootstrap: bool,
929 resume_from: Option<u64>,
930 request_position_handle: bool,
931 ) -> crate::config::SourceSubscriptionSettings {
932 use std::collections::HashSet;
933 crate::config::SourceSubscriptionSettings {
934 source_id: "test-src".to_string(),
935 enable_bootstrap,
936 query_id: query_id.to_string(),
937 nodes: HashSet::new(),
938 relations: HashSet::new(),
939 resume_from,
940 request_position_handle,
941 }
942 }
943
944 struct NoopProvider;
947
948 #[async_trait]
949 impl BootstrapProvider for NoopProvider {
950 async fn bootstrap(
951 &self,
952 _request: BootstrapRequest,
953 _context: &BootstrapContext,
954 _event_tx: BootstrapEventSender,
955 _settings: Option<&crate::config::SourceSubscriptionSettings>,
956 ) -> Result<BootstrapResult> {
957 Ok(BootstrapResult::default())
958 }
959 }
960
961 fn make_base_with_bootstrap(id: &str) -> SourceBase {
962 let mut params = SourceBaseParams::new(id);
963 params.bootstrap_provider = Some(Box::new(NoopProvider));
964 SourceBase::new(params).unwrap()
965 }
966
967 #[tokio::test]
968 async fn test_create_position_handle_initializes_to_u64_max() {
969 let base = SourceBase::new(SourceBaseParams::new("ph-init")).unwrap();
970 let handle = base.create_position_handle("q1").await;
971 assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
972 }
973
974 #[tokio::test]
975 async fn test_create_position_handle_idempotent_for_same_query() {
976 let base = SourceBase::new(SourceBaseParams::new("ph-idem")).unwrap();
977 let h1 = base.create_position_handle("q1").await;
978 h1.store(123, Ordering::Relaxed);
979 let h2 = base.create_position_handle("q1").await;
980 assert!(Arc::ptr_eq(&h1, &h2));
982 assert_eq!(h2.load(Ordering::Relaxed), 123);
983 }
984
985 #[tokio::test]
986 async fn test_remove_position_handle_drops_entry() {
987 let base = SourceBase::new(SourceBaseParams::new("ph-rm")).unwrap();
988 let handle = base.create_position_handle("q1").await;
989 handle.store(42, Ordering::Relaxed);
990 assert_eq!(base.compute_confirmed_position().await, Some(42));
991 base.remove_position_handle("q1").await;
992 assert_eq!(base.compute_confirmed_position().await, None);
993 }
994
995 #[tokio::test]
996 async fn test_remove_position_handle_noop_when_absent() {
997 let base = SourceBase::new(SourceBaseParams::new("ph-rm-absent")).unwrap();
998 base.remove_position_handle("never-registered").await;
1000 assert_eq!(base.compute_confirmed_position().await, None);
1001 }
1002
1003 #[tokio::test]
1004 async fn test_compute_confirmed_position_returns_none_when_empty() {
1005 let base = SourceBase::new(SourceBaseParams::new("ph-empty")).unwrap();
1006 assert_eq!(base.compute_confirmed_position().await, None);
1007 }
1008
1009 #[tokio::test]
1010 async fn test_compute_confirmed_position_returns_none_when_all_max() {
1011 let base = SourceBase::new(SourceBaseParams::new("ph-all-max")).unwrap();
1012 let _h1 = base.create_position_handle("q1").await;
1013 let _h2 = base.create_position_handle("q2").await;
1014 assert_eq!(base.compute_confirmed_position().await, None);
1015 }
1016
1017 #[tokio::test]
1018 async fn test_compute_confirmed_position_filters_max_returns_min() {
1019 let base = SourceBase::new(SourceBaseParams::new("ph-min")).unwrap();
1020 let h1 = base.create_position_handle("q1").await;
1021 let _h2 = base.create_position_handle("q2").await; let h3 = base.create_position_handle("q3").await;
1023 h1.store(100, Ordering::Relaxed);
1024 h3.store(50, Ordering::Relaxed);
1025 assert_eq!(base.compute_confirmed_position().await, Some(50));
1026 }
1027
1028 #[tokio::test]
1029 async fn test_compute_confirmed_position_single_real_value() {
1030 let base = SourceBase::new(SourceBaseParams::new("ph-single")).unwrap();
1031 let h1 = base.create_position_handle("q1").await;
1032 let _h2 = base.create_position_handle("q2").await;
1033 h1.store(7, Ordering::Relaxed);
1034 assert_eq!(base.compute_confirmed_position().await, Some(7));
1035 }
1036
1037 #[tokio::test]
1038 async fn test_cleanup_stale_handles_drops_orphaned_arc() {
1039 let base = SourceBase::new(SourceBaseParams::new("ph-stale")).unwrap();
1040 {
1041 let handle = base.create_position_handle("q1").await;
1042 handle.store(99, Ordering::Relaxed);
1043 }
1045 base.cleanup_stale_handles().await;
1046 assert_eq!(base.compute_confirmed_position().await, None);
1047 }
1048
1049 #[tokio::test]
1050 async fn test_cleanup_stale_handles_keeps_held_arc() {
1051 let base = SourceBase::new(SourceBaseParams::new("ph-held")).unwrap();
1052 let handle = base.create_position_handle("q1").await;
1053 handle.store(11, Ordering::Relaxed);
1054 base.cleanup_stale_handles().await;
1055 assert_eq!(base.compute_confirmed_position().await, Some(11));
1057 drop(handle);
1059 }
1060
1061 #[tokio::test]
1062 async fn test_subscribe_with_request_position_handle_returns_handle() {
1063 let base = SourceBase::new(SourceBaseParams::new("sub-handle")).unwrap();
1064 let response = base
1065 .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1066 .await
1067 .unwrap();
1068 let handle = response.position_handle.expect("expected handle");
1069 assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1070 assert_eq!(base.compute_confirmed_position().await, None); }
1073
1074 #[tokio::test]
1075 async fn test_subscribe_without_request_position_handle_returns_none() {
1076 let base = SourceBase::new(SourceBaseParams::new("sub-no-handle")).unwrap();
1077 let response = base
1078 .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1079 .await
1080 .unwrap();
1081 assert!(response.position_handle.is_none());
1082 let handles = base.position_handles.read().await;
1085 assert!(handles.is_empty());
1086 }
1087
1088 #[tokio::test]
1089 async fn test_subscribe_returned_handle_shared_with_internal() {
1090 let base = SourceBase::new(SourceBaseParams::new("sub-shared")).unwrap();
1091 let response = base
1092 .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1093 .await
1094 .unwrap();
1095 let handle = response.position_handle.unwrap();
1096 handle.store(42, Ordering::Relaxed);
1097 assert_eq!(base.compute_confirmed_position().await, Some(42));
1098 }
1099
1100 #[tokio::test]
1101 async fn test_subscribe_with_resume_from_skips_bootstrap() {
1102 let base = make_base_with_bootstrap("sub-resume");
1103 let response = base
1104 .subscribe_with_bootstrap(&make_settings("q1", true, Some(100), false), "test")
1105 .await
1106 .unwrap();
1107 assert!(
1108 response.bootstrap_receiver.is_none(),
1109 "resume_from must override enable_bootstrap"
1110 );
1111 }
1112
1113 #[tokio::test]
1114 async fn test_subscribe_resume_without_bootstrap_still_none() {
1115 let base = make_base_with_bootstrap("sub-resume-no-bs");
1116 let response = base
1117 .subscribe_with_bootstrap(&make_settings("q1", false, Some(100), false), "test")
1118 .await
1119 .unwrap();
1120 assert!(response.bootstrap_receiver.is_none());
1121 }
1122
1123 #[tokio::test]
1124 async fn test_subscribe_no_resume_with_bootstrap_returns_receiver() {
1125 let base = make_base_with_bootstrap("sub-bs");
1126 let response = base
1127 .subscribe_with_bootstrap(&make_settings("q1", true, None, false), "test")
1128 .await
1129 .unwrap();
1130 assert!(
1131 response.bootstrap_receiver.is_some(),
1132 "regression guard: bootstrap path must still produce a receiver"
1133 );
1134 }
1135
1136 #[tokio::test]
1137 async fn test_subscribe_no_resume_no_bootstrap_returns_none() {
1138 let base = make_base_with_bootstrap("sub-neither");
1139 let response = base
1140 .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1141 .await
1142 .unwrap();
1143 assert!(response.bootstrap_receiver.is_none());
1144 assert!(response.position_handle.is_none());
1145 }
1146}