1#![allow(unexpected_cfgs)]
2pub mod config;
23pub mod descriptor;
24pub mod grpc;
25pub mod mapping;
26pub mod rpc;
27
28pub use config::{
29 StartPosition, SuiDeepBookSourceConfig, Transport, DEFAULT_DEEPBOOK_PACKAGE_ID,
30 DEFAULT_SUI_MAINNET_GRPC, DEFAULT_SUI_MAINNET_RPC,
31};
32
33use anyhow::Result;
34use async_trait::async_trait;
35use drasi_lib::channels::{ComponentStatus, DispatchMode, SubscriptionResponse};
36use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
37use drasi_lib::sources::Source;
38use drasi_lib::state_store::StateStoreProvider;
39use log::{debug, error, info, warn};
40use std::collections::{HashMap, HashSet};
41use std::sync::Arc;
42use std::time::Duration;
43use tokio::sync::watch;
44use tokio::sync::RwLock;
45
46use crate::mapping::{
47 build_order_node, build_pool_node, build_relationship, build_trader_node, event_order_id,
48 event_pool_id, map_event_to_change, should_include_event, EnrichmentConfig,
49};
50use crate::rpc::{EventCursor, SuiRpcClient};
51
52const CURSOR_STATE_KEY: &str = "cursor";
53const GRPC_CURSOR_STATE_KEY: &str = "grpc_checkpoint_seq";
54
55pub struct SuiDeepBookSource {
56 base: SourceBase,
57 config: SuiDeepBookSourceConfig,
58 state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
59 task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
60 shutdown_tx: watch::Sender<bool>,
61 shutdown_rx: watch::Receiver<bool>,
62}
63
64impl SuiDeepBookSource {
65 pub fn new(id: impl Into<String>, config: SuiDeepBookSourceConfig) -> Result<Self> {
66 config.validate()?;
67 let id = id.into();
68 let params = SourceBaseParams::new(&id);
69 let (shutdown_tx, shutdown_rx) = watch::channel(false);
70
71 Ok(Self {
72 base: SourceBase::new(params)?,
73 config,
74 state_store: Arc::new(RwLock::new(None)),
75 task_handle: Arc::new(RwLock::new(None)),
76 shutdown_tx,
77 shutdown_rx,
78 })
79 }
80
81 pub fn builder(id: impl Into<String>) -> SuiDeepBookSourceBuilder {
82 SuiDeepBookSourceBuilder::new(id)
83 }
84}
85
86#[async_trait]
87impl Source for SuiDeepBookSource {
88 fn id(&self) -> &str {
89 &self.base.id
90 }
91
92 fn type_name(&self) -> &str {
93 "sui-deepbook"
94 }
95
96 fn properties(&self) -> HashMap<String, serde_json::Value> {
97 let mut props = HashMap::new();
98 props.insert(
99 "rpc_endpoint".to_string(),
100 serde_json::Value::String(self.config.rpc_endpoint.clone()),
101 );
102 props.insert(
103 "deepbook_package_id".to_string(),
104 serde_json::Value::String(self.config.deepbook_package_id.clone()),
105 );
106 props.insert(
107 "poll_interval_ms".to_string(),
108 serde_json::Value::Number(self.config.poll_interval_ms.into()),
109 );
110 props.insert(
111 "request_limit".to_string(),
112 serde_json::Value::Number(self.config.request_limit.into()),
113 );
114 props.insert(
115 "event_filters".to_string(),
116 serde_json::Value::Array(
117 self.config
118 .event_filters
119 .iter()
120 .cloned()
121 .map(serde_json::Value::String)
122 .collect(),
123 ),
124 );
125 props.insert(
126 "pools".to_string(),
127 serde_json::Value::Array(
128 self.config
129 .pools
130 .iter()
131 .cloned()
132 .map(serde_json::Value::String)
133 .collect(),
134 ),
135 );
136 props.insert(
137 "start_position".to_string(),
138 serde_json::to_value(self.config.start_position).unwrap_or(serde_json::Value::Null),
139 );
140 props
141 }
142
143 fn auto_start(&self) -> bool {
144 self.base.get_auto_start()
145 }
146
147 async fn status(&self) -> ComponentStatus {
148 self.base.get_status().await
149 }
150
151 async fn start(&self) -> Result<()> {
152 if self.base.get_status().await == ComponentStatus::Running {
153 return Ok(());
154 }
155
156 self.base.set_status(ComponentStatus::Starting, None).await;
157 info!("Starting Sui DeepBook source '{}'", self.base.id);
158
159 let source_id = self.base.id.clone();
160 let config = self.config.clone();
161 let base = self.base.clone_shared();
162 let state_store = self.state_store.read().await.clone();
163 let mut shutdown_rx = self.shutdown_rx.clone();
164
165 let task_handle = tokio::spawn(async move {
166 let result = match config.transport {
167 Transport::Grpc => {
168 run_grpc_stream(&source_id, config, &base, state_store, &mut shutdown_rx).await
169 }
170 Transport::JsonRpc => {
171 run_poll_loop(&source_id, config, &base, state_store, &mut shutdown_rx).await
172 }
173 };
174 if let Err(err) = result {
175 error!("Sui DeepBook task failed for '{source_id}': {err}");
176 base.set_status(ComponentStatus::Error, Some(err.to_string()))
177 .await;
178 }
179 });
180
181 *self.task_handle.write().await = Some(task_handle);
182 self.base.set_status(ComponentStatus::Running, None).await;
183 Ok(())
184 }
185
186 async fn stop(&self) -> Result<()> {
187 info!("Stopping Sui DeepBook source '{}'", self.base.id);
188 self.base.set_status(ComponentStatus::Stopping, None).await;
189
190 if let Err(err) = self.shutdown_tx.send(true) {
191 warn!(
192 "Failed sending shutdown signal for Sui DeepBook source '{}': {err}",
193 self.base.id
194 );
195 }
196
197 if let Some(handle) = self.task_handle.write().await.take() {
198 match tokio::time::timeout(Duration::from_secs(5), handle).await {
199 Ok(Ok(())) => debug!("Sui DeepBook polling task stopped gracefully"),
200 Ok(Err(err)) => warn!("Sui DeepBook polling task panicked: {err}"),
201 Err(_) => warn!("Sui DeepBook polling task did not stop within timeout"),
202 }
203 }
204
205 self.base.set_status(ComponentStatus::Stopped, None).await;
206 Ok(())
207 }
208
209 async fn subscribe(
210 &self,
211 settings: drasi_lib::config::SourceSubscriptionSettings,
212 ) -> Result<SubscriptionResponse> {
213 self.base
214 .subscribe_with_bootstrap(&settings, "Sui DeepBook")
215 .await
216 }
217
218 fn as_any(&self) -> &dyn std::any::Any {
219 self
220 }
221
222 async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
223 self.base.initialize(context.clone()).await;
224 if let Some(state_store) = context.state_store {
225 *self.state_store.write().await = Some(state_store);
226 debug!(
227 "State store injected into Sui DeepBook source '{}'",
228 self.base.id
229 );
230 }
231 }
232
233 async fn set_bootstrap_provider(
234 &self,
235 provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
236 ) {
237 self.base.set_bootstrap_provider(provider).await;
238 }
239}
240
241async fn run_grpc_stream(
244 source_id: &str,
245 config: SuiDeepBookSourceConfig,
246 base: &SourceBase,
247 state_store: Option<Arc<dyn StateStoreProvider>>,
248 shutdown_rx: &mut watch::Receiver<bool>,
249) -> Result<()> {
250 let grpc_endpoint = config.effective_grpc_endpoint().to_owned();
251 info!("Starting gRPC checkpoint stream for '{source_id}' from {grpc_endpoint}");
252
253 let rpc_client = SuiRpcClient::new(config.rpc_endpoint.clone())?;
255
256 let enrichment = EnrichmentConfig {
257 enable_pool_nodes: config.enable_pool_nodes,
258 enable_trader_nodes: config.enable_trader_nodes,
259 enable_order_nodes: config.enable_order_nodes,
260 };
261 let mut enrichment_state = EnrichmentState {
262 seen_pools: HashSet::new(),
263 seen_traders: HashSet::new(),
264 seen_orders: HashSet::new(),
265 };
266
267 let mut last_checkpoint_seq = load_grpc_cursor(source_id, &state_store).await?;
268 if let Some(seq) = last_checkpoint_seq {
269 info!("Resuming gRPC stream from checkpoint sequence {seq}");
270 }
271
272 let mut consecutive_failures = 0u32;
273 let max_retries = 20u32;
274
275 loop {
276 if *shutdown_rx.borrow() {
277 info!("Received shutdown signal for gRPC stream '{source_id}'");
278 break;
279 }
280
281 let mut grpc_client = match crate::grpc::SuiGrpcClient::new(&grpc_endpoint) {
283 Ok(c) => c,
284 Err(err) => {
285 consecutive_failures += 1;
286 error!("Failed to create gRPC client (attempt {consecutive_failures}): {err}");
287 if consecutive_failures >= max_retries {
288 return Err(err);
289 }
290 let backoff_ms = backoff_delay_ms(consecutive_failures);
291 if wait_for_poll_or_shutdown(shutdown_rx, backoff_ms).await {
292 break;
293 }
294 continue;
295 }
296 };
297
298 let mut stream = match grpc_client.subscribe_checkpoints().await {
299 Ok(s) => {
300 info!("gRPC checkpoint stream established for '{source_id}'");
301 s
302 }
303 Err(err) => {
304 consecutive_failures += 1;
305 error!(
306 "Failed to subscribe to checkpoints (attempt {consecutive_failures}): {err}"
307 );
308 if consecutive_failures >= max_retries {
309 return Err(err);
310 }
311 let backoff_ms = backoff_delay_ms(consecutive_failures);
312 if wait_for_poll_or_shutdown(shutdown_rx, backoff_ms).await {
313 break;
314 }
315 continue;
316 }
317 };
318
319 loop {
321 if *shutdown_rx.borrow() {
322 break;
323 }
324
325 let msg = tokio::select! {
326 msg = stream.message() => msg,
327 _ = shutdown_rx.changed() => {
328 break;
329 }
330 };
331
332 match msg {
333 Ok(Some(response)) => {
334 consecutive_failures = 0;
335 let seq = response.cursor.unwrap_or(0);
336
337 if let Some(last_seq) = last_checkpoint_seq {
339 if seq <= last_seq {
340 continue;
341 }
342 }
343
344 if let Some(checkpoint) = response.checkpoint {
345 let events = crate::grpc::extract_deepbook_events(
346 &checkpoint,
347 &config.deepbook_package_id,
348 );
349
350 if !events.is_empty() {
351 info!(
352 "Checkpoint {seq}: extracted {} DeepBook event(s)",
353 events.len()
354 );
355 }
356
357 for event in &events {
358 if !should_include_event(event, &config.event_filters, &config.pools) {
359 debug!("Skipping event (filtered): {}", event.event_type);
360 continue;
361 }
362
363 let effective_from = event.timestamp_ms.unwrap_or_else(|| {
364 chrono::Utc::now().timestamp_millis().max(0) as u64
365 });
366
367 emit_enrichment_nodes(
368 source_id,
369 event,
370 &enrichment,
371 &rpc_client,
372 &mut enrichment_state,
373 effective_from,
374 base,
375 )
376 .await?;
377
378 let event_entity_id = crate::mapping::derive_entity_id_pub(event);
379 let change = map_event_to_change(source_id, event);
380 debug!(
381 "Dispatching gRPC event: entity_id={event_entity_id}, type={}",
382 event.event_type
383 );
384 base.dispatch_source_change(change).await?;
385
386 emit_enrichment_relationships(
387 source_id,
388 event,
389 &enrichment,
390 &event_entity_id,
391 effective_from,
392 base,
393 )
394 .await?;
395 }
396 } else {
397 debug!("Checkpoint {seq}: no checkpoint data in response");
398 }
399
400 save_grpc_cursor(source_id, seq, &state_store).await?;
401 last_checkpoint_seq = Some(seq);
402 }
403 Ok(None) => {
404 warn!("gRPC checkpoint stream ended for '{source_id}', reconnecting...");
405 break;
406 }
407 Err(err) => {
408 consecutive_failures += 1;
409 error!("gRPC stream error (attempt {consecutive_failures}): {err}");
410 break;
411 }
412 }
413 }
414
415 if *shutdown_rx.borrow() {
416 break;
417 }
418
419 let backoff_ms = backoff_delay_ms(consecutive_failures);
420 warn!("Reconnecting gRPC stream in {backoff_ms}ms...");
421 if wait_for_poll_or_shutdown(shutdown_rx, backoff_ms).await {
422 break;
423 }
424 }
425
426 Ok(())
427}
428
429fn backoff_delay_ms(failures: u32) -> u64 {
430 let base_ms = 1000u64;
431 let max_ms = 30_000u64;
432 let delay = base_ms * 2u64.saturating_pow(failures.min(15));
433 delay.min(max_ms)
434}
435
436async fn load_grpc_cursor(
437 source_id: &str,
438 state_store: &Option<Arc<dyn StateStoreProvider>>,
439) -> Result<Option<u64>> {
440 let Some(store) = state_store else {
441 return Ok(None);
442 };
443
444 let Some(bytes) = store.get(source_id, GRPC_CURSOR_STATE_KEY).await? else {
445 return Ok(None);
446 };
447
448 match serde_json::from_slice::<u64>(&bytes) {
449 Ok(seq) => Ok(Some(seq)),
450 Err(err) => {
451 warn!("Failed to parse gRPC cursor for source '{source_id}': {err}. Clearing state.");
452 let _ = store.delete(source_id, GRPC_CURSOR_STATE_KEY).await;
453 Ok(None)
454 }
455 }
456}
457
458async fn save_grpc_cursor(
459 source_id: &str,
460 seq: u64,
461 state_store: &Option<Arc<dyn StateStoreProvider>>,
462) -> Result<()> {
463 let Some(store) = state_store else {
464 return Ok(());
465 };
466
467 let bytes = serde_json::to_vec(&seq)?;
468 store.set(source_id, GRPC_CURSOR_STATE_KEY, bytes).await?;
469 Ok(())
470}
471
472async fn run_poll_loop(
475 source_id: &str,
476 config: SuiDeepBookSourceConfig,
477 base: &SourceBase,
478 state_store: Option<Arc<dyn StateStoreProvider>>,
479 shutdown_rx: &mut watch::Receiver<bool>,
480) -> Result<()> {
481 let rpc_client = SuiRpcClient::new(config.rpc_endpoint.clone())?;
482 let query_filter = serde_json::json!({
485 "MoveModule": {
486 "package": config.deepbook_package_id,
487 "module": "pool"
488 }
489 });
490
491 let mut cursor = load_cursor(source_id, &state_store).await?;
492
493 if cursor.is_none() && matches!(config.start_position, StartPosition::Now) {
494 cursor =
495 initialize_cursor_for_now(&rpc_client, &query_filter, config.request_limit).await?;
496 if let Some(cursor_ref) = cursor.as_ref() {
497 save_cursor(source_id, cursor_ref, &state_store).await?;
498 }
499 }
500
501 let timestamp_floor = match config.start_position {
502 StartPosition::Timestamp(value) => Some(value.max(0) as u64),
503 _ => None,
504 };
505
506 let enrichment = EnrichmentConfig {
507 enable_pool_nodes: config.enable_pool_nodes,
508 enable_trader_nodes: config.enable_trader_nodes,
509 enable_order_nodes: config.enable_order_nodes,
510 };
511 let mut enrichment_state = EnrichmentState {
512 seen_pools: HashSet::new(),
513 seen_traders: HashSet::new(),
514 seen_orders: HashSet::new(),
515 };
516
517 if config.lookback_events > 0 && cursor.is_some() {
521 info!(
522 "Fetching up to {} recent events for lookback",
523 config.lookback_events
524 );
525 match fetch_lookback_events(
526 source_id,
527 &config,
528 &rpc_client,
529 &query_filter,
530 &enrichment,
531 &mut enrichment_state,
532 base,
533 )
534 .await
535 {
536 Ok(count) => info!("Lookback phase complete: processed {count} events"),
537 Err(err) => warn!("Lookback phase failed (non-fatal, continuing): {err}"),
538 }
539 }
540
541 let mut consecutive_failures = 0usize;
542 loop {
543 if *shutdown_rx.borrow() {
544 info!("Received shutdown signal for Sui DeepBook source '{source_id}'");
545 break;
546 }
547
548 let query_result = match rpc_client
549 .query_events(
550 query_filter.clone(),
551 cursor.as_ref(),
552 config.request_limit,
553 false,
554 )
555 .await
556 {
557 Ok(result) => {
558 consecutive_failures = 0;
559 result
560 }
561 Err(err) => {
562 consecutive_failures += 1;
563 error!("Failed querying DeepBook events (attempt {consecutive_failures}): {err}");
564 if consecutive_failures >= 10 {
565 return Err(err);
566 }
567 if wait_for_poll_or_shutdown(shutdown_rx, config.poll_interval_ms).await {
568 break;
569 }
570 continue;
571 }
572 };
573
574 let mut latest_cursor = cursor.clone();
575 for event in query_result.data {
576 latest_cursor = Some(event.id.clone());
577
578 if event.package_id != config.deepbook_package_id {
579 continue;
580 }
581
582 if let Some(ts_floor) = timestamp_floor {
583 if event
584 .timestamp_ms
585 .is_some_and(|timestamp| timestamp < ts_floor)
586 {
587 continue;
588 }
589 }
590
591 if !should_include_event(&event, &config.event_filters, &config.pools) {
592 continue;
593 }
594
595 let effective_from = event
596 .timestamp_ms
597 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().max(0) as u64);
598
599 emit_enrichment_nodes(
601 source_id,
602 &event,
603 &enrichment,
604 &rpc_client,
605 &mut enrichment_state,
606 effective_from,
607 base,
608 )
609 .await?;
610
611 let event_entity_id = crate::mapping::derive_entity_id_pub(&event);
613 let change = map_event_to_change(source_id, &event);
614 base.dispatch_source_change(change).await?;
615
616 emit_enrichment_relationships(
618 source_id,
619 &event,
620 &enrichment,
621 &event_entity_id,
622 effective_from,
623 base,
624 )
625 .await?;
626 }
627
628 if let Some(next_cursor) = query_result.next_cursor {
632 latest_cursor = Some(next_cursor);
633 }
634
635 if latest_cursor != cursor {
636 cursor = latest_cursor;
637 if let Some(cursor_ref) = cursor.as_ref() {
638 save_cursor(source_id, cursor_ref, &state_store).await?;
639 }
640 }
641
642 if query_result.has_next_page {
643 continue;
644 }
645
646 if wait_for_poll_or_shutdown(shutdown_rx, config.poll_interval_ms).await {
647 break;
648 }
649 }
650
651 Ok(())
652}
653
654struct EnrichmentState {
656 seen_pools: HashSet<String>,
657 seen_traders: HashSet<String>,
658 seen_orders: HashSet<String>,
659}
660
661async fn emit_enrichment_nodes(
663 source_id: &str,
664 event: &crate::rpc::SuiEvent,
665 enrichment: &EnrichmentConfig,
666 rpc_client: &SuiRpcClient,
667 state: &mut EnrichmentState,
668 effective_from: u64,
669 base: &SourceBase,
670) -> Result<()> {
671 if enrichment.enable_pool_nodes {
673 if let Some(pool_id) = event_pool_id(event) {
674 if state.seen_pools.insert(pool_id.clone()) {
675 let object_data = match rpc_client.get_object(&pool_id).await {
676 Ok(data) => {
677 debug!("Fetched pool metadata for {pool_id}");
678 Some(data)
679 }
680 Err(err) => {
681 warn!("Failed to fetch pool object for {pool_id}: {err}");
682 None
683 }
684 };
685 let pool_change =
686 build_pool_node(source_id, &pool_id, object_data.as_ref(), effective_from);
687 base.dispatch_source_change(pool_change).await?;
688 }
689 }
690 }
691
692 if enrichment.enable_trader_nodes
694 && !event.sender.is_empty()
695 && state.seen_traders.insert(event.sender.clone())
696 {
697 let trader_change = build_trader_node(source_id, &event.sender, effective_from);
698 base.dispatch_source_change(trader_change).await?;
699 }
700
701 if enrichment.enable_order_nodes {
703 if let Some(order_id) = event_order_id(event) {
704 if state.seen_orders.insert(order_id.clone()) {
705 let pool_id = event_pool_id(event);
706 let order_change =
707 build_order_node(source_id, &order_id, pool_id.as_deref(), effective_from);
708 base.dispatch_source_change(order_change).await?;
709 }
710 }
711 }
712
713 Ok(())
714}
715
716async fn emit_enrichment_relationships(
718 source_id: &str,
719 event: &crate::rpc::SuiEvent,
720 enrichment: &EnrichmentConfig,
721 event_entity_id: &str,
722 effective_from: u64,
723 base: &SourceBase,
724) -> Result<()> {
725 if enrichment.enable_pool_nodes {
727 if let Some(pool_id) = event_pool_id(event) {
728 let rel = build_relationship(
729 source_id,
730 "IN_POOL",
731 &format!("rel:in_pool:{event_entity_id}"),
732 event_entity_id,
733 &format!("pool_meta:{pool_id}"),
734 effective_from,
735 );
736 base.dispatch_source_change(rel).await?;
737 }
738 }
739
740 if enrichment.enable_trader_nodes && !event.sender.is_empty() {
742 let rel = build_relationship(
743 source_id,
744 "SENT_BY",
745 &format!("rel:sent_by:{event_entity_id}"),
746 event_entity_id,
747 &format!("trader:{}", event.sender),
748 effective_from,
749 );
750 base.dispatch_source_change(rel).await?;
751 }
752
753 if enrichment.enable_order_nodes {
755 if let Some(order_id) = event_order_id(event) {
756 let rel = build_relationship(
757 source_id,
758 "FOR_ORDER",
759 &format!("rel:for_order:{event_entity_id}"),
760 event_entity_id,
761 &format!("order_meta:{order_id}"),
762 effective_from,
763 );
764 base.dispatch_source_change(rel).await?;
765 }
766 }
767
768 Ok(())
769}
770
771async fn initialize_cursor_for_now(
772 rpc_client: &SuiRpcClient,
773 query_filter: &serde_json::Value,
774 request_limit: u16,
775) -> Result<Option<EventCursor>> {
776 let result = rpc_client
777 .query_events(query_filter.clone(), None, request_limit.min(10), true)
778 .await?;
779
780 if let Some(first) = result.data.first() {
781 return Ok(Some(first.id.clone()));
782 }
783 Ok(result.next_cursor)
784}
785
786async fn fetch_lookback_events(
790 source_id: &str,
791 config: &SuiDeepBookSourceConfig,
792 rpc_client: &SuiRpcClient,
793 query_filter: &serde_json::Value,
794 enrichment: &EnrichmentConfig,
795 enrichment_state: &mut EnrichmentState,
796 base: &SourceBase,
797) -> Result<usize> {
798 let mut all_events = Vec::new();
799 let mut page_cursor: Option<EventCursor> = None;
800 let remaining = config.lookback_events as usize;
801 let page_size = config.request_limit.min(50);
802
803 loop {
805 let result = rpc_client
806 .query_events(
807 query_filter.clone(),
808 page_cursor.as_ref(),
809 page_size,
810 true, )
812 .await?;
813
814 for event in result.data {
815 if event.package_id != config.deepbook_package_id {
816 continue;
817 }
818 if !should_include_event(&event, &config.event_filters, &config.pools) {
819 continue;
820 }
821 all_events.push(event);
822 if all_events.len() >= remaining {
823 break;
824 }
825 }
826
827 if all_events.len() >= remaining || !result.has_next_page {
828 break;
829 }
830 page_cursor = result.next_cursor;
831 }
832
833 all_events.reverse();
835
836 let count = all_events.len();
837 debug!("Lookback: processing {count} events in chronological order");
838
839 for event in &all_events {
840 let effective_from = event
841 .timestamp_ms
842 .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().max(0) as u64);
843
844 emit_enrichment_nodes(
845 source_id,
846 event,
847 enrichment,
848 rpc_client,
849 enrichment_state,
850 effective_from,
851 base,
852 )
853 .await?;
854
855 let event_entity_id = crate::mapping::derive_entity_id_pub(event);
856 let change = map_event_to_change(source_id, event);
857 base.dispatch_source_change(change).await?;
858
859 emit_enrichment_relationships(
860 source_id,
861 event,
862 enrichment,
863 &event_entity_id,
864 effective_from,
865 base,
866 )
867 .await?;
868 }
869
870 Ok(count)
871}
872
873async fn load_cursor(
874 source_id: &str,
875 state_store: &Option<Arc<dyn StateStoreProvider>>,
876) -> Result<Option<EventCursor>> {
877 let Some(store) = state_store else {
878 return Ok(None);
879 };
880
881 let Some(bytes) = store.get(source_id, CURSOR_STATE_KEY).await? else {
882 return Ok(None);
883 };
884
885 match serde_json::from_slice::<EventCursor>(&bytes) {
886 Ok(cursor) => Ok(Some(cursor)),
887 Err(err) => {
888 warn!(
889 "Failed to parse persisted DeepBook cursor for source '{source_id}': {err}. Clearing state."
890 );
891 let _ = store.delete(source_id, CURSOR_STATE_KEY).await;
892 Ok(None)
893 }
894 }
895}
896
897async fn save_cursor(
898 source_id: &str,
899 cursor: &EventCursor,
900 state_store: &Option<Arc<dyn StateStoreProvider>>,
901) -> Result<()> {
902 let Some(store) = state_store else {
903 return Ok(());
904 };
905
906 let bytes = serde_json::to_vec(cursor)?;
907 store.set(source_id, CURSOR_STATE_KEY, bytes).await?;
908 Ok(())
909}
910
911async fn wait_for_poll_or_shutdown(
912 shutdown_rx: &mut watch::Receiver<bool>,
913 poll_interval_ms: u64,
914) -> bool {
915 tokio::select! {
916 _ = tokio::time::sleep(Duration::from_millis(poll_interval_ms)) => false,
917 changed = shutdown_rx.changed() => {
918 changed.is_ok() && *shutdown_rx.borrow()
919 }
920 }
921}
922
923pub struct SuiDeepBookSourceBuilder {
924 id: String,
925 config: SuiDeepBookSourceConfig,
926 dispatch_mode: Option<DispatchMode>,
927 dispatch_buffer_capacity: Option<usize>,
928 bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
929 state_store: Option<Arc<dyn StateStoreProvider>>,
930 auto_start: bool,
931}
932
933impl SuiDeepBookSourceBuilder {
934 pub fn new(id: impl Into<String>) -> Self {
935 Self {
936 id: id.into(),
937 config: SuiDeepBookSourceConfig::default(),
938 dispatch_mode: None,
939 dispatch_buffer_capacity: None,
940 bootstrap_provider: None,
941 state_store: None,
942 auto_start: true,
943 }
944 }
945
946 pub fn with_rpc_endpoint(mut self, rpc_endpoint: impl Into<String>) -> Self {
947 self.config.rpc_endpoint = rpc_endpoint.into();
948 self
949 }
950
951 pub fn with_grpc_endpoint(mut self, grpc_endpoint: impl Into<String>) -> Self {
952 self.config.grpc_endpoint = Some(grpc_endpoint.into());
953 self
954 }
955
956 pub fn with_transport(mut self, transport: Transport) -> Self {
957 self.config.transport = transport;
958 self
959 }
960
961 pub fn with_deepbook_package_id(mut self, package_id: impl Into<String>) -> Self {
962 self.config.deepbook_package_id = package_id.into();
963 self
964 }
965
966 pub fn with_poll_interval_ms(mut self, poll_interval_ms: u64) -> Self {
967 self.config.poll_interval_ms = poll_interval_ms;
968 self
969 }
970
971 pub fn with_request_limit(mut self, request_limit: u16) -> Self {
972 self.config.request_limit = request_limit;
973 self
974 }
975
976 pub fn with_event_filters(mut self, event_filters: Vec<String>) -> Self {
977 self.config.event_filters = event_filters;
978 self
979 }
980
981 pub fn with_pools(mut self, pools: Vec<String>) -> Self {
982 self.config.pools = pools;
983 self
984 }
985
986 pub fn with_start_position(mut self, start_position: StartPosition) -> Self {
987 self.config.start_position = start_position;
988 self
989 }
990
991 pub fn with_start_from_beginning(mut self) -> Self {
992 self.config.start_position = StartPosition::Beginning;
993 self
994 }
995
996 pub fn with_start_from_now(mut self) -> Self {
997 self.config.start_position = StartPosition::Now;
998 self
999 }
1000
1001 pub fn with_start_from_timestamp(mut self, timestamp_ms: i64) -> Self {
1002 self.config.start_position = StartPosition::Timestamp(timestamp_ms);
1003 self
1004 }
1005
1006 pub fn with_lookback_events(mut self, count: u16) -> Self {
1010 self.config.lookback_events = count;
1011 self
1012 }
1013
1014 pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
1015 self.dispatch_mode = Some(mode);
1016 self
1017 }
1018
1019 pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
1020 self.dispatch_buffer_capacity = Some(capacity);
1021 self
1022 }
1023
1024 pub fn with_bootstrap_provider(
1025 mut self,
1026 provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
1027 ) -> Self {
1028 self.bootstrap_provider = Some(Box::new(provider));
1029 self
1030 }
1031
1032 pub fn with_state_store(mut self, state_store: Arc<dyn StateStoreProvider>) -> Self {
1033 self.state_store = Some(state_store);
1034 self
1035 }
1036
1037 pub fn with_auto_start(mut self, auto_start: bool) -> Self {
1038 self.auto_start = auto_start;
1039 self
1040 }
1041
1042 pub fn with_config(mut self, config: SuiDeepBookSourceConfig) -> Self {
1043 self.config = config;
1044 self
1045 }
1046
1047 pub fn with_enable_pool_nodes(mut self, enable: bool) -> Self {
1048 self.config.enable_pool_nodes = enable;
1049 self
1050 }
1051
1052 pub fn with_enable_trader_nodes(mut self, enable: bool) -> Self {
1053 self.config.enable_trader_nodes = enable;
1054 self
1055 }
1056
1057 pub fn with_enable_order_nodes(mut self, enable: bool) -> Self {
1058 self.config.enable_order_nodes = enable;
1059 self
1060 }
1061
1062 pub fn build(self) -> Result<SuiDeepBookSource> {
1063 self.config.validate()?;
1064
1065 let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
1066 if let Some(mode) = self.dispatch_mode {
1067 params = params.with_dispatch_mode(mode);
1068 }
1069 if let Some(capacity) = self.dispatch_buffer_capacity {
1070 params = params.with_dispatch_buffer_capacity(capacity);
1071 }
1072 if let Some(provider) = self.bootstrap_provider {
1073 params = params.with_bootstrap_provider(provider);
1074 }
1075
1076 let (shutdown_tx, shutdown_rx) = watch::channel(false);
1077
1078 Ok(SuiDeepBookSource {
1079 base: SourceBase::new(params)?,
1080 config: self.config,
1081 state_store: Arc::new(RwLock::new(self.state_store)),
1082 task_handle: Arc::new(RwLock::new(None)),
1083 shutdown_tx,
1084 shutdown_rx,
1085 })
1086 }
1087}
1088
1089#[cfg(test)]
1090mod tests {
1091 use super::*;
1092
1093 #[test]
1094 fn test_builder_defaults() {
1095 let source = SuiDeepBookSource::builder("source-1").build().unwrap();
1096 assert_eq!(source.id(), "source-1");
1097 assert_eq!(source.type_name(), "sui-deepbook");
1098 assert_eq!(source.config.poll_interval_ms, 2_000);
1099 }
1100
1101 #[test]
1102 fn test_builder_with_custom_values() {
1103 let source = SuiDeepBookSource::builder("source-1")
1104 .with_rpc_endpoint("https://fullnode.testnet.sui.io:443")
1105 .with_deepbook_package_id("0xabc")
1106 .with_poll_interval_ms(500)
1107 .with_request_limit(25)
1108 .with_start_from_beginning()
1109 .build()
1110 .unwrap();
1111
1112 assert_eq!(
1113 source.config.rpc_endpoint,
1114 "https://fullnode.testnet.sui.io:443"
1115 );
1116 assert_eq!(source.config.deepbook_package_id, "0xabc");
1117 assert_eq!(source.config.poll_interval_ms, 500);
1118 assert_eq!(source.config.request_limit, 25);
1119 assert!(matches!(
1120 source.config.start_position,
1121 StartPosition::Beginning
1122 ));
1123 }
1124
1125 #[test]
1126 fn test_builder_rejects_invalid_config() {
1127 let result = SuiDeepBookSource::builder("source-1")
1128 .with_rpc_endpoint("")
1129 .build();
1130 assert!(result.is_err());
1131 }
1132}
1133
1134#[cfg(feature = "dynamic-plugin")]
1135drasi_plugin_sdk::export_plugin!(
1136 plugin_id = "sui-deepbook-source",
1137 core_version = env!("CARGO_PKG_VERSION"),
1138 lib_version = env!("CARGO_PKG_VERSION"),
1139 plugin_version = env!("CARGO_PKG_VERSION"),
1140 source_descriptors = [descriptor::SuiDeepBookSourceDescriptor],
1141 reaction_descriptors = [],
1142 bootstrap_descriptors = [],
1143);