1use anyhow::Result;
33use log::{debug, error, info, warn};
34use std::collections::{BTreeMap, HashMap};
35use std::sync::atomic::{AtomicU64, Ordering};
36use std::sync::Arc;
37use tokio::sync::{Notify, RwLock};
38use tracing::Instrument;
39
40use crate::bootstrap::{BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult};
41use crate::channels::*;
42use crate::component_graph::ComponentStatusHandle;
43use crate::context::SourceRuntimeContext;
44use crate::identity::IdentityProvider;
45use crate::profiling;
46use crate::sources::PositionComparator;
47use crate::state_store::StateStoreProvider;
48use bytes::Bytes;
49use drasi_core::models::SourceChange;
50
51pub struct SourceBaseParams {
69 pub id: String,
71 pub dispatch_mode: Option<DispatchMode>,
73 pub dispatch_buffer_capacity: Option<usize>,
75 pub bootstrap_provider: Option<Box<dyn BootstrapProvider + 'static>>,
77 pub state_store: Option<Arc<dyn StateStoreProvider>>,
79 pub auto_start: bool,
81}
82
83impl std::fmt::Debug for SourceBaseParams {
84 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
85 f.debug_struct("SourceBaseParams")
86 .field("id", &self.id)
87 .field("dispatch_mode", &self.dispatch_mode)
88 .field("dispatch_buffer_capacity", &self.dispatch_buffer_capacity)
89 .field(
90 "bootstrap_provider",
91 &self.bootstrap_provider.as_ref().map(|_| "<provider>"),
92 )
93 .field(
94 "state_store",
95 &self.state_store.as_ref().map(|_| "<StateStoreProvider>"),
96 )
97 .field("auto_start", &self.auto_start)
98 .finish()
99 }
100}
101
102impl SourceBaseParams {
103 pub fn new(id: impl Into<String>) -> Self {
105 Self {
106 id: id.into(),
107 dispatch_mode: None,
108 dispatch_buffer_capacity: None,
109 bootstrap_provider: None,
110 state_store: None,
111 auto_start: true,
112 }
113 }
114
115 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
117 self.dispatch_mode = Some(mode);
118 self
119 }
120
121 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
123 self.dispatch_buffer_capacity = Some(capacity);
124 self
125 }
126
127 pub fn with_bootstrap_provider(mut self, provider: impl BootstrapProvider + 'static) -> Self {
132 self.bootstrap_provider = Some(Box::new(provider));
133 self
134 }
135
136 pub fn with_state_store(mut self, store: Arc<dyn StateStoreProvider>) -> Self {
138 self.state_store = Some(store);
139 self
140 }
141
142 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
147 self.auto_start = auto_start;
148 self
149 }
150}
151
152pub struct SourceBase {
154 pub id: String,
156 dispatch_mode: DispatchMode,
158 dispatch_buffer_capacity: usize,
160 pub auto_start: bool,
162 status_handle: ComponentStatusHandle,
164 pub dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
170 context: Arc<RwLock<Option<SourceRuntimeContext>>>,
172 state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
174 pub task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
176 pub shutdown_tx: Arc<RwLock<Option<tokio::sync::oneshot::Sender<()>>>>,
178 bootstrap_provider: Arc<RwLock<Option<Arc<dyn BootstrapProvider>>>>,
180 identity_provider: Arc<RwLock<Option<Arc<dyn IdentityProvider>>>>,
184 position_handles: Arc<RwLock<HashMap<String, Arc<AtomicU64>>>>,
193 next_sequence: Arc<AtomicU64>,
196 raw_config: Option<serde_json::Value>,
199 subscriber_notify: Arc<Notify>,
203 subscriber_resume_positions: Arc<RwLock<HashMap<usize, Bytes>>>,
212 position_comparator: Arc<RwLock<Option<Arc<dyn PositionComparator>>>>,
217 sequence_position_map: Arc<RwLock<BTreeMap<u64, Bytes>>>,
229}
230
231impl SourceBase {
232 pub fn new(params: SourceBaseParams) -> Result<Self> {
240 let dispatch_mode = params.dispatch_mode.unwrap_or_default();
242 let dispatch_buffer_capacity = params.dispatch_buffer_capacity.unwrap_or(1000);
243
244 let mut dispatchers: Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>> =
246 Vec::new();
247
248 if dispatch_mode == DispatchMode::Broadcast {
249 let dispatcher =
251 BroadcastChangeDispatcher::<SourceEventWrapper>::new(dispatch_buffer_capacity);
252 dispatchers.push(Box::new(dispatcher));
253 }
254 let bootstrap_provider = params
258 .bootstrap_provider
259 .map(|p| Arc::from(p) as Arc<dyn BootstrapProvider>);
260
261 Ok(Self {
262 id: params.id.clone(),
263 dispatch_mode,
264 dispatch_buffer_capacity,
265 auto_start: params.auto_start,
266 status_handle: ComponentStatusHandle::new(¶ms.id),
267 dispatchers: Arc::new(RwLock::new(dispatchers)),
268 context: Arc::new(RwLock::new(None)), state_store: Arc::new(RwLock::new(params.state_store)), task_handle: Arc::new(RwLock::new(None)),
271 shutdown_tx: Arc::new(RwLock::new(None)),
272 bootstrap_provider: Arc::new(RwLock::new(bootstrap_provider)),
273 identity_provider: Arc::new(RwLock::new(None)),
274 position_handles: Arc::new(RwLock::new(HashMap::new())),
275 next_sequence: Arc::new(AtomicU64::new(1)),
276 raw_config: None,
277 subscriber_notify: Arc::new(Notify::new()),
278 subscriber_resume_positions: Arc::new(RwLock::new(HashMap::new())),
279 position_comparator: Arc::new(RwLock::new(None)),
280 sequence_position_map: Arc::new(RwLock::new(BTreeMap::new())),
281 })
282 }
283
284 pub fn get_auto_start(&self) -> bool {
286 self.auto_start
287 }
288
289 pub fn set_raw_config(&mut self, config: serde_json::Value) {
295 self.raw_config = Some(config);
296 }
297
298 pub fn raw_config(&self) -> Option<&serde_json::Value> {
303 self.raw_config.as_ref()
304 }
305
306 pub fn properties_or_serialize<D: serde::Serialize>(
314 &self,
315 fallback_dto: &D,
316 ) -> HashMap<String, serde_json::Value> {
317 if let Some(serde_json::Value::Object(map)) = self.raw_config.as_ref() {
318 return map.iter().map(|(k, v)| (k.clone(), v.clone())).collect();
319 }
320
321 match serde_json::to_value(fallback_dto) {
322 Ok(serde_json::Value::Object(map)) => map.into_iter().collect(),
323 _ => HashMap::new(),
324 }
325 }
326
327 pub async fn initialize(&self, context: SourceRuntimeContext) {
337 *self.context.write().await = Some(context.clone());
339
340 self.status_handle.wire(context.update_tx.clone()).await;
342
343 if let Some(state_store) = context.state_store.as_ref() {
344 *self.state_store.write().await = Some(state_store.clone());
345 }
346
347 if let Some(ip) = context.identity_provider.as_ref() {
349 let mut guard = self.identity_provider.write().await;
350 if guard.is_none() {
351 *guard = Some(ip.clone());
352 }
353 }
354 }
355
356 pub async fn context(&self) -> Option<SourceRuntimeContext> {
360 self.context.read().await.clone()
361 }
362
363 pub async fn state_store(&self) -> Option<Arc<dyn StateStoreProvider>> {
367 self.state_store.read().await.clone()
368 }
369
370 pub async fn identity_provider(&self) -> Option<Arc<dyn IdentityProvider>> {
376 self.identity_provider.read().await.clone()
377 }
378
379 pub async fn set_identity_provider(&self, provider: Arc<dyn IdentityProvider>) {
385 *self.identity_provider.write().await = Some(provider);
386 }
387
388 pub async fn create_position_handle(&self, query_id: &str) -> Arc<AtomicU64> {
396 let mut handles = self.position_handles.write().await;
397 if let Some(existing) = handles.get(query_id) {
398 return existing.clone();
399 }
400 let handle = Arc::new(AtomicU64::new(u64::MAX));
401 handles.insert(query_id.to_string(), handle.clone());
402 handle
403 }
404
405 pub async fn remove_position_handle(&self, query_id: &str) {
411 let mut handles = self.position_handles.write().await;
412 handles.remove(query_id);
413 }
414
415 pub async fn compute_confirmed_position(&self) -> Option<u64> {
427 self.cleanup_stale_handles().await;
428 let handles = self.position_handles.read().await;
429 let mut min: Option<u64> = None;
430 for handle in handles.values() {
431 let v = handle.load(Ordering::Relaxed);
432 if v == u64::MAX {
433 continue;
434 }
435 min = Some(min.map_or(v, |m| m.min(v)));
436 }
437 min
438 }
439
440 pub async fn cleanup_stale_handles(&self) {
452 let mut handles = self.position_handles.write().await;
453 handles.retain(|_, handle| Arc::strong_count(handle) > 1);
454 }
455
456 pub async fn compute_confirmed_source_position(&self) -> Option<Bytes> {
467 let confirmed_seq = self.compute_confirmed_position().await?;
468 let map = self.sequence_position_map.read().await;
469 map.range(..=confirmed_seq)
471 .next_back()
472 .map(|(_, pos)| pos.clone())
473 }
474
475 pub async fn prune_position_map(&self, up_to_seq: u64) {
481 let mut map = self.sequence_position_map.write().await;
482 let keep = map.split_off(&(up_to_seq.saturating_add(1)));
484 *map = keep;
485 }
486
487 pub fn set_next_sequence(&self, sequence: u64) {
490 self.next_sequence
491 .store(sequence.saturating_add(1), Ordering::Relaxed);
492 }
493
494 pub fn apply_subscription_settings(
501 &self,
502 settings: &crate::config::SourceSubscriptionSettings,
503 ) {
504 if let Some(last_seq) = settings.last_sequence {
505 let next = last_seq.saturating_add(1);
508 let prev = self.next_sequence.fetch_max(next, Ordering::Relaxed);
509 if next > prev {
510 info!(
511 "[{}] Sequence counter recovered to {} (from checkpoint last_sequence={})",
512 self.id, next, last_seq
513 );
514 }
515 }
516 }
517
518 pub fn status_handle(&self) -> ComponentStatusHandle {
523 self.status_handle.clone()
524 }
525
526 pub fn clone_shared(&self) -> Self {
531 Self {
532 id: self.id.clone(),
533 dispatch_mode: self.dispatch_mode,
534 dispatch_buffer_capacity: self.dispatch_buffer_capacity,
535 auto_start: self.auto_start,
536 status_handle: self.status_handle.clone(),
537 dispatchers: self.dispatchers.clone(),
538 context: self.context.clone(),
539 state_store: self.state_store.clone(),
540 task_handle: self.task_handle.clone(),
541 shutdown_tx: self.shutdown_tx.clone(),
542 bootstrap_provider: self.bootstrap_provider.clone(),
543 identity_provider: self.identity_provider.clone(),
544 position_handles: self.position_handles.clone(),
545 next_sequence: self.next_sequence.clone(),
546 raw_config: self.raw_config.clone(),
547 subscriber_notify: self.subscriber_notify.clone(),
548 subscriber_resume_positions: self.subscriber_resume_positions.clone(),
549 position_comparator: self.position_comparator.clone(),
550 sequence_position_map: self.sequence_position_map.clone(),
551 }
552 }
553
554 pub async fn set_bootstrap_provider(&self, provider: impl BootstrapProvider + 'static) {
565 *self.bootstrap_provider.write().await = Some(Arc::new(provider));
566 }
567
568 pub async fn set_position_comparator(&self, comparator: impl PositionComparator + 'static) {
574 *self.position_comparator.write().await = Some(Arc::new(comparator));
575 }
576
577 pub fn get_id(&self) -> &str {
579 &self.id
580 }
581
582 pub async fn create_streaming_receiver(
590 &self,
591 ) -> Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
592 let receiver: Box<dyn ChangeReceiver<SourceEventWrapper>> = match self.dispatch_mode {
593 DispatchMode::Broadcast => {
594 let dispatchers = self.dispatchers.read().await;
596 if let Some(dispatcher) = dispatchers.first() {
597 dispatcher.create_receiver().await?
598 } else {
599 return Err(anyhow::anyhow!("No broadcast dispatcher available"));
600 }
601 }
602 DispatchMode::Channel => {
603 let dispatcher = ChannelChangeDispatcher::<SourceEventWrapper>::new(
605 self.dispatch_buffer_capacity,
606 );
607 let receiver = dispatcher.create_receiver().await?;
608
609 let mut dispatchers = self.dispatchers.write().await;
611 dispatchers.push(Box::new(dispatcher));
612
613 receiver
614 }
615 };
616
617 self.subscriber_notify.notify_one();
621
622 Ok(receiver)
623 }
624
625 pub async fn wait_for_subscribers(&self) {
636 loop {
637 let dispatchers = self.dispatchers.read().await;
638 if !dispatchers.is_empty() {
639 return;
640 }
641 drop(dispatchers);
642 self.subscriber_notify.notified().await;
643 }
644 }
645
646 pub async fn subscribe_with_bootstrap(
654 &self,
655 settings: &crate::config::SourceSubscriptionSettings,
656 source_type: &str,
657 ) -> Result<SubscriptionResponse> {
658 self.apply_subscription_settings(settings);
660
661 info!(
662 "Query '{}' subscribing to {} source '{}' (bootstrap: {}, resume_from: {:?}, request_handle: {})",
663 settings.query_id,
664 source_type,
665 self.id,
666 settings.enable_bootstrap,
667 settings.resume_from,
668 settings.request_position_handle
669 );
670
671 let receiver = self.create_streaming_receiver().await?;
673
674 if self.dispatch_mode == DispatchMode::Channel {
678 if let Some(ref resume_pos) = settings.resume_from {
679 let dispatchers = self.dispatchers.read().await;
680 let dispatcher_idx = dispatchers.len().saturating_sub(1);
681 drop(dispatchers);
682 self.subscriber_resume_positions
683 .write()
684 .await
685 .insert(dispatcher_idx, resume_pos.clone());
686 debug!(
687 "[{}] Registered resume position filter for subscriber '{}' at dispatcher index {}",
688 self.id, settings.query_id, dispatcher_idx
689 );
690 }
691 }
692
693 let query_id_for_response = settings.query_id.clone();
694
695 let (bootstrap_receiver, bootstrap_result_receiver) = if settings.resume_from.is_some() {
699 info!(
700 "Query '{}' resuming from sequence {:?}; skipping bootstrap on {} source '{}'",
701 settings.query_id, settings.resume_from, source_type, self.id
702 );
703 (None, None)
704 } else if settings.enable_bootstrap {
705 match self
706 .handle_bootstrap_subscription(settings, source_type)
707 .await?
708 {
709 Some((event_rx, result_rx)) => (Some(event_rx), Some(result_rx)),
710 None => (None, None),
711 }
712 } else {
713 (None, None)
714 };
715
716 let position_handle = if settings.request_position_handle {
720 let handle = self.create_position_handle(&settings.query_id).await;
721 if let Some(last_seq) = settings.last_sequence {
727 handle.store(last_seq, Ordering::Release);
728 }
729 Some(handle)
730 } else {
731 None
732 };
733
734 Ok(SubscriptionResponse {
735 query_id: query_id_for_response,
736 source_id: self.id.clone(),
737 receiver,
738 bootstrap_receiver,
739 position_handle,
740 bootstrap_result_receiver,
741 })
742 }
743
744 async fn handle_bootstrap_subscription(
749 &self,
750 settings: &crate::config::SourceSubscriptionSettings,
751 source_type: &str,
752 ) -> Result<
753 Option<(
754 BootstrapEventReceiver,
755 tokio::sync::oneshot::Receiver<anyhow::Result<BootstrapResult>>,
756 )>,
757 > {
758 let provider_guard = self.bootstrap_provider.read().await;
759 if let Some(provider) = provider_guard.clone() {
760 drop(provider_guard); info!(
763 "Creating bootstrap for query '{}' on {} source '{}'",
764 settings.query_id, source_type, self.id
765 );
766
767 let context = BootstrapContext::new_minimal(
769 self.id.clone(), self.id.clone(), );
772
773 let (bootstrap_tx, bootstrap_rx) = tokio::sync::mpsc::channel(1000);
775
776 let (result_tx, result_rx) = tokio::sync::oneshot::channel();
778
779 let node_labels: Vec<String> = settings.nodes.iter().cloned().collect();
781 let relation_labels: Vec<String> = settings.relations.iter().cloned().collect();
782
783 let request = BootstrapRequest {
785 query_id: settings.query_id.clone(),
786 node_labels,
787 relation_labels,
788 request_id: format!("{}-{}", settings.query_id, uuid::Uuid::new_v4()),
789 };
790
791 let settings_clone = settings.clone();
793 let source_id = self.id.clone();
794
795 let instance_id = self
797 .context()
798 .await
799 .map(|c| c.instance_id.clone())
800 .unwrap_or_default();
801
802 let span = tracing::info_span!(
804 "source_bootstrap",
805 instance_id = %instance_id,
806 component_id = %source_id,
807 component_type = "source"
808 );
809 tokio::spawn(
810 async move {
811 let outcome = provider
812 .bootstrap(request, &context, bootstrap_tx, Some(&settings_clone))
813 .await;
814
815 match &outcome {
816 Ok(result) => {
817 info!(
818 "Bootstrap completed successfully for query '{}', sent {} events \
819 (last_sequence={:?}, sequences_aligned={})",
820 settings_clone.query_id,
821 result.event_count,
822 result.last_sequence,
823 result.sequences_aligned
824 );
825 }
826 Err(e) => {
827 error!(
828 "Bootstrap failed for query '{}': {e}",
829 settings_clone.query_id
830 );
831 }
832 }
833
834 let _ = result_tx.send(outcome);
837 }
838 .instrument(span),
839 );
840
841 Ok(Some((bootstrap_rx, result_rx)))
842 } else {
843 info!(
844 "Bootstrap requested for query '{}' but no bootstrap provider configured for {} source '{}'",
845 settings.query_id, source_type, self.id
846 );
847 Ok(None)
848 }
849 }
850
851 pub async fn dispatch_source_change(&self, change: SourceChange) -> Result<()> {
859 let mut profiling = profiling::ProfilingMetadata::new();
861 profiling.source_send_ns = Some(profiling::timestamp_ns());
862
863 let wrapper = SourceEventWrapper::with_profiling(
865 self.id.clone(),
866 SourceEvent::Change(change),
867 chrono::Utc::now(),
868 profiling,
869 );
870
871 self.dispatch_event(wrapper).await
873 }
874
875 pub const MAX_SOURCE_POSITION_BYTES: usize = 65_536;
879
880 pub async fn dispatch_event(&self, mut wrapper: SourceEventWrapper) -> Result<()> {
887 if let Some(ref pos) = wrapper.source_position {
890 if pos.len() > Self::MAX_SOURCE_POSITION_BYTES {
891 warn!(
892 "[{}] Source position is large ({} bytes > {} limit); \
893 checkpoint staging will preserve the previous good position",
894 self.id,
895 pos.len(),
896 Self::MAX_SOURCE_POSITION_BYTES
897 );
898 }
899 }
900
901 wrapper.sequence = Some(self.next_sequence.fetch_add(1, Ordering::Relaxed));
903
904 if let (Some(seq), Some(ref pos)) = (wrapper.sequence, &wrapper.source_position) {
906 self.sequence_position_map
907 .write()
908 .await
909 .insert(seq, pos.clone());
910 }
911
912 debug!("[{}] Dispatching event: {:?}", self.id, &wrapper);
913
914 let arc_wrapper = Arc::new(wrapper);
916
917 let dispatchers = self.dispatchers.read().await;
919 let comparator = self.position_comparator.read().await;
920 let mut cleared_indices: Vec<usize> = Vec::new();
921 let mut hwm_updates: Vec<(usize, Bytes)> = Vec::new();
923
924 for (idx, dispatcher) in dispatchers.iter().enumerate() {
925 if let Some(ref cmp) = *comparator {
932 let resume_positions = self.subscriber_resume_positions.read().await;
933 if let Some(resume_pos) = resume_positions.get(&idx) {
934 if let Some(ref event_pos) = arc_wrapper.source_position {
935 if !cmp.position_reached(event_pos, resume_pos) {
936 continue;
938 }
939 cleared_indices.push(idx);
941 }
942 }
944 }
945
946 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
947 debug!("[{}] Failed to dispatch event: {}", self.id, e);
948 } else if let Some(ref event_pos) = arc_wrapper.source_position {
949 hwm_updates.push((idx, event_pos.clone()));
951 }
952 }
953 drop(comparator);
954 drop(dispatchers);
955
956 if !hwm_updates.is_empty() {
961 let mut resume_positions = self.subscriber_resume_positions.write().await;
962 for (idx, pos) in hwm_updates {
963 resume_positions.insert(idx, pos);
964 }
965 }
966
967 Ok(())
968 }
969
970 pub async fn dispatch_events_batch(&self, events: Vec<SourceEventWrapper>) -> Result<()> {
975 if events.is_empty() {
976 return Ok(());
977 }
978
979 let dispatchers = self.dispatchers.read().await;
980 let comparator = self.position_comparator.read().await;
981
982 for mut wrapper in events {
983 if let Some(ref pos) = wrapper.source_position {
984 if pos.len() > Self::MAX_SOURCE_POSITION_BYTES {
985 warn!(
986 "[{}] Source position is large ({} bytes > {} limit); \
987 checkpoint staging will preserve the previous good position",
988 self.id,
989 pos.len(),
990 Self::MAX_SOURCE_POSITION_BYTES
991 );
992 }
993 }
994
995 wrapper.sequence = Some(self.next_sequence.fetch_add(1, Ordering::Relaxed));
996
997 if let (Some(seq), Some(ref pos)) = (wrapper.sequence, &wrapper.source_position) {
999 self.sequence_position_map
1000 .write()
1001 .await
1002 .insert(seq, pos.clone());
1003 }
1004
1005 debug!("[{}] Dispatching event (batch): {:?}", self.id, &wrapper);
1006
1007 let arc_wrapper = Arc::new(wrapper);
1008 let mut cleared_indices: Vec<usize> = Vec::new();
1009 let mut hwm_updates: Vec<(usize, Bytes)> = Vec::new();
1010
1011 for (idx, dispatcher) in dispatchers.iter().enumerate() {
1012 if let Some(ref cmp) = *comparator {
1014 let resume_positions = self.subscriber_resume_positions.read().await;
1015 if let Some(resume_pos) = resume_positions.get(&idx) {
1016 if let Some(ref event_pos) = arc_wrapper.source_position {
1017 if !cmp.position_reached(event_pos, resume_pos) {
1018 debug!(
1019 "[{}] Position filter: SKIPPING event for dispatcher {} \
1020 (event_pos={:?} <= resume_pos={:?})",
1021 self.id,
1022 idx,
1023 event_pos.as_ref(),
1024 resume_pos.as_ref()
1025 );
1026 continue;
1027 }
1028 debug!(
1029 "[{}] Position filter: PASSING event for dispatcher {} \
1030 (event_pos={:?} > resume_pos={:?})",
1031 self.id,
1032 idx,
1033 event_pos.as_ref(),
1034 resume_pos.as_ref()
1035 );
1036 cleared_indices.push(idx);
1037 }
1038 } else {
1039 debug!(
1040 "[{}] Position filter: NO resume position for dispatcher {}, passing through",
1041 self.id, idx
1042 );
1043 }
1044 }
1045
1046 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
1047 debug!("[{}] Failed to dispatch event: {}", self.id, e);
1048 } else if let Some(ref event_pos) = arc_wrapper.source_position {
1049 hwm_updates.push((idx, event_pos.clone()));
1050 }
1051 }
1052
1053 if !hwm_updates.is_empty() {
1055 let mut resume_positions = self.subscriber_resume_positions.write().await;
1056 for (idx, pos) in hwm_updates {
1057 resume_positions.insert(idx, pos);
1058 }
1059 }
1060 }
1061
1062 drop(comparator);
1063 drop(dispatchers);
1064
1065 Ok(())
1066 }
1067
1068 pub async fn broadcast_control(&self, control: SourceControl) -> Result<()> {
1070 let wrapper = SourceEventWrapper::new(
1071 self.id.clone(),
1072 SourceEvent::Control(control),
1073 chrono::Utc::now(),
1074 );
1075 self.dispatch_event(wrapper).await
1076 }
1077
1078 pub fn try_test_subscribe(
1091 &self,
1092 ) -> anyhow::Result<Box<dyn ChangeReceiver<SourceEventWrapper>>> {
1093 tokio::task::block_in_place(|| {
1094 tokio::runtime::Handle::current().block_on(self.create_streaming_receiver())
1095 })
1096 }
1097
1098 pub fn test_subscribe(&self) -> Box<dyn ChangeReceiver<SourceEventWrapper>> {
1106 self.try_test_subscribe()
1107 .expect("Failed to create test subscription receiver")
1108 }
1109
1110 pub async fn dispatch_from_task(
1138 dispatchers: Arc<RwLock<Vec<Box<dyn ChangeDispatcher<SourceEventWrapper> + Send + Sync>>>>,
1139 wrapper: SourceEventWrapper,
1140 source_id: &str,
1141 ) -> Result<()> {
1142 debug!(
1143 "[{}] Dispatching event from task: {:?}",
1144 source_id, &wrapper
1145 );
1146
1147 let arc_wrapper = Arc::new(wrapper);
1149
1150 let dispatchers_guard = dispatchers.read().await;
1152 for dispatcher in dispatchers_guard.iter() {
1153 if let Err(e) = dispatcher.dispatch_change(arc_wrapper.clone()).await {
1154 debug!("[{source_id}] Failed to dispatch event from task: {e}");
1155 }
1156 }
1157
1158 Ok(())
1159 }
1160
1161 pub async fn stop_common(&self) -> Result<()> {
1163 info!("Stopping source '{}'", self.id);
1164
1165 if let Some(tx) = self.shutdown_tx.write().await.take() {
1167 let _ = tx.send(());
1168 }
1169
1170 if let Some(mut handle) = self.task_handle.write().await.take() {
1172 match tokio::time::timeout(std::time::Duration::from_secs(5), &mut handle).await {
1173 Ok(Ok(())) => {
1174 info!("Source '{}' task completed successfully", self.id);
1175 }
1176 Ok(Err(e)) => {
1177 error!("Source '{}' task panicked: {}", self.id, e);
1178 }
1179 Err(_) => {
1180 warn!(
1181 "Source '{}' task did not complete within timeout, aborting",
1182 self.id
1183 );
1184 handle.abort();
1185 }
1186 }
1187 }
1188
1189 if self.dispatch_mode == DispatchMode::Channel {
1198 let mut dispatchers = self.dispatchers.write().await;
1199 dispatchers.clear();
1200 }
1201
1202 self.set_status(
1203 ComponentStatus::Stopped,
1204 Some(format!("Source '{}' stopped", self.id)),
1205 )
1206 .await;
1207 info!("Source '{}' stopped", self.id);
1208 Ok(())
1209 }
1210
1211 pub async fn clear_dispatchers(&self) {
1223 if self.dispatch_mode == DispatchMode::Channel {
1224 let mut dispatchers = self.dispatchers.write().await;
1225 dispatchers.clear();
1226 }
1227 }
1228
1229 pub async fn deprovision_common(&self) -> Result<()> {
1235 info!("Deprovisioning source '{}'", self.id);
1236 if let Some(store) = self.state_store().await {
1237 let count = store.clear_store(&self.id).await.map_err(|e| {
1238 anyhow::anyhow!(
1239 "Failed to clear state store for source '{}': {}",
1240 self.id,
1241 e
1242 )
1243 })?;
1244 info!(
1245 "Cleared {} keys from state store for source '{}'",
1246 count, self.id
1247 );
1248 }
1249 Ok(())
1250 }
1251
1252 pub async fn get_status(&self) -> ComponentStatus {
1254 self.status_handle.get_status().await
1255 }
1256
1257 pub async fn set_status(&self, status: ComponentStatus, message: Option<String>) {
1261 self.status_handle.set_status(status, message).await;
1262 }
1263
1264 pub async fn set_task_handle(&self, handle: tokio::task::JoinHandle<()>) {
1266 *self.task_handle.write().await = Some(handle);
1267 }
1268
1269 pub async fn set_shutdown_tx(&self, tx: tokio::sync::oneshot::Sender<()>) {
1271 *self.shutdown_tx.write().await = Some(tx);
1272 }
1273}
1274
1275#[cfg(test)]
1276mod tests {
1277 use super::*;
1278 use crate::sources::ByteLexPositionComparator;
1279
1280 #[test]
1285 fn test_params_new_defaults() {
1286 let params = SourceBaseParams::new("test-source");
1287 assert_eq!(params.id, "test-source");
1288 assert!(params.dispatch_mode.is_none());
1289 assert!(params.dispatch_buffer_capacity.is_none());
1290 assert!(params.bootstrap_provider.is_none());
1291 assert!(params.auto_start);
1292 }
1293
1294 #[test]
1295 fn test_params_with_dispatch_mode() {
1296 let params = SourceBaseParams::new("s1").with_dispatch_mode(DispatchMode::Broadcast);
1297 assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
1298 }
1299
1300 #[test]
1301 fn test_params_with_dispatch_buffer_capacity() {
1302 let params = SourceBaseParams::new("s1").with_dispatch_buffer_capacity(50000);
1303 assert_eq!(params.dispatch_buffer_capacity, Some(50000));
1304 }
1305
1306 #[test]
1307 fn test_params_with_auto_start_false() {
1308 let params = SourceBaseParams::new("s1").with_auto_start(false);
1309 assert!(!params.auto_start);
1310 }
1311
1312 #[test]
1313 fn test_params_builder_chaining() {
1314 let params = SourceBaseParams::new("chained")
1315 .with_dispatch_mode(DispatchMode::Broadcast)
1316 .with_dispatch_buffer_capacity(2000)
1317 .with_auto_start(false);
1318
1319 assert_eq!(params.id, "chained");
1320 assert_eq!(params.dispatch_mode, Some(DispatchMode::Broadcast));
1321 assert_eq!(params.dispatch_buffer_capacity, Some(2000));
1322 assert!(!params.auto_start);
1323 }
1324
1325 #[tokio::test]
1330 async fn test_new_defaults() {
1331 let params = SourceBaseParams::new("my-source");
1332 let base = SourceBase::new(params).unwrap();
1333
1334 assert_eq!(base.id, "my-source");
1335 assert!(base.auto_start);
1336 assert_eq!(base.get_status().await, ComponentStatus::Stopped);
1337 }
1338
1339 #[tokio::test]
1340 async fn test_get_id() {
1341 let base = SourceBase::new(SourceBaseParams::new("id-check")).unwrap();
1342 assert_eq!(base.get_id(), "id-check");
1343 }
1344
1345 #[tokio::test]
1346 async fn test_get_auto_start() {
1347 let base_default = SourceBase::new(SourceBaseParams::new("a")).unwrap();
1348 assert!(base_default.get_auto_start());
1349
1350 let base_false =
1351 SourceBase::new(SourceBaseParams::new("b").with_auto_start(false)).unwrap();
1352 assert!(!base_false.get_auto_start());
1353 }
1354
1355 #[tokio::test]
1356 async fn test_get_status_initial() {
1357 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1358 assert_eq!(base.get_status().await, ComponentStatus::Stopped);
1359 }
1360
1361 #[tokio::test]
1362 async fn test_set_status() {
1363 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1364
1365 base.set_status(ComponentStatus::Running, None).await;
1366 assert_eq!(base.get_status().await, ComponentStatus::Running);
1367
1368 base.set_status(ComponentStatus::Error, Some("oops".into()))
1369 .await;
1370 assert_eq!(base.get_status().await, ComponentStatus::Error);
1371 }
1372
1373 #[tokio::test]
1374 async fn test_status_handle_returns_handle() {
1375 let base = SourceBase::new(SourceBaseParams::new("s")).unwrap();
1376 let handle = base.status_handle();
1377
1378 assert_eq!(handle.get_status().await, ComponentStatus::Stopped);
1380
1381 handle.set_status(ComponentStatus::Starting, None).await;
1383 assert_eq!(base.get_status().await, ComponentStatus::Starting);
1384 }
1385
1386 use crate::bootstrap::{
1391 BootstrapContext, BootstrapProvider, BootstrapRequest, BootstrapResult,
1392 };
1393 use crate::channels::BootstrapEventSender;
1394 use async_trait::async_trait;
1395
1396 fn make_settings(
1397 query_id: &str,
1398 enable_bootstrap: bool,
1399 resume_from: Option<bytes::Bytes>,
1400 request_position_handle: bool,
1401 ) -> crate::config::SourceSubscriptionSettings {
1402 use std::collections::HashSet;
1403 crate::config::SourceSubscriptionSettings {
1404 source_id: "test-src".to_string(),
1405 enable_bootstrap,
1406 query_id: query_id.to_string(),
1407 nodes: HashSet::new(),
1408 relations: HashSet::new(),
1409 resume_from,
1410 request_position_handle,
1411 last_sequence: None,
1412 }
1413 }
1414
1415 struct NoopProvider;
1418
1419 #[async_trait]
1420 impl BootstrapProvider for NoopProvider {
1421 async fn bootstrap(
1422 &self,
1423 _request: BootstrapRequest,
1424 _context: &BootstrapContext,
1425 _event_tx: BootstrapEventSender,
1426 _settings: Option<&crate::config::SourceSubscriptionSettings>,
1427 ) -> Result<BootstrapResult> {
1428 Ok(BootstrapResult::default())
1429 }
1430 }
1431
1432 fn make_base_with_bootstrap(id: &str) -> SourceBase {
1433 let mut params = SourceBaseParams::new(id);
1434 params.bootstrap_provider = Some(Box::new(NoopProvider));
1435 SourceBase::new(params).unwrap()
1436 }
1437
1438 #[tokio::test]
1439 async fn test_create_position_handle_initializes_to_u64_max() {
1440 let base = SourceBase::new(SourceBaseParams::new("ph-init")).unwrap();
1441 let handle = base.create_position_handle("q1").await;
1442 assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1443 }
1444
1445 #[tokio::test]
1446 async fn test_create_position_handle_idempotent_for_same_query() {
1447 let base = SourceBase::new(SourceBaseParams::new("ph-idem")).unwrap();
1448 let h1 = base.create_position_handle("q1").await;
1449 h1.store(123, Ordering::Relaxed);
1450 let h2 = base.create_position_handle("q1").await;
1451 assert!(Arc::ptr_eq(&h1, &h2));
1453 assert_eq!(h2.load(Ordering::Relaxed), 123);
1454 }
1455
1456 #[tokio::test]
1457 async fn test_remove_position_handle_drops_entry() {
1458 let base = SourceBase::new(SourceBaseParams::new("ph-rm")).unwrap();
1459 let handle = base.create_position_handle("q1").await;
1460 handle.store(42, Ordering::Relaxed);
1461 assert_eq!(base.compute_confirmed_position().await, Some(42));
1462 base.remove_position_handle("q1").await;
1463 assert_eq!(base.compute_confirmed_position().await, None);
1464 }
1465
1466 #[tokio::test]
1467 async fn test_remove_position_handle_noop_when_absent() {
1468 let base = SourceBase::new(SourceBaseParams::new("ph-rm-absent")).unwrap();
1469 base.remove_position_handle("never-registered").await;
1471 assert_eq!(base.compute_confirmed_position().await, None);
1472 }
1473
1474 #[tokio::test]
1475 async fn test_compute_confirmed_position_returns_none_when_empty() {
1476 let base = SourceBase::new(SourceBaseParams::new("ph-empty")).unwrap();
1477 assert_eq!(base.compute_confirmed_position().await, None);
1478 }
1479
1480 #[tokio::test]
1481 async fn test_compute_confirmed_position_returns_none_when_all_max() {
1482 let base = SourceBase::new(SourceBaseParams::new("ph-all-max")).unwrap();
1483 let _h1 = base.create_position_handle("q1").await;
1484 let _h2 = base.create_position_handle("q2").await;
1485 assert_eq!(base.compute_confirmed_position().await, None);
1486 }
1487
1488 #[tokio::test]
1489 async fn test_compute_confirmed_position_filters_max_returns_min() {
1490 let base = SourceBase::new(SourceBaseParams::new("ph-min")).unwrap();
1491 let h1 = base.create_position_handle("q1").await;
1492 let _h2 = base.create_position_handle("q2").await; let h3 = base.create_position_handle("q3").await;
1494 h1.store(100, Ordering::Relaxed);
1495 h3.store(50, Ordering::Relaxed);
1496 assert_eq!(base.compute_confirmed_position().await, Some(50));
1497 }
1498
1499 #[tokio::test]
1500 async fn test_compute_confirmed_position_single_real_value() {
1501 let base = SourceBase::new(SourceBaseParams::new("ph-single")).unwrap();
1502 let h1 = base.create_position_handle("q1").await;
1503 let _h2 = base.create_position_handle("q2").await;
1504 h1.store(7, Ordering::Relaxed);
1505 assert_eq!(base.compute_confirmed_position().await, Some(7));
1506 }
1507
1508 #[tokio::test]
1509 async fn test_cleanup_stale_handles_drops_orphaned_arc() {
1510 let base = SourceBase::new(SourceBaseParams::new("ph-stale")).unwrap();
1511 {
1512 let handle = base.create_position_handle("q1").await;
1513 handle.store(99, Ordering::Relaxed);
1514 }
1516 base.cleanup_stale_handles().await;
1517 assert_eq!(base.compute_confirmed_position().await, None);
1518 }
1519
1520 #[tokio::test]
1521 async fn test_cleanup_stale_handles_keeps_held_arc() {
1522 let base = SourceBase::new(SourceBaseParams::new("ph-held")).unwrap();
1523 let handle = base.create_position_handle("q1").await;
1524 handle.store(11, Ordering::Relaxed);
1525 base.cleanup_stale_handles().await;
1526 assert_eq!(base.compute_confirmed_position().await, Some(11));
1528 drop(handle);
1530 }
1531
1532 #[tokio::test]
1533 async fn test_subscribe_with_request_position_handle_returns_handle() {
1534 let base = SourceBase::new(SourceBaseParams::new("sub-handle")).unwrap();
1535 let response = base
1536 .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1537 .await
1538 .unwrap();
1539 let handle = response.position_handle.expect("expected handle");
1540 assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
1541 assert_eq!(base.compute_confirmed_position().await, None); }
1544
1545 #[tokio::test]
1546 async fn test_subscribe_without_request_position_handle_returns_none() {
1547 let base = SourceBase::new(SourceBaseParams::new("sub-no-handle")).unwrap();
1548 let response = base
1549 .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1550 .await
1551 .unwrap();
1552 assert!(response.position_handle.is_none());
1553 let handles = base.position_handles.read().await;
1556 assert!(handles.is_empty());
1557 }
1558
1559 #[tokio::test]
1560 async fn test_subscribe_returned_handle_shared_with_internal() {
1561 let base = SourceBase::new(SourceBaseParams::new("sub-shared")).unwrap();
1562 let response = base
1563 .subscribe_with_bootstrap(&make_settings("q1", false, None, true), "test")
1564 .await
1565 .unwrap();
1566 let handle = response.position_handle.unwrap();
1567 handle.store(42, Ordering::Relaxed);
1568 assert_eq!(base.compute_confirmed_position().await, Some(42));
1569 }
1570
1571 #[tokio::test]
1572 async fn test_subscribe_with_resume_from_skips_bootstrap() {
1573 let base = make_base_with_bootstrap("sub-resume");
1574 let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1575 let response = base
1576 .subscribe_with_bootstrap(&make_settings("q1", true, Some(position), false), "test")
1577 .await
1578 .unwrap();
1579 assert!(
1580 response.bootstrap_receiver.is_none(),
1581 "resume_from must override enable_bootstrap"
1582 );
1583 }
1584
1585 #[tokio::test]
1586 async fn test_subscribe_resume_without_bootstrap_still_none() {
1587 let base = make_base_with_bootstrap("sub-resume-no-bs");
1588 let position = bytes::Bytes::copy_from_slice(&100u64.to_le_bytes());
1589 let response = base
1590 .subscribe_with_bootstrap(&make_settings("q1", false, Some(position), false), "test")
1591 .await
1592 .unwrap();
1593 assert!(response.bootstrap_receiver.is_none());
1594 }
1595
1596 #[tokio::test]
1597 async fn test_subscribe_no_resume_with_bootstrap_returns_receiver() {
1598 let base = make_base_with_bootstrap("sub-bs");
1599 let response = base
1600 .subscribe_with_bootstrap(&make_settings("q1", true, None, false), "test")
1601 .await
1602 .unwrap();
1603 assert!(
1604 response.bootstrap_receiver.is_some(),
1605 "regression guard: bootstrap path must still produce a receiver"
1606 );
1607 }
1608
1609 #[tokio::test]
1610 async fn test_subscribe_no_resume_no_bootstrap_returns_none() {
1611 let base = make_base_with_bootstrap("sub-neither");
1612 let response = base
1613 .subscribe_with_bootstrap(&make_settings("q1", false, None, false), "test")
1614 .await
1615 .unwrap();
1616 assert!(response.bootstrap_receiver.is_none());
1617 assert!(response.position_handle.is_none());
1618 }
1619
1620 fn make_event(source_id: &str, position: Option<&[u8]>) -> SourceEventWrapper {
1625 let change = drasi_core::models::SourceChange::Insert {
1626 element: drasi_core::models::Element::Node {
1627 metadata: drasi_core::models::ElementMetadata {
1628 reference: drasi_core::models::ElementReference::new(source_id, "n1"),
1629 labels: Arc::from([Arc::from("Label")]),
1630 effective_from: 0,
1631 },
1632 properties: drasi_core::models::ElementPropertyMap::new(),
1633 },
1634 };
1635 let mut wrapper = SourceEventWrapper::new(
1636 source_id.to_string(),
1637 SourceEvent::Change(change),
1638 chrono::Utc::now(),
1639 );
1640 wrapper.source_position = position.map(|p| bytes::Bytes::from(p.to_vec()));
1641 wrapper
1642 }
1643
1644 #[tokio::test]
1645 async fn test_dispatch_events_batch_empty_returns_ok() {
1646 let base = SourceBase::new(SourceBaseParams::new("batch-empty")).unwrap();
1647 let result = base.dispatch_events_batch(Vec::new()).await;
1648 assert!(result.is_ok());
1649 }
1650
1651 #[tokio::test]
1652 async fn test_dispatch_events_batch_stamps_monotonic_sequences() {
1653 let params = SourceBaseParams::new("batch-seq").with_dispatch_mode(DispatchMode::Channel);
1654 let base = SourceBase::new(params).unwrap();
1655
1656 let mut receiver = base.create_streaming_receiver().await.unwrap();
1658
1659 let events = vec![
1660 make_event("batch-seq", Some(b"\x01")),
1661 make_event("batch-seq", Some(b"\x02")),
1662 make_event("batch-seq", Some(b"\x03")),
1663 ];
1664
1665 base.dispatch_events_batch(events).await.unwrap();
1666
1667 let e1 = receiver.recv().await.unwrap();
1668 let e2 = receiver.recv().await.unwrap();
1669 let e3 = receiver.recv().await.unwrap();
1670
1671 let s1 = e1.sequence.expect("event 1 must have sequence");
1672 let s2 = e2.sequence.expect("event 2 must have sequence");
1673 let s3 = e3.sequence.expect("event 3 must have sequence");
1674
1675 assert_eq!(s2, s1 + 1, "sequences must be monotonically increasing");
1676 assert_eq!(s3, s2 + 1, "sequences must be monotonically increasing");
1677 }
1678
1679 #[tokio::test]
1680 async fn test_dispatch_events_batch_multi_dispatcher_fanout() {
1681 let params =
1682 SourceBaseParams::new("batch-fanout").with_dispatch_mode(DispatchMode::Channel);
1683 let base = SourceBase::new(params).unwrap();
1684
1685 let mut rx1 = base.create_streaming_receiver().await.unwrap();
1687 let mut rx2 = base.create_streaming_receiver().await.unwrap();
1688
1689 let events = vec![
1690 make_event("batch-fanout", Some(b"\x01")),
1691 make_event("batch-fanout", Some(b"\x02")),
1692 ];
1693
1694 base.dispatch_events_batch(events).await.unwrap();
1695
1696 let r1_e1 = rx1.recv().await.unwrap();
1698 let r1_e2 = rx1.recv().await.unwrap();
1699 let r2_e1 = rx2.recv().await.unwrap();
1700 let r2_e2 = rx2.recv().await.unwrap();
1701
1702 assert_eq!(r1_e1.sequence, r2_e1.sequence);
1704 assert_eq!(r1_e2.sequence, r2_e2.sequence);
1705 }
1706
1707 #[tokio::test]
1708 async fn test_dispatch_events_batch_oversized_position_still_dispatches() {
1709 let params =
1710 SourceBaseParams::new("batch-oversize").with_dispatch_mode(DispatchMode::Channel);
1711 let base = SourceBase::new(params).unwrap();
1712 let mut rx = base.create_streaming_receiver().await.unwrap();
1713
1714 let big_pos = vec![0xAA; SourceBase::MAX_SOURCE_POSITION_BYTES + 1];
1716 let events = vec![make_event("batch-oversize", Some(&big_pos))];
1717
1718 base.dispatch_events_batch(events).await.unwrap();
1720
1721 let received = rx.recv().await.unwrap();
1722 assert!(received.sequence.is_some(), "event must still be stamped");
1723 assert_eq!(
1724 received.source_position.as_ref().map(|p| p.len()),
1725 Some(SourceBase::MAX_SOURCE_POSITION_BYTES + 1),
1726 "oversized position must still be delivered (checkpoint layer enforces the limit)"
1727 );
1728 }
1729
1730 #[tokio::test]
1735 async fn test_position_filter_suppresses_events_before_resume() {
1736 let params = SourceBaseParams::new("pos-filter").with_dispatch_mode(DispatchMode::Channel);
1737 let base = SourceBase::new(params).unwrap();
1738 base.set_position_comparator(ByteLexPositionComparator)
1739 .await;
1740
1741 let mut rx1 = base.create_streaming_receiver().await.unwrap();
1743 let mut rx2 = base.create_streaming_receiver().await.unwrap();
1744
1745 base.subscriber_resume_positions
1748 .write()
1749 .await
1750 .insert(0, Bytes::from_static(&[0x00, 0x05]));
1751
1752 let event = make_event("pos-filter", Some(&[0x00, 0x03]));
1754 base.dispatch_event(event).await.unwrap();
1755
1756 let r2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1758 .await
1759 .unwrap()
1760 .unwrap();
1761 assert_eq!(r2.source_position.as_ref().unwrap().as_ref(), &[0x00, 0x03]);
1762
1763 let r1 = tokio::time::timeout(std::time::Duration::from_millis(50), rx1.recv()).await;
1765 assert!(r1.is_err(), "rx1 should timeout — event was suppressed");
1766 }
1767
1768 #[tokio::test]
1769 async fn test_position_filter_delivers_events_past_resume() {
1770 let params = SourceBaseParams::new("pos-filter2").with_dispatch_mode(DispatchMode::Channel);
1771 let base = SourceBase::new(params).unwrap();
1772 base.set_position_comparator(ByteLexPositionComparator)
1773 .await;
1774
1775 let mut rx1 = base.create_streaming_receiver().await.unwrap();
1776
1777 base.subscriber_resume_positions
1779 .write()
1780 .await
1781 .insert(0, Bytes::from_static(&[0x00, 0x05]));
1782
1783 let event = make_event("pos-filter2", Some(&[0x00, 0x06]));
1785 base.dispatch_event(event).await.unwrap();
1786
1787 let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1788 .await
1789 .unwrap()
1790 .unwrap();
1791 assert_eq!(
1792 received.source_position.as_ref().unwrap().as_ref(),
1793 &[0x00, 0x06]
1794 );
1795 }
1796
1797 #[tokio::test]
1798 async fn test_position_filter_advances_high_water_mark() {
1799 let params = SourceBaseParams::new("pos-hwm").with_dispatch_mode(DispatchMode::Channel);
1800 let base = SourceBase::new(params).unwrap();
1801 base.set_position_comparator(ByteLexPositionComparator)
1802 .await;
1803
1804 let mut rx = base.create_streaming_receiver().await.unwrap();
1805
1806 base.subscriber_resume_positions
1808 .write()
1809 .await
1810 .insert(0, Bytes::from_static(&[0x00, 0x03]));
1811
1812 base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x04])))
1814 .await
1815 .unwrap();
1816 let _ = rx.recv().await.unwrap();
1817
1818 {
1820 let positions = base.subscriber_resume_positions.read().await;
1821 assert_eq!(
1822 positions.get(&0).map(|b| b.as_ref()),
1823 Some([0x00, 0x04].as_slice()),
1824 "high-water mark should be advanced to dispatched position"
1825 );
1826 }
1827
1828 base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x01])))
1830 .await
1831 .unwrap();
1832 let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
1833 assert!(
1834 r.is_err(),
1835 "event below high-water mark should be suppressed after rewind"
1836 );
1837
1838 base.dispatch_event(make_event("pos-hwm", Some(&[0x00, 0x06])))
1840 .await
1841 .unwrap();
1842 let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
1843 .await
1844 .unwrap()
1845 .unwrap();
1846 assert_eq!(
1847 received.source_position.as_ref().unwrap().as_ref(),
1848 &[0x00, 0x06]
1849 );
1850 }
1851
1852 #[tokio::test]
1853 async fn test_position_filter_equal_position_is_suppressed() {
1854 let params = SourceBaseParams::new("pos-equal").with_dispatch_mode(DispatchMode::Channel);
1857 let base = SourceBase::new(params).unwrap();
1858 base.set_position_comparator(ByteLexPositionComparator)
1859 .await;
1860
1861 let mut rx = base.create_streaming_receiver().await.unwrap();
1862
1863 base.subscriber_resume_positions
1864 .write()
1865 .await
1866 .insert(0, Bytes::from_static(&[0x00, 0x05]));
1867
1868 base.dispatch_event(make_event("pos-equal", Some(&[0x00, 0x05])))
1870 .await
1871 .unwrap();
1872
1873 let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
1874 assert!(
1875 r.is_err(),
1876 "event at exactly resume position should be suppressed"
1877 );
1878 }
1879
1880 #[tokio::test]
1881 async fn test_position_filter_no_comparator_delivers_all() {
1882 let params = SourceBaseParams::new("no-cmp").with_dispatch_mode(DispatchMode::Channel);
1885 let base = SourceBase::new(params).unwrap();
1886 let mut rx = base.create_streaming_receiver().await.unwrap();
1889
1890 base.subscriber_resume_positions
1891 .write()
1892 .await
1893 .insert(0, Bytes::from_static(&[0x00, 0x05]));
1894
1895 base.dispatch_event(make_event("no-cmp", Some(&[0x00, 0x03])))
1897 .await
1898 .unwrap();
1899
1900 let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
1901 .await
1902 .unwrap()
1903 .unwrap();
1904 assert_eq!(
1905 received.source_position.as_ref().unwrap().as_ref(),
1906 &[0x00, 0x03]
1907 );
1908 }
1909
1910 #[tokio::test]
1911 async fn test_position_filter_batch_mode() {
1912 let params = SourceBaseParams::new("pos-batch").with_dispatch_mode(DispatchMode::Channel);
1913 let base = SourceBase::new(params).unwrap();
1914 base.set_position_comparator(ByteLexPositionComparator)
1915 .await;
1916
1917 let mut rx1 = base.create_streaming_receiver().await.unwrap();
1918 let mut rx2 = base.create_streaming_receiver().await.unwrap();
1919
1920 {
1923 let mut positions = base.subscriber_resume_positions.write().await;
1924 positions.insert(0, Bytes::from_static(&[0x00, 0x05]));
1925 positions.insert(1, Bytes::from_static(&[0x00, 0x02]));
1926 }
1927
1928 let events = vec![
1929 make_event("pos-batch", Some(&[0x00, 0x01])), make_event("pos-batch", Some(&[0x00, 0x03])), make_event("pos-batch", Some(&[0x00, 0x06])), ];
1933 base.dispatch_events_batch(events).await.unwrap();
1934
1935 let r2_1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1937 .await
1938 .unwrap()
1939 .unwrap();
1940 assert_eq!(
1941 r2_1.source_position.as_ref().unwrap().as_ref(),
1942 &[0x00, 0x03]
1943 );
1944
1945 let r2_2 = tokio::time::timeout(std::time::Duration::from_millis(100), rx2.recv())
1946 .await
1947 .unwrap()
1948 .unwrap();
1949 assert_eq!(
1950 r2_2.source_position.as_ref().unwrap().as_ref(),
1951 &[0x00, 0x06]
1952 );
1953
1954 let r1_1 = tokio::time::timeout(std::time::Duration::from_millis(100), rx1.recv())
1956 .await
1957 .unwrap()
1958 .unwrap();
1959 assert_eq!(
1960 r1_1.source_position.as_ref().unwrap().as_ref(),
1961 &[0x00, 0x06]
1962 );
1963
1964 let r1_extra = tokio::time::timeout(std::time::Duration::from_millis(50), rx1.recv()).await;
1966 assert!(r1_extra.is_err(), "rx1 should have no more events");
1967 }
1968
1969 #[tokio::test]
1970 async fn test_position_filter_events_without_position_delivered() {
1971 let params = SourceBaseParams::new("pos-none").with_dispatch_mode(DispatchMode::Channel);
1973 let base = SourceBase::new(params).unwrap();
1974 base.set_position_comparator(ByteLexPositionComparator)
1975 .await;
1976
1977 let mut rx = base.create_streaming_receiver().await.unwrap();
1978
1979 base.subscriber_resume_positions
1980 .write()
1981 .await
1982 .insert(0, Bytes::from_static(&[0x00, 0x05]));
1983
1984 base.dispatch_event(make_event("pos-none", None))
1986 .await
1987 .unwrap();
1988
1989 let received = tokio::time::timeout(std::time::Duration::from_millis(100), rx.recv())
1990 .await
1991 .unwrap()
1992 .unwrap();
1993 assert!(received.source_position.is_none());
1994 }
1995
1996 #[tokio::test]
2001 async fn test_sequence_position_map_populated_on_dispatch() {
2002 let params = SourceBaseParams::new("spm-1").with_dispatch_mode(DispatchMode::Channel);
2003 let base = SourceBase::new(params).unwrap();
2004 let _rx = base.create_streaming_receiver().await.unwrap();
2005
2006 let lsn: u64 = 0x1234;
2007 base.dispatch_event(make_event("spm-1", Some(&lsn.to_be_bytes())))
2008 .await
2009 .unwrap();
2010
2011 let map = base.sequence_position_map.read().await;
2012 assert_eq!(map.len(), 1);
2013 let (seq, pos) = map.iter().next().unwrap();
2014 assert_eq!(*seq, 1); assert_eq!(pos.as_ref(), &lsn.to_be_bytes());
2016 }
2017
2018 #[tokio::test]
2019 async fn test_sequence_position_map_not_populated_without_position() {
2020 let params = SourceBaseParams::new("spm-none").with_dispatch_mode(DispatchMode::Channel);
2021 let base = SourceBase::new(params).unwrap();
2022 let _rx = base.create_streaming_receiver().await.unwrap();
2023
2024 base.dispatch_event(make_event("spm-none", None))
2025 .await
2026 .unwrap();
2027
2028 let map = base.sequence_position_map.read().await;
2029 assert!(map.is_empty());
2030 }
2031
2032 #[tokio::test]
2033 async fn test_compute_confirmed_source_position_basic() {
2034 let params = SourceBaseParams::new("cssp-1").with_dispatch_mode(DispatchMode::Channel);
2035 let base = SourceBase::new(params).unwrap();
2036 let _rx = base.create_streaming_receiver().await.unwrap();
2037
2038 let handle = base.create_position_handle("q1").await;
2040
2041 for lsn in [100u64, 200, 300] {
2043 base.dispatch_event(make_event("cssp-1", Some(&lsn.to_be_bytes())))
2044 .await
2045 .unwrap();
2046 }
2047
2048 handle.store(2, Ordering::Relaxed);
2050
2051 let confirmed = base.compute_confirmed_source_position().await;
2052 assert!(confirmed.is_some());
2053 let lsn_bytes = confirmed.unwrap();
2054 assert_eq!(u64::from_be_bytes(lsn_bytes[..8].try_into().unwrap()), 200);
2055 }
2056
2057 #[tokio::test]
2058 async fn test_compute_confirmed_source_position_returns_none_when_no_handles() {
2059 let base = SourceBase::new(SourceBaseParams::new("cssp-none")).unwrap();
2060 assert!(base.compute_confirmed_source_position().await.is_none());
2061 }
2062
2063 #[tokio::test]
2064 async fn test_compute_confirmed_source_position_returns_none_when_all_max() {
2065 let base = SourceBase::new(SourceBaseParams::new("cssp-max")).unwrap();
2066 let _h = base.create_position_handle("q1").await;
2067 assert!(base.compute_confirmed_source_position().await.is_none());
2069 }
2070
2071 #[tokio::test]
2072 async fn test_compute_confirmed_source_position_min_of_two_queries() {
2073 let params = SourceBaseParams::new("cssp-2q").with_dispatch_mode(DispatchMode::Channel);
2074 let base = SourceBase::new(params).unwrap();
2075 let _rx = base.create_streaming_receiver().await.unwrap();
2076
2077 let h1 = base.create_position_handle("q1").await;
2078 let h2 = base.create_position_handle("q2").await;
2079
2080 for lsn in [100u64, 200, 300] {
2082 base.dispatch_event(make_event("cssp-2q", Some(&lsn.to_be_bytes())))
2083 .await
2084 .unwrap();
2085 }
2086
2087 h1.store(3, Ordering::Relaxed);
2089 h2.store(1, Ordering::Relaxed);
2090
2091 let confirmed = base.compute_confirmed_source_position().await;
2092 assert!(confirmed.is_some());
2093 let lsn_bytes = confirmed.unwrap();
2094 assert_eq!(u64::from_be_bytes(lsn_bytes[..8].try_into().unwrap()), 100);
2096 }
2097
2098 #[tokio::test]
2099 async fn test_prune_position_map() {
2100 let params = SourceBaseParams::new("prune-1").with_dispatch_mode(DispatchMode::Channel);
2101 let base = SourceBase::new(params).unwrap();
2102 let _rx = base.create_streaming_receiver().await.unwrap();
2103
2104 for lsn in [10u64, 20, 30, 40, 50] {
2105 base.dispatch_event(make_event("prune-1", Some(&lsn.to_be_bytes())))
2106 .await
2107 .unwrap();
2108 }
2109 assert_eq!(base.sequence_position_map.read().await.len(), 5);
2111
2112 base.prune_position_map(3).await;
2113 let map = base.sequence_position_map.read().await;
2114 assert_eq!(map.len(), 2); assert!(map.contains_key(&4));
2116 assert!(map.contains_key(&5));
2117 }
2118
2119 #[tokio::test]
2120 async fn test_prune_position_map_all() {
2121 let params = SourceBaseParams::new("prune-all").with_dispatch_mode(DispatchMode::Channel);
2122 let base = SourceBase::new(params).unwrap();
2123 let _rx = base.create_streaming_receiver().await.unwrap();
2124
2125 for lsn in [10u64, 20] {
2126 base.dispatch_event(make_event("prune-all", Some(&lsn.to_be_bytes())))
2127 .await
2128 .unwrap();
2129 }
2130 base.prune_position_map(100).await; assert!(base.sequence_position_map.read().await.is_empty());
2132 }
2133
2134 #[tokio::test]
2135 async fn test_position_handle_initialized_to_last_sequence() {
2136 use crate::config::SourceSubscriptionSettings;
2137
2138 let params = SourceBaseParams::new("ph-init").with_dispatch_mode(DispatchMode::Channel);
2139 let base = SourceBase::new(params).unwrap();
2140
2141 let settings = SourceSubscriptionSettings {
2142 query_id: "q1".to_string(),
2143 source_id: "ph-init".to_string(),
2144 enable_bootstrap: false,
2145 resume_from: Some(Bytes::from_static(&[0x00, 0x01])),
2146 last_sequence: Some(42),
2147 request_position_handle: true,
2148 nodes: Default::default(),
2149 relations: Default::default(),
2150 };
2151
2152 let response = base
2153 .subscribe_with_bootstrap(&settings, "test")
2154 .await
2155 .unwrap();
2156 let handle = response.position_handle.expect("should have handle");
2157 assert_eq!(handle.load(Ordering::Relaxed), 42);
2159 }
2160
2161 #[tokio::test]
2162 async fn test_position_handle_stays_max_without_last_sequence() {
2163 use crate::config::SourceSubscriptionSettings;
2164
2165 let params = SourceBaseParams::new("ph-no-ls").with_dispatch_mode(DispatchMode::Channel);
2166 let base = SourceBase::new(params).unwrap();
2167
2168 let settings = SourceSubscriptionSettings {
2169 query_id: "q1".to_string(),
2170 source_id: "ph-no-ls".to_string(),
2171 enable_bootstrap: true,
2172 resume_from: None,
2173 last_sequence: None,
2174 request_position_handle: true,
2175 nodes: Default::default(),
2176 relations: Default::default(),
2177 };
2178
2179 let response = base
2180 .subscribe_with_bootstrap(&settings, "test")
2181 .await
2182 .unwrap();
2183 let handle = response.position_handle.expect("should have handle");
2184 assert_eq!(handle.load(Ordering::Relaxed), u64::MAX);
2186 }
2187
2188 #[tokio::test]
2189 async fn test_batch_dispatch_populates_sequence_position_map() {
2190 let params = SourceBaseParams::new("spm-batch").with_dispatch_mode(DispatchMode::Channel);
2191 let base = SourceBase::new(params).unwrap();
2192 let _rx = base.create_streaming_receiver().await.unwrap();
2193
2194 let events = vec![
2195 make_event("spm-batch", Some(&100u64.to_be_bytes())),
2196 make_event("spm-batch", Some(&200u64.to_be_bytes())),
2197 make_event("spm-batch", None), ];
2199
2200 base.dispatch_events_batch(events).await.unwrap();
2201
2202 let map = base.sequence_position_map.read().await;
2203 assert_eq!(map.len(), 2);
2205 assert!(map.contains_key(&1));
2206 assert!(map.contains_key(&2));
2207 }
2208
2209 #[tokio::test]
2210 async fn test_position_filter_rewind_protection_multi_subscriber() {
2211 let params = SourceBaseParams::new("rewind").with_dispatch_mode(DispatchMode::Channel);
2216 let base = SourceBase::new(params).unwrap();
2217 base.set_position_comparator(ByteLexPositionComparator)
2218 .await;
2219
2220 let mut rx_a = base.create_streaming_receiver().await.unwrap();
2222 base.subscriber_resume_positions
2223 .write()
2224 .await
2225 .insert(0, Bytes::from_static(&[0x10]));
2226
2227 base.dispatch_event(make_event("rewind", Some(&[0x20])))
2229 .await
2230 .unwrap();
2231 let ev = rx_a.recv().await.unwrap();
2232 assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x20]);
2233
2234 base.dispatch_event(make_event("rewind", Some(&[0x30])))
2235 .await
2236 .unwrap();
2237 let ev = rx_a.recv().await.unwrap();
2238 assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x30]);
2239
2240 let mut rx_b = base.create_streaming_receiver().await.unwrap();
2242 base.subscriber_resume_positions
2243 .write()
2244 .await
2245 .insert(1, Bytes::from_static(&[0x10]));
2246
2247 base.dispatch_event(make_event("rewind", Some(&[0x20])))
2251 .await
2252 .unwrap();
2253
2254 let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx_a.recv()).await;
2256 assert!(
2257 r.is_err(),
2258 "subscriber A should not see replayed event at [0x20]"
2259 );
2260
2261 let ev = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2263 .await
2264 .unwrap()
2265 .unwrap();
2266 assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x20]);
2267
2268 base.dispatch_event(make_event("rewind", Some(&[0x30])))
2270 .await
2271 .unwrap();
2272
2273 let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx_a.recv()).await;
2274 assert!(
2275 r.is_err(),
2276 "subscriber A should not see replayed event at [0x30]"
2277 );
2278
2279 let ev = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2280 .await
2281 .unwrap()
2282 .unwrap();
2283 assert_eq!(ev.source_position.as_ref().unwrap().as_ref(), &[0x30]);
2284
2285 base.dispatch_event(make_event("rewind", Some(&[0x40])))
2287 .await
2288 .unwrap();
2289
2290 let ev_a = tokio::time::timeout(std::time::Duration::from_millis(100), rx_a.recv())
2291 .await
2292 .unwrap()
2293 .unwrap();
2294 assert_eq!(ev_a.source_position.as_ref().unwrap().as_ref(), &[0x40]);
2295
2296 let ev_b = tokio::time::timeout(std::time::Duration::from_millis(100), rx_b.recv())
2297 .await
2298 .unwrap()
2299 .unwrap();
2300 assert_eq!(ev_b.source_position.as_ref().unwrap().as_ref(), &[0x40]);
2301 }
2302
2303 #[tokio::test]
2304 async fn test_high_water_mark_set_for_new_subscriber_without_resume() {
2305 let params = SourceBaseParams::new("hwm-new").with_dispatch_mode(DispatchMode::Channel);
2308 let base = SourceBase::new(params).unwrap();
2309 base.set_position_comparator(ByteLexPositionComparator)
2310 .await;
2311
2312 let mut rx = base.create_streaming_receiver().await.unwrap();
2313 base.dispatch_event(make_event("hwm-new", Some(&[0x10])))
2317 .await
2318 .unwrap();
2319 let _ = rx.recv().await.unwrap();
2320
2321 {
2323 let positions = base.subscriber_resume_positions.read().await;
2324 assert_eq!(
2325 positions.get(&0).map(|b| b.as_ref()),
2326 Some([0x10].as_slice()),
2327 "high-water mark should be set after first dispatch"
2328 );
2329 }
2330
2331 base.dispatch_event(make_event("hwm-new", Some(&[0x05])))
2333 .await
2334 .unwrap();
2335 let r = tokio::time::timeout(std::time::Duration::from_millis(50), rx.recv()).await;
2336 assert!(
2337 r.is_err(),
2338 "event below high-water mark should be suppressed"
2339 );
2340 }
2341}