1use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::collections::{BTreeMap, HashMap};
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37use tokio::sync::{Notify, RwLock};
38use tracing::Instrument;
39
40use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult};
41use crate::channels::*;
42use crate::component_graph::ComponentStatusHandle;
43use crate::context::SourceRuntimeContext;
44use crate::identity::IdentityProvider;
45use crate::profiling;
46use crate::sources::PositionComparator;
47use crate::state_store::StateStoreProvider;
48use bytes::Bytes;
49use drasi_core::models::SourceChange;
50
51pub struct SourceBaseParams {
69 pub id: String,
71 pub dispatch_mode: Option<DispatchMode>,
73 pub dispatch_buffer_capacity: Option<usize>,
75 pub state_store: Option<Arc<dyn StateStoreProvider>>,
82 pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
84 pub auto_start: bool,
86}
87
88impl std::fmt::Debug for SourceBaseParams {
89 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
90 f.debug_struct("SourceBaseParams")
91 .field("id", &self.id)
92 .field("dispatch_mode", &self.dispatch_mode)
93 .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
94 .field(
95 "state_store",
96 &self.state_store.as_ref().map(|_| "<StateStoreProvider>"),
97 )
98 .field(
99 "bootstrap_provider",
100 &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
101 )
102 .field("auto_start", &self.auto_start)
103 .finish()
104 }
105}
106
107impl SourceBaseParams {
108 pub fn new(id: impl Into<String>) -> Self {
110 Self {
111 id: id.into(),
112 dispatch_mode: None,
113 dispatch_buffer_capacity: None,
114 state_store: None,
115 bootstrap_provider: None,
116 auto_start: true,
117 }
118 }
119
120 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
122 self.dispatch_mode = Some(mode);
123 self
124 }
125
126 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
128 self.dispatch_buffer_capacity = Some(capacity);
129 self
130 }
131
132 pub fn with_state_store(mut self, store: Arc<dyn StateStoreProvider>) -> Self {
137 self.state_store = Some(store);
138 self
139 }
140
141 pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
146 self.bootstrap_provider = Some(Box::new(provider));
147 self
148 }
149
150 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
155 self.auto_start = auto_start;
156 self
157 }
158}
159
160pub struct SourceBase {
162 pub id: String,
164 dispatch_mode: DispatchMode,
166 dispatch_buffer_capacity: usize,
168 pub auto_start: bool,
170 status_handle: ComponentStatusHandle,
172 pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
178 context: Arc<RwLock<Option<SourceRuntimeContext>>>,
180 state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
182 pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
184 pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
186 bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
188 identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
192 position_handles: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
201 next_sequence: Arc<AtomicU64>,
204 raw_config: Option<serde_json::Value>,
207 subscriber_notify: Arc<Notify>,
211 subscriber_resume_positions: Arc<RwLock<HashMap<usize, Bytes>>>,
220 position_comparator: Arc<RwLock<Option<Arc<dyn PositionComparator>>>>,
225 sequence_position_map: Arc<RwLock<BTreeMap<u64, Bytes>>>,
237}
238
239impl SourceBase {
240 pub fn new(params: SourceBaseParams) -> Result<Self> {
248 let dispatch_mode = params.dispatch_mode.unwrap_or_default();
250 let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
251
252 let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
254 Vec::new();
255
256 if dispatch_mode == DispatchMode::Broadcast {
257 let dispatcher =
259 BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
260 dispatchers.push(Box::new(dispatcher));
261 }
262 let bootstrap_provider = params
266 .bootstrap_provider
267 .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
268
269 Ok(Self {
270 id: params.id.clone(),
271 dispatch_mode,
272 dispatch_buffer_capacity,
273 auto_start: params.auto_start,
274 status_handle: ComponentStatusHandle::new(¶ms.id),
275 dispatchers: Arc::new(RwLock::new(dispatchers)),
276 context: Arc::new(RwLock::new(None)), state_store: Arc::new(RwLock::new(params.state_store)), task_handle: Arc::new(RwLock::new(None)),
279 shutdown_tx: Arc::new(RwLock::new(None)),
280 bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
281 identity_provider: Arc::new(RwLock::new(None)),
282 position_handles: Arc::new(RwLock::new(HashMap::new())),
283 next_sequence: Arc::new(AtomicU64::new(1)),
284 raw_config: None,
285 subscriber_notify: Arc::new(Notify::new()),
286 subscriber_resume_positions: Arc::new(RwLock::new(HashMap::new())),
287 position_comparator: Arc::new(RwLock::new(None)),
288 sequence_position_map: Arc::new(RwLock::new(BTreeMap::new())),
289 })
290 }
291
292 pub fn get_auto_start(&self) -> bool {
294 self.auto_start
295 }
296
297 pub fn get_dispatch_mode(&self) -> DispatchMode {
299 self.dispatch_mode
300 }
301
302 pub fn set_raw_config(&mut self, config: serde_json::Value) {
308 self.raw_config = Some(config);
309 }
310
311 pub fn raw_config(&self) -> Option<&serde_json::Value> {
316 self.raw_config.as_ref()
317 }
318
319 pub fn properties_or_serialize<D: serde::Serialize>(
327 &self,
328 fallback_dto: &D,
329 ) -> HashMap<String, serde_json::Value> {
330 if let Some(serde_json::Value::Object(map)) = self.raw_config.as_ref() {
331 return map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
332 }
333
334 match serde_json::to_value(fallback_dto) {
335 Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
336 _ => HashMap::new(),
337 }
338 }
339
340 pub async fn initialize(&self, context: SourceRuntimeContext) {
350 *self.context.write().await = Some(context.clone());
352
353 self.status_handle.wire(context.update_tx.clone()).await;
355
356 if let Some(state_store) = context.state_store.as_ref() {
358 let mut guard = self.state_store.write().await;
359 if guard.is_none() {
360 *guard = Some(state_store.clone());
361 }
362 }
363
364 if let Some(ip) = context.identity_provider.as_ref() {
366 let mut guard = self.identity_provider.write().await;
367 if guard.is_none() {
368 *guard = Some(ip.clone());
369 }
370 }
371 }
372
373 pub async fn context(&self) -> Option<SourceRuntimeContext> {
377 self.context.read().await.clone()
378 }
379
380 pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
384 self.state_store.read().await.clone()
385 }
386
387 pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
393 self.identity_provider.read().await.clone()
394 }
395
396 pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
402 *self.identity_provider.write().await = Some(provider);
403 }
404
405 pub async fn create_position_handle(&self, query_id: &str) -> Arc<AtomicU64> {
413 let mut handles = self.position_handles.write().await;
414 if let Some(existing) = handles.get(query_id) {
415 return existing.clone();
416 }
417 let handle = Arc::new(AtomicU64::new(u64::MAX));
418 handles.insert(query_id.to_string(), handle.clone());
419 handle
420 }
421
422 pub async fn remove_position_handle(&self, query_id: &str) {
428 let mut handles = self.position_handles.write().await;
429 handles.remove(query_id);
430 }
431
432 pub async fn compute_confirmed_position(&self) -> Option<u64> {
444 self.cleanup_stale_handles().await;
445 let handles = self.position_handles.read().await;
446 let mut min: Option<u64> = None;
447 for handle in handles.values() {
448 let v = handle.load(Ordering::Relaxed);
449 if v == u64::MAX {
450 continue;
451 }
452 min = Some(min.map_or(v, |m| m.min(v)));
453 }
454 min
455 }
456
457 pub async fn cleanup_stale_handles(&self) {
469 let mut handles = self.position_handles.write().await;
470 handles.retain(|_, handle| Arc::strong_count(handle) > 1);
471 }
472
473 pub async fn compute_confirmed_source_position(&self) -> Option<Bytes> {
484 let confirmed_seq = self.compute_confirmed_position().await?;
485 let map = self.sequence_position_map.read().await;
486 map.range(..=confirmed_seq)
488 .next_back()
489 .map(|(_, pos)| pos.clone())
490 }
491
492 pub async fn prune_position_map(&self, up_to_seq: u64) {
498 let mut map = self.sequence_position_map.write().await;
499 let keep = map.split_off(&(up_to_seq.saturating_add(1)));
501 *map = keep;
502 }
503
504 pub fn set_next_sequence(&self, sequence: u64) {
507 self.next_sequence
508 .store(sequence.saturating_add(1), Ordering::Relaxed);
509 }
510
511 pub fn apply_subscription_settings(
518 &self,
519 settings: &crate::config::SourceSubscriptionSettings,
520 ) {
521 if let Some(last_seq) = settings.last_sequence {
522 let next = last_seq.saturating_add(1);
525 let prev = self.next_sequence.fetch_max(next, Ordering::Relaxed);
526 if next > prev {
527 info!(
528 "[{}] Sequence counter recovered to {} (from checkpoint last_sequence={})",
529 self.id, next, last_seq
530 );
531 }
532 }
533 }
534
535 pub fn status_handle(&self) -> ComponentStatusHandle {
540 self.status_handle.clone()
541 }
542
543 pub fn clone_shared(&self) -> Self {
548 Self {
549 id: self.id.clone(),
550 dispatch_mode: self.dispatch_mode,
551 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
552 auto_start: self.auto_start,
553 status_handle: self.status_handle.clone(),
554 dispatchers: self.dispatchers.clone(),
555 context: self.context.clone(),
556 state_store: self.state_store.clone(),
557 task_handle: self.task_handle.clone(),
558 shutdown_tx: self.shutdown_tx.clone(),
559 bootstrap_provider: self.bootstrap_provider.clone(),
560 identity_provider: self.identity_provider.clone(),
561 position_handles: self.position_handles.clone(),
562 next_sequence: self.next_sequence.clone(),
563 raw_config: self.raw_config.clone(),
564 subscriber_notify: self.subscriber_notify.clone(),
565 subscriber_resume_positions: self.subscriber_resume_positions.clone(),
566 position_comparator: self.position_comparator.clone(),
567 sequence_position_map: self.sequence_position_map.clone(),
568 }
569 }
570
571 pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
582 *self.bootstrap_provider.write().await = Some(Arc::new(provider));
583 }
584
585 pub async fn set_position_comparator(&self, comparator: impl PositionComparator + 'static) {
591 *self.position_comparator.write().await = Some(Arc::new(comparator));
592 }
593
594 pub fn get_id(&self) -> &str {
596 &self.id
597 }
598
599 pub async fn create_streaming_receiver(
607 &self,
608 ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
609 let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
610 DispatchMode::Broadcast => {
611 let dispatchers = self.dispatchers.read().await;
613 if let Some(dispatcher) = dispatchers.first() {
614 dispatcher.create_receiver().await?
615 } else {
616 return Err(anyhow::anyhow!("No broadcast dispatcher available"));
617 }
618 }
619 DispatchMode::Channel => {
620 let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
622 self.dispatch_buffer_capacity,
623 );
624 let receiver = dispatcher.create_receiver().await?;
625
626 let mut dispatchers = self.dispatchers.write().await;
628 dispatchers.push(Box::new(dispatcher));
629
630 receiver
631 }
632 };
633
634 self.subscriber_notify.notify_one();
638
639 Ok(receiver)
640 }
641
642 pub async fn wait_for_subscribers(&self) {
653 loop {
654 let dispatchers = self.dispatchers.read().await;
655 if !dispatchers.is_empty() {
656 return;
657 }
658 drop(dispatchers);
659 self.subscriber_notify.notified().await;
660 }
661 }
662
663 pub async fn subscribe_with_bootstrap(
671 &self,
672 settings: &crate::config::SourceSubscriptionSettings,
673 source_type: &str,
674 ) -> Result<SubscriptionResponse> {
675 self.apply_subscription_settings(settings);
677
678 self.subscribe_with_bootstrap_context(settings, source_type, HashMap::new())
679 .await
680 }
681
682 pub async fn subscribe_with_bootstrap_context(
684 &self,
685 settings: &crate::config::SourceSubscriptionSettings,
686 source_type: &str,
687 bootstrap_properties: HashMap<String, serde_json::Value>,
688 ) -> Result<SubscriptionResponse> {
689 info!(
690 "Query '{}' subscribing to {} source '{}' (bootstrap: {}, resume_from: {:?}, request_handle: {})",
691 settings.query_id,
692 source_type,
693 self.id,
694 settings.enable_bootstrap,
695 settings.resume_from,
696 settings.request_position_handle
697 );
698
699 let receiver = self.create_streaming_receiver().await?;
701
702 if self.dispatch_mode == DispatchMode::Channel {
706 if let Some(ref resume_pos) = settings.resume_from {
707 let dispatchers = self.dispatchers.read().await;
708 let dispatcher_idx = dispatchers.len().saturating_sub(1);
709 drop(dispatchers);
710 self.subscriber_resume_positions
711 .write()
712 .await
713 .insert(dispatcher_idx, resume_pos.clone());
714 debug!(
715 "[{}] Registered resume position filter for subscriber '{}' at dispatcher index {}",
716 self.id, settings.query_id, dispatcher_idx
717 );
718 }
719 }
720
721 let query_id_for_response = settings.query_id.clone();
722
723 let (bootstrap_receiver, bootstrap_result_receiver) = if settings.resume_from.is_some() {
727 info!(
728 "Query '{}' resuming from sequence {:?}; skipping bootstrap on {} source '{}'",
729 settings.query_id, settings.resume_from, source_type, self.id
730 );
731 (None, None)
732 } else if settings.enable_bootstrap {
733 match self
734 .handle_bootstrap_subscription(settings, source_type, bootstrap_properties)
735 .await?
736 {
737 Some((event_rx, result_rx)) => (Some(event_rx), Some(result_rx)),
738 None => (None, None),
739 }
740 } else {
741 (None, None)
742 };
743
744 let position_handle = if settings.request_position_handle {
748 let handle = self.create_position_handle(&settings.query_id).await;
749 if let Some(last_seq) = settings.last_sequence {
755 handle.store(last_seq, Ordering::Release);
756 }
757 Some(handle)
758 } else {
759 None
760 };
761
762 Ok(SubscriptionResponse {
763 query_id: query_id_for_response,
764 source_id: self.id.clone(),
765 receiver,
766 bootstrap_receiver,
767 position_handle,
768 bootstrap_result_receiver,
769 })
770 }
771
772 pub async fn create_bootstrap_receiver(
774 &self,
775 settings: &crate::config::SourceSubscriptionSettings,
776 source_type: &str,
777 bootstrap_properties: HashMap<String, serde_json::Value>,
778 ) -> Result<Option<BootstrapEventReceiver>> {
779 Ok(self
780 .handle_bootstrap_subscription(settings, source_type, bootstrap_properties)
781 .await?
782 .map(|(bootstrap_receiver, _)| bootstrap_receiver))
783 }
784
785 async fn handle_bootstrap_subscription(
790 &self,
791 settings: &crate::config::SourceSubscriptionSettings,
792 source_type: &str,
793 bootstrap_properties: HashMap<String, serde_json::Value>,
794 ) -> Result<
795 Option<(
796 BootstrapEventReceiver,
797 tokio::sync::oneshot::Receiver<anyhow::Result<BootstrapResult>>,
798 )>,
799 > {
800 let provider_guard = self.bootstrap_provider.read().await;
801 if let Some(provider) = provider_guard.clone() {
802 drop(provider_guard); info!(
805 "Creating bootstrap for query '{}' on {} source '{}'",
806 settings.query_id, source_type, self.id
807 );
808
809 let context = if bootstrap_properties.is_empty() {
811 BootstrapContext::new_minimal(
812 self.id.clone(), self.id.clone(), )
815 } else {
816 BootstrapContext::with_properties(
817 self.id.clone(), self.id.clone(), bootstrap_properties,
820 )
821 };
822
823 let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
825
826 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
828
829 let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
831 let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
832
833 let request = BootstrapRequest {
835 query_id: settings.query_id.clone(),
836 node_labels,
837 relation_labels,
838 request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
839 };
840
841 let settings_clone = settings.clone();
843 let source_id = self.id.clone();
844
845 let instance_id = self
847 .context()
848 .await
849 .map(|c| c.instance_id.clone())
850 .unwrap_or_default();
851
852 let span = tracing::info_span!(
854 "source_bootstrap",
855 instance_id = %instance_id,
856 component_id = %source_id,
857 component_type = "source"
858 );
859 tokio::spawn(
860 async move {
861 let outcome = provider
862 .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
863 .await;
864
865 match &outcome {
866 Ok(result) => {
867 info!(
868 "Bootstrap completed successfully for query '{}', sent {} events \
869 (last_sequence={:?}, sequences_aligned={})",
870 settings_clone.query_id,
871 result.event_count,
872 result.last_sequence,
873 result.sequences_aligned
874 );
875 }
876 Err(e) => {
877 error!(
878 "Bootstrap failed for query '{}': {e}",
879 settings_clone.query_id
880 );
881 }
882 }
883
884 let _ = result_tx.send(outcome);
887 }
888 .instrument(span),
889 );
890
891 Ok(Some((bootstrap_rx, result_rx)))
892 } else {
893 info!(
894 "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
895 settings.query_id, source_type, self.id
896 );
897 Ok(None)
898 }
899 }
900
901 pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
909 let mut profiling = profiling::ProfilingMetadata::new();
911 profiling.source_send_ns = Some(profiling::timestamp_ns());
912
913 let wrapper = SourceEventWrapper::with_profiling(
915 self.id.clone(),
916 SourceEvent::Change(change),
917 chrono::Utc::now(),
918 profiling,
919 );
920
921 self.dispatch_event(wrapper).await
923 }
924
925 pub const MAX_SOURCE_POSITION_BYTES: usize = 65_536;
929
930 pub async fn dispatch_event(&self, mut wrapper: SourceEventWrapper) -> Result<()> {
937 if let Some(ref pos) = wrapper.source_position {
940 if pos.len() > Self::MAX_SOURCE_POSITION_BYTES {
941 warn!(
942 "[{}] Source position is large ({} bytes > {} limit); \
943 checkpoint staging will preserve the previous good position",
944 self.id,
945 pos.len(),
946 Self::MAX_SOURCE_POSITION_BYTES
947 );
948 }
949 }
950
951 wrapper.sequence = Some(self.next_sequence.fetch_add(1, Ordering::Relaxed));
953
954 if let (Some(seq), Some(ref pos)) = (wrapper.sequence, &wrapper.source_position) {
956 self.sequence_position_map
957 .write()
958 .await
959 .insert(seq, pos.clone());
960 }
961
962 debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
963
964 let arc_wrapper = Arc::new(wrapper);
966
967 let dispatchers = self.dispatchers.read().await;
969 let comparator = self.position_comparator.read().await;
970 let mut cleared_indices: Vec<usize> = Vec::new();
971 let mut hwm_updates: Vec<(usize, Bytes)> = Vec::new();
973
974 for (idx, dispatcher) in dispatchers.iter().enumerate() {
975 if let Some(ref cmp) = *comparator {
982 let resume_positions = self.subscriber_resume_positions.read().await;
983 if let Some(resume_pos) = resume_positions.get(&idx) {
984 if let Some(ref event_pos) = arc_wrapper.source_position {
985 if !cmp.position_reached(event_pos, resume_pos) {
986 continue;
988 }
989 cleared_indices.push(idx);
991 }
992 }
994 }
995
996 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
997 debug!("[{}] Failed to dispatch event: {}", self.id, e);
998 } else if let Some(ref event_pos) = arc_wrapper.source_position {
999 hwm_updates.push((idx, event_pos.clone()));
1001 }
1002 }
1003 drop(comparator);
1004 drop(dispatchers);
1005
1006 if !hwm_updates.is_empty() {
1011 let mut resume_positions = self.subscriber_resume_positions.write().await;
1012 for (idx, pos) in hwm_updates {
1013 resume_positions.insert(idx, pos);
1014 }
1015 }
1016
1017 Ok(())
1018 }
1019
1020 pub async fn dispatch_events_batch(&self, events: Vec<SourceEventWrapper>) -> Result<()> {
1025 if events.is_empty() {
1026 return Ok(());
1027 }
1028
1029 let dispatchers = self.dispatchers.read().await;
1030 let comparator = self.position_comparator.read().await;
1031
1032 for mut wrapper in events {
1033 if let Some(ref pos) = wrapper.source_position {
1034 if pos.len() > Self::MAX_SOURCE_POSITION_BYTES {
1035 warn!(
1036 "[{}] Source position is large ({} bytes > {} limit); \
1037 checkpoint staging will preserve the previous good position",
1038 self.id,
1039 pos.len(),
1040 Self::MAX_SOURCE_POSITION_BYTES
1041 );
1042 }
1043 }
1044
1045 wrapper.sequence = Some(self.next_sequence.fetch_add(1, Ordering::Relaxed));
1046
1047 if let (Some(seq), Some(ref pos)) = (wrapper.sequence, &wrapper.source_position) {
1049 self.sequence_position_map
1050 .write()
1051 .await
1052 .insert(seq, pos.clone());
1053 }
1054
1055 debug!("[{}] Dispatching event (batch): {:?}", self.id, &wrapper);
1056
1057 let arc_wrapper = Arc::new(wrapper);
1058 let mut cleared_indices: Vec<usize> = Vec::new();
1059 let mut hwm_updates: Vec<(usize, Bytes)> = Vec::new();
1060
1061 for (idx, dispatcher) in dispatchers.iter().enumerate() {
1062 if let Some(ref cmp) = *comparator {
1064 let resume_positions = self.subscriber_resume_positions.read().await;
1065 if let Some(resume_pos) = resume_positions.get(&idx) {
1066 if let Some(ref event_pos) = arc_wrapper.source_position {
1067 if !cmp.position_reached(event_pos, resume_pos) {
1068 debug!(
1069 "[{}] Position filter: SKIPPING event for dispatcher {} \
1070 (event_pos={:?} <= resume_pos={:?})",
1071 self.id,
1072 idx,
1073 event_pos.as_ref(),
1074 resume_pos.as_ref()
1075 );
1076 continue;
1077 }
1078 debug!(
1079 "[{}] Position filter: PASSING event for dispatcher {} \
1080 (event_pos={:?} > resume_pos={:?})",
1081 self.id,
1082 idx,
1083 event_pos.as_ref(),
1084 resume_pos.as_ref()
1085 );
1086 cleared_indices.push(idx);
1087 }
1088 } else {
1089 debug!(
1090 "[{}] Position filter: NO resume position for dispatcher {}, passing through",
1091 self.id, idx
1092 );
1093 }
1094 }
1095
1096 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
1097 debug!("[{}] Failed to dispatch event: {}", self.id, e);
1098 } else if let Some(ref event_pos) = arc_wrapper.source_position {
1099 hwm_updates.push((idx, event_pos.clone()));
1100 }
1101 }
1102
1103 if !hwm_updates.is_empty() {
1105 let mut resume_positions = self.subscriber_resume_positions.write().await;
1106 for (idx, pos) in hwm_updates {
1107 resume_positions.insert(idx, pos);
1108 }
1109 }
1110 }
1111
1112 drop(comparator);
1113 drop(dispatchers);
1114
1115 Ok(())
1116 }
1117
1118 pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
1120 let wrapper = SourceEventWrapper::new(
1121 self.id.clone(),
1122 SourceEvent::Control(control),
1123 chrono::Utc::now(),
1124 );
1125 self.dispatch_event(wrapper).await
1126 }
1127
1128 pub fn try_test_subscribe(
1141 &self,
1142 ) -> anyhow::Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
1143 tokio::task::block_in_place(|| {
1144 tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
1145 })
1146 }
1147
1148 pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
1156 self.try_test_subscribe()
1157 .expect("Failed to create test subscription receiver")
1158 }
1159
1160 pub async fn dispatch_from_task(
1188 dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
1189 wrapper: SourceEventWrapper,
1190 source_id: &str,
1191 ) -> Result<()> {
1192 debug!(
1193 "[{}] Dispatching event from task: {:?}",
1194 source_id, &wrapper
1195 );
1196
1197 let arc_wrapper = Arc::new(wrapper);
1199
1200 let dispatchers_guard = dispatchers.read().await;
1202 for dispatcher in dispatchers_guard.iter() {
1203 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
1204 debug!("[{source_id}] Failed to dispatch event from task: {e}");
1205 }
1206 }
1207
1208 Ok(())
1209 }
1210
1211 pub async fn stop_common(&self) -> Result<()> {
1213 info!("Stopping source '{}'", self.id);
1214
1215 if let Some(tx) = self.shutdown_tx.write().await.take() {
1217 let _ = tx.send(());
1218 }
1219
1220 if let Some(mut handle) = self.task_handle.write().await.take() {
1222 match tokio::time::timeout(std::time::Duration::from_secs(5), &mut handle).await {
1223 Ok(Ok(())) => {
1224 info!("Source '{}' task completed successfully", self.id);
1225 }
1226 Ok(Err(e)) => {
1227 error!("Source '{}' task panicked: {}", self.id, e);
1228 }
1229 Err(_) => {
1230 warn!(
1231 "Source '{}' task did not complete within timeout, aborting",
1232 self.id
1233 );
1234 handle.abort();
1235 }
1236 }
1237 }
1238
1239 if self.dispatch_mode == DispatchMode::Channel {
1248 let mut dispatchers = self.dispatchers.write().await;
1249 dispatchers.clear();
1250 }
1251
1252 self.set_status(
1253 ComponentStatus::Stopped,
1254 Some(format!("Source '{}' stopped", self.id)),
1255 )
1256 .await;
1257 info!("Source '{}' stopped", self.id);
1258 Ok(())
1259 }
1260
1261 pub async fn clear_dispatchers(&self) {
1273 if self.dispatch_mode == DispatchMode::Channel {
1274 let mut dispatchers = self.dispatchers.write().await;
1275 dispatchers.clear();
1276 }
1277 }
1278
1279 pub async fn deprovision_common(&self) -> Result<()> {
1285 info!("Deprovisioning source '{}'", self.id);
1286 if let Some(store) = self.state_store().await {
1287 let count = store.clear_store(&self.id).await.map_err(|e| {
1288 anyhow::anyhow!(
1289 "Failed to clear state store for source '{}': {}",
1290 self.id,
1291 e
1292 )
1293 })?;
1294 info!(
1295 "Cleared {} keys from state store for source '{}'",
1296 count, self.id
1297 );
1298 }
1299 Ok(())
1300 }
1301
1302 pub async fn get_status(&self) -> ComponentStatus {
1304 self.status_handle.get_status().await
1305 }
1306
1307 pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
1311 self.status_handle.set_status(status, message).await;
1312 }
1313
1314 pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
1316 *self.task_handle.write().await = Some(handle);
1317 }
1318
1319 pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
1321 *self.shutdown_tx.write().await = Some(tx);
1322 }
1323}
1324
1325#[cfg(test)]
1326mod tests {
1327 use super::*;
1328 use crate::sources::ByteLexPositionComparator;
1329
1330 #[test]
1335 fn test_params_new_defaults() {
1336 let params = SourceBaseParams::new("test-source");
1337 assert_eq!(params.id, "test-source");
1338 assert!(params.dispatch_mode.is_none());
1339 assert!(params.dispatch_buffer_capacity.is_none());
1340 assert!(params.bootstrap_provider.is_none());
1341 assert!(params.auto_start);
1342 }
1343
1344 #[test]
1345 fn test_params_with_dispatch_mode() {
1346 let params = SourceBaseParams::new("s1").with_dispatch_mode(DispatchMode::Broadcast);
1347 assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
1348 }
1349
1350 #[test]
1351 fn test_params_with_dispatch_buffer_capacity() {
1352 let params = SourceBaseParams::new("s1").with_dispatch_buffer_capacity(50000);
1353 assert_eq!(params.dispatch_buffer_capacity, Some(50000));
1354 }
1355
1356 #[test]
1357 fn test_params_with_auto_start_false() {
1358 let params = SourceBaseParams::new("s1").with_auto_start(false);
1359 assert!(!params.auto_start);
1360 }
1361
1362 #[test]
1363 fn test_params_builder_chaining() {
1364 let params = SourceBaseParams::new("chained")
1365 .with_dispatch_mode(DispatchMode::Broadcast)
1366 .with_dispatch_buffer_capacity(2000)
1367 .with_auto_start(false);
1368
1369 assert_eq!(params.id, "chained");
1370 assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
1371 assert_eq!(params.dispatch_buffer_capacity, Some(2000));
1372 assert!(!params.auto_start);
1373 }
1374
1375 #[tokio::test]
1380 async fn test_new_defaults() {
1381 let params = SourceBaseParams::new("my-source");
1382 let base = SourceBase::new(params).unwrap();
1383
1384 assert_eq!(base.id, "my-source");
1385 assert!(base.auto_start);
1386 assert_eq!(base.get_status().await, ComponentStatus::Stopped);
1387 }
1388
1389 #[tokio::test]
1390 async fn test_get_id() {
1391 let base = SourceBase::new(SourceBaseParams::new("id-check")).unwrap();
1392 assert_eq!(base.get_id(), "id-check");
1393 }
1394
1395 #[tokio::test]
1396 async fn test_get_auto_start() {
1397 let base_default = SourceBase::new(SourceBaseParams::new("a")).unwrap();
1398 assert!(base_default.get_auto_start());
1399
1400 let base_false =
1401 SourceBase::new(SourceBaseParams::new("b").with_auto_start(false)).unwrap();
1402 assert!(!base_false.get_auto_start());
1403 }
1404
1405 #[tokio::test]
1406 async fn test_get_status_initial() {
1407 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1408 assert_eq!(base.get_status().await, ComponentStatus::Stopped);
1409 }
1410
1411 #[tokio::test]
1412 async fn test_set_status() {
1413 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1414
1415 base.set_status(ComponentStatus::Running, None).await;
1416 assert_eq!(base.get_status().await, ComponentStatus::Running);
1417
1418 base.set_status(ComponentStatus::Error, Some("oops".into()))
1419 .await;
1420 assert_eq!(base.get_status().await, ComponentStatus::Error);
1421 }
1422
1423 #[tokio::test]
1424 async fn test_status_handle_returns_handle() {
1425 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1426 let handle = base.status_handle();
1427
1428 assert_eq!(handle.get_status().await, ComponentStatus::Stopped);
1430
1431 handle.set_status(ComponentStatus::Starting, None).await;
1433 assert_eq!(base.get_status().await, ComponentStatus::Starting);
1434 }
1435
1436 use crate::bootstrap::{
1441 BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
1442 };
1443 use crate::channels::BootstrapEventSender;
1444 use async_trait::async_trait;
1445
1446 fn make_settings(
1447 query_id: &str,
1448 enable_bootstrap: bool,
1449 resume_from: Option<bytes::Bytes>,
1450 request_position_handle: bool,
1451 ) -> crate::config::SourceSubscriptionSettings {
1452 use std::collections::HashSet;
1453 crate::config::SourceSubscriptionSettings {
1454 source_id: "test-src".to_string(),
1455 enable_bootstrap,
1456 query_id: query_id.to_string(),
1457 nodes: HashSet::new(),
1458 relations: HashSet::new(),
1459 resume_from,
1460 request_position_handle,
1461 last_sequence: None,
1462 }
1463 }
1464
1465 struct NoopProvider;
1468
1469 #[async_trait]
1470 impl BootstrapProvider for NoopProvider {
1471 async fn bootstrap(
1472 &self,
1473 _request: BootstrapRequest,
1474 _context: &BootstrapContext,
1475 _event_tx: BootstrapEventSender,
1476 _settings: Option<&crate::config::SourceSubscriptionSettings>,
1477 ) -> Result<BootstrapResult> {
1478 Ok(BootstrapResult::default())
1479 }
1480 }
1481
1482 fn make_base_with_bootstrap(id: &str) -> SourceBase {
1483 let mut params = SourceBaseParams::new(id);
1484 params.bootstrap_provider = Some(Box::new(NoopProvider));
1485 SourceBase::new(params).unwrap()
1486 }
1487
1488 #[tokio::test]
1489 async fn test_create_position_handle_initializes_to_u64_max() {
1490 let base = SourceBase::new(SourceBaseParams::new("ph-init")).unwrap();
1491 let handle = base.create_position_handle("q1").await;
1492 assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1493 }
1494
1495 #[tokio::test]
1496 async fn test_create_position_handle_idempotent_for_same_query() {
1497 let base = SourceBase::new(SourceBaseParams::new("ph-idem")).unwrap();
1498 let h1 = base.create_position_handle("q1").await;
1499 h1.store(123, Ordering::Relaxed);
1500 let h2 = base.create_position_handle("q1").await;
1501 assert!(Arc::ptr_eq(&h1, &h2));
1503 assert_eq!(h2.load(Ordering::Relaxed), 123);
1504 }
1505
1506 #[tokio::test]
1507 async fn test_remove_position_handle_drops_entry() {
1508 let base = SourceBase::new(SourceBaseParams::new("ph-rm")).unwrap();
1509 let handle = base.create_position_handle("q1").await;
1510 handle.store(42, Ordering::Relaxed);
1511 assert_eq!(base.compute_confirmed_position().await, Some(42));
1512 base.remove_position_handle("q1").await;
1513 assert_eq!(base.compute_confirmed_position().await, None);
1514 }
1515
1516 #[tokio::test]
1517 async fn test_remove_position_handle_noop_when_absent() {
1518 let base = SourceBase::new(SourceBaseParams::new("ph-rm-absent")).unwrap();
1519 base.remove_position_handle("never-registered").await;
1521 assert_eq!(base.compute_confirmed_position().await, None);
1522 }
1523
1524 #[tokio::test]
1525 async fn test_compute_confirmed_position_returns_none_when_empty() {
1526 let base = SourceBase::new(SourceBaseParams::new("ph-empty")).unwrap();
1527 assert_eq!(base.compute_confirmed_position().await, None);
1528 }
1529
1530 #[tokio::test]
1531 async fn test_compute_confirmed_position_returns_none_when_all_max() {
1532 let base = SourceBase::new(SourceBaseParams::new("ph-all-max")).unwrap();
1533 let _h1 = base.create_position_handle("q1").await;
1534 let _h2 = base.create_position_handle("q2").await;
1535 assert_eq!(base.compute_confirmed_position().await, None);
1536 }
1537
1538 #[tokio::test]
1539 async fn test_compute_confirmed_position_filters_max_returns_min() {
1540 let base = SourceBase::new(SourceBaseParams::new("ph-min")).unwrap();
1541 let h1 = base.create_position_handle("q1").await;
1542 let _h2 = base.create_position_handle("q2").await; let h3 = base.create_position_handle("q3").await;
1544 h1.store(100, Ordering::Relaxed);
1545 h3.store(50, Ordering::Relaxed);
1546 assert_eq!(base.compute_confirmed_position().await, Some(50));
1547 }
1548
1549 #[tokio::test]
1550 async fn test_compute_confirmed_position_single_real_value() {
1551 let base = SourceBase::new(SourceBaseParams::new("ph-single")).unwrap();
1552 let h1 = base.create_position_handle("q1").await;
1553 let _h2 = base.create_position_handle("q2").await;
1554 h1.store(7, Ordering::Relaxed);
1555 assert_eq!(base.compute_confirmed_position().await, Some(7));
1556 }
1557
1558 #[tokio::test]
1559 async fn test_cleanup_stale_handles_drops_orphaned_arc() {
1560 let base = SourceBase::new(SourceBaseParams::new("ph-stale")).unwrap();
1561 {
1562 let handle = base.create_position_handle("q1").await;
1563 handle.store(99, Ordering::Relaxed);
1564 }
1566 base.cleanup_stale_handles().await;
1567 assert_eq!(base.compute_confirmed_position().await, None);
1568 }
1569
1570 #[tokio::test]
1571 async fn test_cleanup_stale_handles_keeps_held_arc() {
1572 let base = SourceBase::new(SourceBaseParams::new("ph-held")).unwrap();
1573 let handle = base.create_position_handle("q1").await;
1574 handle.store(11, Ordering::Relaxed);
1575 base.cleanup_stale_handles().await;
1576 assert_eq!(base.compute_confirmed_position().await, Some(11));
1578 drop(handle);
1580 }
1581
1582 #[tokio::test]
1583 async fn test_subscribe_with_request_position_handle_returns_handle() {
1584 let base = SourceBase::new(SourceBaseParams::new("sub-handle")).unwrap();
1585 let response = base
1586 .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1587 .await
1588 .unwrap();
1589 let handle = response.position_handle.expect("expected handle");
1590 assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1591 assert_eq!(base.compute_confirmed_position().await, None); }
1594
1595 #[tokio::test]
1596 async fn test_subscribe_without_request_position_handle_returns_none() {
1597 let base = SourceBase::new(SourceBaseParams::new("sub-no-handle")).unwrap();
1598 let response = base
1599 .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1600 .await
1601 .unwrap();
1602 assert!(response.position_handle.is_none());
1603 let handles = base.position_handles.read().await;
1606 assert!(handles.is_empty());
1607 }
1608
1609 #[tokio::test]
1610 async fn test_subscribe_returned_handle_shared_with_internal() {
1611 let base = SourceBase::new(SourceBaseParams::new("sub-shared")).unwrap();
1612 let response = base
1613 .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1614 .await
1615 .unwrap();
1616 let handle = response.position_handle.unwrap();
1617 handle.store(42, Ordering::Relaxed);
1618 assert_eq!(base.compute_confirmed_position().await, Some(42));
1619 }
1620
1621 #[tokio::test]
1622 async fn test_subscribe_with_resume_from_skips_bootstrap() {
1623 let base = make_base_with_bootstrap("sub-resume");
1624 let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1625 let response = base
1626 .subscribe_with_bootstrap(&make_settings("q1", true, Some(position), false), "test")
1627 .await
1628 .unwrap();
1629 assert!(
1630 response.bootstrap_receiver.is_none(),
1631 "resume_from must override enable_bootstrap"
1632 );
1633 }
1634
1635 #[tokio::test]
1636 async fn test_subscribe_resume_without_bootstrap_still_none() {
1637 let base = make_base_with_bootstrap("sub-resume-no-bs");
1638 let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1639 let response = base
1640 .subscribe_with_bootstrap(&make_settings("q1", false, Some(position), false), "test")
1641 .await
1642 .unwrap();
1643 assert!(response.bootstrap_receiver.is_none());
1644 }
1645
1646 #[tokio::test]
1647 async fn test_subscribe_no_resume_with_bootstrap_returns_receiver() {
1648 let base = make_base_with_bootstrap("sub-bs");
1649 let response = base
1650 .subscribe_with_bootstrap(&make_settings("q1", true, None, false), "test")
1651 .await
1652 .unwrap();
1653 assert!(
1654 response.bootstrap_receiver.is_some(),
1655 "regression guard: bootstrap path must still produce a receiver"
1656 );
1657 }
1658
1659 #[tokio::test]
1660 async fn test_subscribe_no_resume_no_bootstrap_returns_none() {
1661 let base = make_base_with_bootstrap("sub-neither");
1662 let response = base
1663 .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1664 .await
1665 .unwrap();
1666 assert!(response.bootstrap_receiver.is_none());
1667 assert!(response.position_handle.is_none());
1668 }
1669
1670 fn make_event(source_id: &str, position: Option<&[u8]>) -> SourceEventWrapper {
1675 let change = drasi_core::models::SourceChange::Insert {
1676 element: drasi_core::models::Element::Node {
1677 metadata: drasi_core::models::ElementMetadata {
1678 reference: drasi_core::models::ElementReference::new(source_id, "n1"),
1679 labels: Arc::from([Arc::from("Label")]),
1680 effective_from: 0,
1681 },
1682 properties: drasi_core::models::ElementPropertyMap::new(),
1683 },
1684 };
1685 let mut wrapper = SourceEventWrapper::new(
1686 source_id.to_string(),
1687 SourceEvent::Change(change),
1688 chrono::Utc::now(),
1689 );
1690 wrapper.source_position = position.map(|p| bytes::Bytes::from(p.to_vec()));
1691 wrapper
1692 }
1693
1694 #[tokio::test]
1695 async fn test_dispatch_events_batch_empty_returns_ok() {
1696 let base = SourceBase::new(SourceBaseParams::new("batch-empty")).unwrap();
1697 let result = base.dispatch_events_batch(Vec::new()).await;
1698 assert!(result.is_ok());
1699 }
1700
1701 #[tokio::test]
1702 async fn test_dispatch_events_batch_stamps_monotonic_sequences() {
1703 let params = SourceBaseParams::new("batch-seq").with_dispatch_mode(DispatchMode::Channel);
1704 let base = SourceBase::new(params).unwrap();
1705
1706 let mut receiver = base.create_streaming_receiver().await.unwrap();
1708
1709 let events = vec![
1710 make_event("batch-seq", Some(b"\x01")),
1711 make_event("batch-seq", Some(b"\x02")),
1712 make_event("batch-seq", Some(b"\x03")),
1713 ];
1714
1715 base.dispatch_events_batch(events).await.unwrap();
1716
1717 let e1 = receiver.recv().await.unwrap();
1718 let e2 = receiver.recv().await.unwrap();
1719 let e3 = receiver.recv().await.unwrap();
1720
1721 let s1 = e1.sequence.expect("event 1 must have sequence");
1722 let s2 = e2.sequence.expect("event 2 must have sequence");
1723 let s3 = e3.sequence.expect("event 3 must have sequence");
1724
1725 assert_eq!(s2, s1 + 1, "sequences must be monotonically increasing");
1726 assert_eq!(s3, s2 + 1, "sequences must be monotonically increasing");
1727 }
1728
1729 #[tokio::test]
1730 async fn test_dispatch_events_batch_multi_dispatcher_fanout() {
1731 let params =
1732 SourceBaseParams::new("batch-fanout").with_dispatch_mode(DispatchMode::Channel);
1733 let base = SourceBase::new(params).unwrap();
1734
1735 let mut rx1 = base.create_streaming_receiver().await.unwrap();
1737 let mut rx2 = base.create_streaming_receiver().await.unwrap();
1738
1739 let events = vec![
1740 make_event("batch-fanout", Some(b"\x01")),
1741 make_event("batch-fanout", Some(b"\x02")),
1742 ];
1743
1744 base.dispatch_events_batch(events).await.unwrap();
1745
1746 let r1_e1 = rx1.recv().await.unwrap();
1748 let r1_e2 = rx1.recv().await.unwrap();
1749 let r2_e1 = rx2.recv().await.unwrap();
1750 let r2_e2 = rx2.recv().await.unwrap();
1751
1752 assert_eq!(r1_e1.sequence, r2_e1.sequence);
1754 assert_eq!(r1_e2.sequence, r2_e2.sequence);
1755 }
1756
1757 #[tokio::test]
1758 async fn test_dispatch_events_batch_oversized_position_still_dispatches() {
1759 let params =
1760 SourceBaseParams::new("batch-oversize").with_dispatch_mode(DispatchMode::Channel);
1761 let base = SourceBase::new(params).unwrap();
1762 let mut rx = base.create_streaming_receiver().await.unwrap();
1763
1764 let big_pos = vec![0xAA; SourceBase::MAX_SOURCE_POSITION_BYTES + 1];
1766 let events = vec![make_event("batch-oversize", Some(&big_pos))];
1767
1768 base.dispatch_events_batch(events).await.unwrap();
1770
1771 let received = rx.recv().await.unwrap();
1772 assert!(received.sequence.is_some(), "event must still be stamped");
1773 assert_eq!(
1774 received.source_position.as_ref().map(|p| p.len()),
1775 Some(SourceBase::MAX_SOURCE_POSITION_BYTES + 1),
1776 "oversized position must still be delivered (checkpoint layer enforces the limit)"
1777 );
1778 }
1779
1780 #[tokio::test]
1785 async fn test_position_filter_suppresses_events_before_resume() {
1786 let params = SourceBaseParams::new("pos-filter").with_dispatch_mode(DispatchMode::Channel);
1787 let base = SourceBase::new(params).unwrap();
1788 base.set_position_comparator(ByteLexPositionComparator)
1789 .await;
1790
1791 let mut rx1 = base.create_streaming_receiver().await.unwrap();
1793 let mut rx2 = base.create_streaming_receiver().await.unwrap();
1794
1795 base.subscriber_resume_positions
1798 .write()
1799 .await
1800 .insert(0, Bytes::from_static(&[0x00, 0x05]));
1801
1802 let event = make_event("pos-filter", Some(&[0x00, 0x03]));
1804 base.dispatch_event(event).await.unwrap();
1805
1806 let r2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1808 .await
1809 .unwrap()
1810 .unwrap();
1811 assert_eq!(r2.source_position.as_ref().unwrap().as_ref(), &[0x00, 0x03]);
1812
1813 let r1 = tokio::time::timeout(std::time::Duration::from_millis(50), rx1.recv()).await;
1815 assert!(r1.is_err(), "rx1 should timeout — event was suppressed");
1816 }
1817
1818 #[tokio::test]
1819 async fn test_position_filter_delivers_events_past_resume() {
1820 let params = SourceBaseParams::new("pos-filter2").with_dispatch_mode(DispatchMode::Channel);
1821 let base = SourceBase::new(params).unwrap();
1822 base.set_position_comparator(ByteLexPositionComparator)
1823 .await;
1824
1825 let mut rx1 = base.create_streaming_receiver().await.unwrap();
1826
1827 base.subscriber_resume_positions
1829 .write()
1830 .await
1831 .insert(0, Bytes::from_static(&[0x00, 0x05]));
1832
1833 let event = make_event("pos-filter2", Some(&[0x00, 0x06]));
1835 base.dispatch_event(event).await.unwrap();
1836
1837 let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1838 .await
1839 .unwrap()
1840 .unwrap();
1841 assert_eq!(
1842 received.source_position.as_ref().unwrap().as_ref(),
1843 &[0x00, 0x06]
1844 );
1845 }
1846
1847 #[tokio::test]
1848 async fn test_position_filter_advances_high_water_mark() {
1849 let params = SourceBaseParams::new("pos-hwm").with_dispatch_mode(DispatchMode::Channel);
1850 let base = SourceBase::new(params).unwrap();
1851 base.set_position_comparator(ByteLexPositionComparator)
1852 .await;
1853
1854 let mut rx = base.create_streaming_receiver().await.unwrap();
1855
1856 base.subscriber_resume_positions
1858 .write()
1859 .await
1860 .insert(0, Bytes::from_static(&[0x00, 0x03]));
1861
1862 base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x04])))
1864 .await
1865 .unwrap();
1866 let _ = rx.recv().await.unwrap();
1867
1868 {
1870 let positions = base.subscriber_resume_positions.read().await;
1871 assert_eq!(
1872 positions.get(&0).map(|b| b.as_ref()),
1873 Some([0x00, 0x04].as_slice()),
1874 "high-water mark should be advanced to dispatched position"
1875 );
1876 }
1877
1878 base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x01])))
1880 .await
1881 .unwrap();
1882 let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
1883 assert!(
1884 r.is_err(),
1885 "event below high-water mark should be suppressed after rewind"
1886 );
1887
1888 base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x06])))
1890 .await
1891 .unwrap();
1892 let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
1893 .await
1894 .unwrap()
1895 .unwrap();
1896 assert_eq!(
1897 received.source_position.as_ref().unwrap().as_ref(),
1898 &[0x00, 0x06]
1899 );
1900 }
1901
1902 #[tokio::test]
1903 async fn test_position_filter_equal_position_is_suppressed() {
1904 let params = SourceBaseParams::new("pos-equal").with_dispatch_mode(DispatchMode::Channel);
1907 let base = SourceBase::new(params).unwrap();
1908 base.set_position_comparator(ByteLexPositionComparator)
1909 .await;
1910
1911 let mut rx = base.create_streaming_receiver().await.unwrap();
1912
1913 base.subscriber_resume_positions
1914 .write()
1915 .await
1916 .insert(0, Bytes::from_static(&[0x00, 0x05]));
1917
1918 base.dispatch_event(make_event("pos-equal", Some(&[0x00, 0x05])))
1920 .await
1921 .unwrap();
1922
1923 let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
1924 assert!(
1925 r.is_err(),
1926 "event at exactly resume position should be suppressed"
1927 );
1928 }
1929
1930 #[tokio::test]
1931 async fn test_position_filter_no_comparator_delivers_all() {
1932 let params = SourceBaseParams::new("no-cmp").with_dispatch_mode(DispatchMode::Channel);
1935 let base = SourceBase::new(params).unwrap();
1936 let mut rx = base.create_streaming_receiver().await.unwrap();
1939
1940 base.subscriber_resume_positions
1941 .write()
1942 .await
1943 .insert(0, Bytes::from_static(&[0x00, 0x05]));
1944
1945 base.dispatch_event(make_event("no-cmp", Some(&[0x00, 0x03])))
1947 .await
1948 .unwrap();
1949
1950 let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
1951 .await
1952 .unwrap()
1953 .unwrap();
1954 assert_eq!(
1955 received.source_position.as_ref().unwrap().as_ref(),
1956 &[0x00, 0x03]
1957 );
1958 }
1959
1960 #[tokio::test]
1961 async fn test_position_filter_batch_mode() {
1962 let params = SourceBaseParams::new("pos-batch").with_dispatch_mode(DispatchMode::Channel);
1963 let base = SourceBase::new(params).unwrap();
1964 base.set_position_comparator(ByteLexPositionComparator)
1965 .await;
1966
1967 let mut rx1 = base.create_streaming_receiver().await.unwrap();
1968 let mut rx2 = base.create_streaming_receiver().await.unwrap();
1969
1970 {
1973 let mut positions = base.subscriber_resume_positions.write().await;
1974 positions.insert(0, Bytes::from_static(&[0x00, 0x05]));
1975 positions.insert(1, Bytes::from_static(&[0x00, 0x02]));
1976 }
1977
1978 let events = vec![
1979 make_event("pos-batch", Some(&[0x00, 0x01])), make_event("pos-batch", Some(&[0x00, 0x03])), make_event("pos-batch", Some(&[0x00, 0x06])), ];
1983 base.dispatch_events_batch(events).await.unwrap();
1984
1985 let r2_1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1987 .await
1988 .unwrap()
1989 .unwrap();
1990 assert_eq!(
1991 r2_1.source_position.as_ref().unwrap().as_ref(),
1992 &[0x00, 0x03]
1993 );
1994
1995 let r2_2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1996 .await
1997 .unwrap()
1998 .unwrap();
1999 assert_eq!(
2000 r2_2.source_position.as_ref().unwrap().as_ref(),
2001 &[0x00, 0x06]
2002 );
2003
2004 let r1_1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
2006 .await
2007 .unwrap()
2008 .unwrap();
2009 assert_eq!(
2010 r1_1.source_position.as_ref().unwrap().as_ref(),
2011 &[0x00, 0x06]
2012 );
2013
2014 let r1_extra = tokio::time::timeout(std::time::Duration::from_millis(50), rx1.recv()).await;
2016 assert!(r1_extra.is_err(), "rx1 should have no more events");
2017 }
2018
2019 #[tokio::test]
2020 async fn test_position_filter_events_without_position_delivered() {
2021 let params = SourceBaseParams::new("pos-none").with_dispatch_mode(DispatchMode::Channel);
2023 let base = SourceBase::new(params).unwrap();
2024 base.set_position_comparator(ByteLexPositionComparator)
2025 .await;
2026
2027 let mut rx = base.create_streaming_receiver().await.unwrap();
2028
2029 base.subscriber_resume_positions
2030 .write()
2031 .await
2032 .insert(0, Bytes::from_static(&[0x00, 0x05]));
2033
2034 base.dispatch_event(make_event("pos-none", None))
2036 .await
2037 .unwrap();
2038
2039 let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
2040 .await
2041 .unwrap()
2042 .unwrap();
2043 assert!(received.source_position.is_none());
2044 }
2045
2046 #[tokio::test]
2051 async fn test_sequence_position_map_populated_on_dispatch() {
2052 let params = SourceBaseParams::new("spm-1").with_dispatch_mode(DispatchMode::Channel);
2053 let base = SourceBase::new(params).unwrap();
2054 let _rx = base.create_streaming_receiver().await.unwrap();
2055
2056 let lsn: u64 = 0x1234;
2057 base.dispatch_event(make_event("spm-1", Some(&lsn.to_be_bytes())))
2058 .await
2059 .unwrap();
2060
2061 let map = base.sequence_position_map.read().await;
2062 assert_eq!(map.len(), 1);
2063 let (seq, pos) = map.iter().next().unwrap();
2064 assert_eq!(*seq, 1); assert_eq!(pos.as_ref(), &lsn.to_be_bytes());
2066 }
2067
2068 #[tokio::test]
2069 async fn test_sequence_position_map_not_populated_without_position() {
2070 let params = SourceBaseParams::new("spm-none").with_dispatch_mode(DispatchMode::Channel);
2071 let base = SourceBase::new(params).unwrap();
2072 let _rx = base.create_streaming_receiver().await.unwrap();
2073
2074 base.dispatch_event(make_event("spm-none", None))
2075 .await
2076 .unwrap();
2077
2078 let map = base.sequence_position_map.read().await;
2079 assert!(map.is_empty());
2080 }
2081
2082 #[tokio::test]
2083 async fn test_compute_confirmed_source_position_basic() {
2084 let params = SourceBaseParams::new("cssp-1").with_dispatch_mode(DispatchMode::Channel);
2085 let base = SourceBase::new(params).unwrap();
2086 let _rx = base.create_streaming_receiver().await.unwrap();
2087
2088 let handle = base.create_position_handle("q1").await;
2090
2091 for lsn in [100u64, 200, 300] {
2093 base.dispatch_event(make_event("cssp-1", Some(&lsn.to_be_bytes())))
2094 .await
2095 .unwrap();
2096 }
2097
2098 handle.store(2, Ordering::Relaxed);
2100
2101 let confirmed = base.compute_confirmed_source_position().await;
2102 assert!(confirmed.is_some());
2103 let lsn_bytes = confirmed.unwrap();
2104 assert_eq!(u64::from_be_bytes(lsn_bytes[..8].try_into().unwrap()), 200);
2105 }
2106
2107 #[tokio::test]
2108 async fn test_compute_confirmed_source_position_returns_none_when_no_handles() {
2109 let base = SourceBase::new(SourceBaseParams::new("cssp-none")).unwrap();
2110 assert!(base.compute_confirmed_source_position().await.is_none());
2111 }
2112
2113 #[tokio::test]
2114 async fn test_compute_confirmed_source_position_returns_none_when_all_max() {
2115 let base = SourceBase::new(SourceBaseParams::new("cssp-max")).unwrap();
2116 let _h = base.create_position_handle("q1").await;
2117 assert!(base.compute_confirmed_source_position().await.is_none());
2119 }
2120
2121 #[tokio::test]
2122 async fn test_compute_confirmed_source_position_min_of_two_queries() {
2123 let params = SourceBaseParams::new("cssp-2q").with_dispatch_mode(DispatchMode::Channel);
2124 let base = SourceBase::new(params).unwrap();
2125 let _rx = base.create_streaming_receiver().await.unwrap();
2126
2127 let h1 = base.create_position_handle("q1").await;
2128 let h2 = base.create_position_handle("q2").await;
2129
2130 for lsn in [100u64, 200, 300] {
2132 base.dispatch_event(make_event("cssp-2q", Some(&lsn.to_be_bytes())))
2133 .await
2134 .unwrap();
2135 }
2136
2137 h1.store(3, Ordering::Relaxed);
2139 h2.store(1, Ordering::Relaxed);
2140
2141 let confirmed = base.compute_confirmed_source_position().await;
2142 assert!(confirmed.is_some());
2143 let lsn_bytes = confirmed.unwrap();
2144 assert_eq!(u64::from_be_bytes(lsn_bytes[..8].try_into().unwrap()), 100);
2146 }
2147
2148 #[tokio::test]
2149 async fn test_prune_position_map() {
2150 let params = SourceBaseParams::new("prune-1").with_dispatch_mode(DispatchMode::Channel);
2151 let base = SourceBase::new(params).unwrap();
2152 let _rx = base.create_streaming_receiver().await.unwrap();
2153
2154 for lsn in [10u64, 20, 30, 40, 50] {
2155 base.dispatch_event(make_event("prune-1", Some(&lsn.to_be_bytes())))
2156 .await
2157 .unwrap();
2158 }
2159 assert_eq!(base.sequence_position_map.read().await.len(), 5);
2161
2162 base.prune_position_map(3).await;
2163 let map = base.sequence_position_map.read().await;
2164 assert_eq!(map.len(), 2); assert!(map.contains_key(&4));
2166 assert!(map.contains_key(&5));
2167 }
2168
2169 #[tokio::test]
2170 async fn test_prune_position_map_all() {
2171 let params = SourceBaseParams::new("prune-all").with_dispatch_mode(DispatchMode::Channel);
2172 let base = SourceBase::new(params).unwrap();
2173 let _rx = base.create_streaming_receiver().await.unwrap();
2174
2175 for lsn in [10u64, 20] {
2176 base.dispatch_event(make_event("prune-all", Some(&lsn.to_be_bytes())))
2177 .await
2178 .unwrap();
2179 }
2180 base.prune_position_map(100).await; assert!(base.sequence_position_map.read().await.is_empty());
2182 }
2183
2184 #[tokio::test]
2185 async fn test_position_handle_initialized_to_last_sequence() {
2186 use crate::config::SourceSubscriptionSettings;
2187
2188 let params = SourceBaseParams::new("ph-init").with_dispatch_mode(DispatchMode::Channel);
2189 let base = SourceBase::new(params).unwrap();
2190
2191 let settings = SourceSubscriptionSettings {
2192 query_id: "q1".to_string(),
2193 source_id: "ph-init".to_string(),
2194 enable_bootstrap: false,
2195 resume_from: Some(Bytes::from_static(&[0x00, 0x01])),
2196 last_sequence: Some(42),
2197 request_position_handle: true,
2198 nodes: Default::default(),
2199 relations: Default::default(),
2200 };
2201
2202 let response = base
2203 .subscribe_with_bootstrap(&settings, "test")
2204 .await
2205 .unwrap();
2206 let handle = response.position_handle.expect("should have handle");
2207 assert_eq!(handle.load(Ordering::Relaxed), 42);
2209 }
2210
2211 #[tokio::test]
2212 async fn test_position_handle_stays_max_without_last_sequence() {
2213 use crate::config::SourceSubscriptionSettings;
2214
2215 let params = SourceBaseParams::new("ph-no-ls").with_dispatch_mode(DispatchMode::Channel);
2216 let base = SourceBase::new(params).unwrap();
2217
2218 let settings = SourceSubscriptionSettings {
2219 query_id: "q1".to_string(),
2220 source_id: "ph-no-ls".to_string(),
2221 enable_bootstrap: true,
2222 resume_from: None,
2223 last_sequence: None,
2224 request_position_handle: true,
2225 nodes: Default::default(),
2226 relations: Default::default(),
2227 };
2228
2229 let response = base
2230 .subscribe_with_bootstrap(&settings, "test")
2231 .await
2232 .unwrap();
2233 let handle = response.position_handle.expect("should have handle");
2234 assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
2236 }
2237
2238 #[tokio::test]
2239 async fn test_batch_dispatch_populates_sequence_position_map() {
2240 let params = SourceBaseParams::new("spm-batch").with_dispatch_mode(DispatchMode::Channel);
2241 let base = SourceBase::new(params).unwrap();
2242 let _rx = base.create_streaming_receiver().await.unwrap();
2243
2244 let events = vec![
2245 make_event("spm-batch", Some(&100u64.to_be_bytes())),
2246 make_event("spm-batch", Some(&200u64.to_be_bytes())),
2247 make_event("spm-batch", None), ];
2249
2250 base.dispatch_events_batch(events).await.unwrap();
2251
2252 let map = base.sequence_position_map.read().await;
2253 assert_eq!(map.len(), 2);
2255 assert!(map.contains_key(&1));
2256 assert!(map.contains_key(&2));
2257 }
2258
2259 #[tokio::test]
2260 async fn test_position_filter_rewind_protection_multi_subscriber() {
2261 let params = SourceBaseParams::new("rewind").with_dispatch_mode(DispatchMode::Channel);
2266 let base = SourceBase::new(params).unwrap();
2267 base.set_position_comparator(ByteLexPositionComparator)
2268 .await;
2269
2270 let mut rx_a = base.create_streaming_receiver().await.unwrap();
2272 base.subscriber_resume_positions
2273 .write()
2274 .await
2275 .insert(0, Bytes::from_static(&[0x10]));
2276
2277 base.dispatch_event(make_event("rewind", Some(&[0x20])))
2279 .await
2280 .unwrap();
2281 let ev = rx_a.recv().await.unwrap();
2282 assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x20]);
2283
2284 base.dispatch_event(make_event("rewind", Some(&[0x30])))
2285 .await
2286 .unwrap();
2287 let ev = rx_a.recv().await.unwrap();
2288 assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x30]);
2289
2290 let mut rx_b = base.create_streaming_receiver().await.unwrap();
2292 base.subscriber_resume_positions
2293 .write()
2294 .await
2295 .insert(1, Bytes::from_static(&[0x10]));
2296
2297 base.dispatch_event(make_event("rewind", Some(&[0x20])))
2301 .await
2302 .unwrap();
2303
2304 let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx_a.recv()).await;
2306 assert!(
2307 r.is_err(),
2308 "subscriber A should not see replayed event at [0x20]"
2309 );
2310
2311 let ev = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2313 .await
2314 .unwrap()
2315 .unwrap();
2316 assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x20]);
2317
2318 base.dispatch_event(make_event("rewind", Some(&[0x30])))
2320 .await
2321 .unwrap();
2322
2323 let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx_a.recv()).await;
2324 assert!(
2325 r.is_err(),
2326 "subscriber A should not see replayed event at [0x30]"
2327 );
2328
2329 let ev = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2330 .await
2331 .unwrap()
2332 .unwrap();
2333 assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x30]);
2334
2335 base.dispatch_event(make_event("rewind", Some(&[0x40])))
2337 .await
2338 .unwrap();
2339
2340 let ev_a = tokio::time::timeout(std::time::Duration::from_millis(100), rx_a.recv())
2341 .await
2342 .unwrap()
2343 .unwrap();
2344 assert_eq!(ev_a.source_position.as_ref().unwrap().as_ref(), &[0x40]);
2345
2346 let ev_b = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2347 .await
2348 .unwrap()
2349 .unwrap();
2350 assert_eq!(ev_b.source_position.as_ref().unwrap().as_ref(), &[0x40]);
2351 }
2352
2353 #[tokio::test]
2354 async fn test_high_water_mark_set_for_new_subscriber_without_resume() {
2355 let params = SourceBaseParams::new("hwm-new").with_dispatch_mode(DispatchMode::Channel);
2358 let base = SourceBase::new(params).unwrap();
2359 base.set_position_comparator(ByteLexPositionComparator)
2360 .await;
2361
2362 let mut rx = base.create_streaming_receiver().await.unwrap();
2363 base.dispatch_event(make_event("hwm-new", Some(&[0x10])))
2367 .await
2368 .unwrap();
2369 let _ = rx.recv().await.unwrap();
2370
2371 {
2373 let positions = base.subscriber_resume_positions.read().await;
2374 assert_eq!(
2375 positions.get(&0).map(|b| b.as_ref()),
2376 Some([0x10].as_slice()),
2377 "high-water mark should be set after first dispatch"
2378 );
2379 }
2380
2381 base.dispatch_event(make_event("hwm-new", Some(&[0x05])))
2383 .await
2384 .unwrap();
2385 let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
2386 assert!(
2387 r.is_err(),
2388 "event below high-water mark should be suppressed"
2389 );
2390 }
2391}