Skip to main content

drasi_source_sui_deepbook/
lib.rs

1#![allow(unexpected_cfgs)]
2// Copyright 2026 The Drasi Authors.
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8//     http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
16//! Sui DeepBook source plugin for Drasi.
17//!
18//! This source monitors the Sui blockchain for DeepBook package events using either
19//! gRPC checkpoint streaming (recommended) or JSON-RPC polling (legacy fallback),
20//! and emits them as Drasi source changes.
21
22pub mod config;
23pub mod descriptor;
24pub mod grpc;
25pub mod mapping;
26pub mod rpc;
27
28pub use config::{
29    StartPosition, SuiDeepBookSourceConfig, Transport, DEFAULT_DEEPBOOK_PACKAGE_ID,
30    DEFAULT_SUI_MAINNET_GRPC, DEFAULT_SUI_MAINNET_RPC,
31};
32
33use anyhow::Result;
34use async_trait::async_trait;
35use drasi_lib::channels::{ComponentStatus, DispatchMode, SubscriptionResponse};
36use drasi_lib::sources::base::{SourceBase, SourceBaseParams};
37use drasi_lib::sources::Source;
38use drasi_lib::state_store::StateStoreProvider;
39use log::{debug, error, info, warn};
40use std::collections::{HashMap, HashSet};
41use std::sync::Arc;
42use std::time::Duration;
43use tokio::sync::watch;
44use tokio::sync::RwLock;
45
46use crate::mapping::{
47    build_order_node, build_pool_node, build_relationship, build_trader_node, event_order_id,
48    event_pool_id, map_event_to_change, should_include_event, EnrichmentConfig,
49};
50use crate::rpc::{EventCursor, SuiRpcClient};
51
/// State-store key under which the JSON-RPC event cursor is persisted.
const CURSOR_STATE_KEY: &str = "cursor";
/// State-store key under which the last processed gRPC checkpoint sequence is persisted.
const GRPC_CURSOR_STATE_KEY: &str = "grpc_checkpoint_seq";
54
55pub struct SuiDeepBookSource {
56    base: SourceBase,
57    config: SuiDeepBookSourceConfig,
58    state_store: Arc<RwLock<Option<Arc<dyn StateStoreProvider>>>>,
59    task_handle: Arc<RwLock<Option<tokio::task::JoinHandle<()>>>>,
60    shutdown_tx: watch::Sender<bool>,
61    shutdown_rx: watch::Receiver<bool>,
62}
63
64impl SuiDeepBookSource {
65    pub fn new(id: impl Into<String>, config: SuiDeepBookSourceConfig) -> Result<Self> {
66        config.validate()?;
67        let id = id.into();
68        let params = SourceBaseParams::new(&id);
69        let (shutdown_tx, shutdown_rx) = watch::channel(false);
70
71        Ok(Self {
72            base: SourceBase::new(params)?,
73            config,
74            state_store: Arc::new(RwLock::new(None)),
75            task_handle: Arc::new(RwLock::new(None)),
76            shutdown_tx,
77            shutdown_rx,
78        })
79    }
80
81    pub fn builder(id: impl Into<String>) -> SuiDeepBookSourceBuilder {
82        SuiDeepBookSourceBuilder::new(id)
83    }
84}
85
86#[async_trait]
87impl Source for SuiDeepBookSource {
88    fn id(&self) -> &str {
89        &self.base.id
90    }
91
92    fn type_name(&self) -> &str {
93        "sui-deepbook"
94    }
95
96    fn properties(&self) -> HashMap<String, serde_json::Value> {
97        let mut props = HashMap::new();
98        props.insert(
99            "rpc_endpoint".to_string(),
100            serde_json::Value::String(self.config.rpc_endpoint.clone()),
101        );
102        props.insert(
103            "deepbook_package_id".to_string(),
104            serde_json::Value::String(self.config.deepbook_package_id.clone()),
105        );
106        props.insert(
107            "poll_interval_ms".to_string(),
108            serde_json::Value::Number(self.config.poll_interval_ms.into()),
109        );
110        props.insert(
111            "request_limit".to_string(),
112            serde_json::Value::Number(self.config.request_limit.into()),
113        );
114        props.insert(
115            "event_filters".to_string(),
116            serde_json::Value::Array(
117                self.config
118                    .event_filters
119                    .iter()
120                    .cloned()
121                    .map(serde_json::Value::String)
122                    .collect(),
123            ),
124        );
125        props.insert(
126            "pools".to_string(),
127            serde_json::Value::Array(
128                self.config
129                    .pools
130                    .iter()
131                    .cloned()
132                    .map(serde_json::Value::String)
133                    .collect(),
134            ),
135        );
136        props.insert(
137            "start_position".to_string(),
138            serde_json::to_value(self.config.start_position).unwrap_or(serde_json::Value::Null),
139        );
140        props
141    }
142
143    fn auto_start(&self) -> bool {
144        self.base.get_auto_start()
145    }
146
147    async fn status(&self) -> ComponentStatus {
148        self.base.get_status().await
149    }
150
151    async fn start(&self) -> Result<()> {
152        if self.base.get_status().await == ComponentStatus::Running {
153            return Ok(());
154        }
155
156        self.base.set_status(ComponentStatus::Starting, None).await;
157        info!("Starting Sui DeepBook source '{}'", self.base.id);
158
159        let source_id = self.base.id.clone();
160        let config = self.config.clone();
161        let base = self.base.clone_shared();
162        let state_store = self.state_store.read().await.clone();
163        let mut shutdown_rx = self.shutdown_rx.clone();
164
165        let task_handle = tokio::spawn(async move {
166            let result = match config.transport {
167                Transport::Grpc => {
168                    run_grpc_stream(&source_id, config, &base, state_store, &mut shutdown_rx).await
169                }
170                Transport::JsonRpc => {
171                    run_poll_loop(&source_id, config, &base, state_store, &mut shutdown_rx).await
172                }
173            };
174            if let Err(err) = result {
175                error!("Sui DeepBook task failed for '{source_id}': {err}");
176                base.set_status(ComponentStatus::Error, Some(err.to_string()))
177                    .await;
178            }
179        });
180
181        *self.task_handle.write().await = Some(task_handle);
182        self.base.set_status(ComponentStatus::Running, None).await;
183        Ok(())
184    }
185
186    async fn stop(&self) -> Result<()> {
187        info!("Stopping Sui DeepBook source '{}'", self.base.id);
188        self.base.set_status(ComponentStatus::Stopping, None).await;
189
190        if let Err(err) = self.shutdown_tx.send(true) {
191            warn!(
192                "Failed sending shutdown signal for Sui DeepBook source '{}': {err}",
193                self.base.id
194            );
195        }
196
197        if let Some(handle) = self.task_handle.write().await.take() {
198            match tokio::time::timeout(Duration::from_secs(5), handle).await {
199                Ok(Ok(())) => debug!("Sui DeepBook polling task stopped gracefully"),
200                Ok(Err(err)) => warn!("Sui DeepBook polling task panicked: {err}"),
201                Err(_) => warn!("Sui DeepBook polling task did not stop within timeout"),
202            }
203        }
204
205        self.base.set_status(ComponentStatus::Stopped, None).await;
206        Ok(())
207    }
208
209    async fn subscribe(
210        &self,
211        settings: drasi_lib::config::SourceSubscriptionSettings,
212    ) -> Result<SubscriptionResponse> {
213        self.base
214            .subscribe_with_bootstrap(&settings, "Sui DeepBook")
215            .await
216    }
217
218    fn as_any(&self) -> &dyn std::any::Any {
219        self
220    }
221
222    async fn initialize(&self, context: drasi_lib::context::SourceRuntimeContext) {
223        self.base.initialize(context.clone()).await;
224        if let Some(state_store) = context.state_store {
225            *self.state_store.write().await = Some(state_store);
226            debug!(
227                "State store injected into Sui DeepBook source '{}'",
228                self.base.id
229            );
230        }
231    }
232
233    async fn set_bootstrap_provider(
234        &self,
235        provider: Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>,
236    ) {
237        self.base.set_bootstrap_provider(provider).await;
238    }
239}
240
241// ── gRPC Checkpoint Streaming ──
242
243async fn run_grpc_stream(
244    source_id: &str,
245    config: SuiDeepBookSourceConfig,
246    base: &SourceBase,
247    state_store: Option<Arc<dyn StateStoreProvider>>,
248    shutdown_rx: &mut watch::Receiver<bool>,
249) -> Result<()> {
250    let grpc_endpoint = config.effective_grpc_endpoint().to_owned();
251    info!("Starting gRPC checkpoint stream for '{source_id}' from {grpc_endpoint}");
252
253    // We still need the JSON-RPC client for pool metadata enrichment (get_object)
254    let rpc_client = SuiRpcClient::new(config.rpc_endpoint.clone())?;
255
256    let enrichment = EnrichmentConfig {
257        enable_pool_nodes: config.enable_pool_nodes,
258        enable_trader_nodes: config.enable_trader_nodes,
259        enable_order_nodes: config.enable_order_nodes,
260    };
261    let mut enrichment_state = EnrichmentState {
262        seen_pools: HashSet::new(),
263        seen_traders: HashSet::new(),
264        seen_orders: HashSet::new(),
265    };
266
267    let mut last_checkpoint_seq = load_grpc_cursor(source_id, &state_store).await?;
268    if let Some(seq) = last_checkpoint_seq {
269        info!("Resuming gRPC stream from checkpoint sequence {seq}");
270    }
271
272    let mut consecutive_failures = 0u32;
273    let max_retries = 20u32;
274
275    loop {
276        if *shutdown_rx.borrow() {
277            info!("Received shutdown signal for gRPC stream '{source_id}'");
278            break;
279        }
280
281        // Connect / reconnect (connect_lazy — doesn't actually connect yet)
282        let mut grpc_client = match crate::grpc::SuiGrpcClient::new(&grpc_endpoint) {
283            Ok(c) => c,
284            Err(err) => {
285                consecutive_failures += 1;
286                error!("Failed to create gRPC client (attempt {consecutive_failures}): {err}");
287                if consecutive_failures >= max_retries {
288                    return Err(err);
289                }
290                let backoff_ms = backoff_delay_ms(consecutive_failures);
291                if wait_for_poll_or_shutdown(shutdown_rx, backoff_ms).await {
292                    break;
293                }
294                continue;
295            }
296        };
297
298        let mut stream = match grpc_client.subscribe_checkpoints().await {
299            Ok(s) => {
300                info!("gRPC checkpoint stream established for '{source_id}'");
301                s
302            }
303            Err(err) => {
304                consecutive_failures += 1;
305                error!(
306                    "Failed to subscribe to checkpoints (attempt {consecutive_failures}): {err}"
307                );
308                if consecutive_failures >= max_retries {
309                    return Err(err);
310                }
311                let backoff_ms = backoff_delay_ms(consecutive_failures);
312                if wait_for_poll_or_shutdown(shutdown_rx, backoff_ms).await {
313                    break;
314                }
315                continue;
316            }
317        };
318
319        // Stream processing loop
320        loop {
321            if *shutdown_rx.borrow() {
322                break;
323            }
324
325            let msg = tokio::select! {
326                msg = stream.message() => msg,
327                _ = shutdown_rx.changed() => {
328                    break;
329                }
330            };
331
332            match msg {
333                Ok(Some(response)) => {
334                    consecutive_failures = 0;
335                    let seq = response.cursor.unwrap_or(0);
336
337                    // Skip checkpoints we've already processed
338                    if let Some(last_seq) = last_checkpoint_seq {
339                        if seq <= last_seq {
340                            continue;
341                        }
342                    }
343
344                    if let Some(checkpoint) = response.checkpoint {
345                        let events = crate::grpc::extract_deepbook_events(
346                            &checkpoint,
347                            &config.deepbook_package_id,
348                        );
349
350                        if !events.is_empty() {
351                            info!(
352                                "Checkpoint {seq}: extracted {} DeepBook event(s)",
353                                events.len()
354                            );
355                        }
356
357                        for event in &events {
358                            if !should_include_event(event, &config.event_filters, &config.pools) {
359                                debug!("Skipping event (filtered): {}", event.event_type);
360                                continue;
361                            }
362
363                            let effective_from = event.timestamp_ms.unwrap_or_else(|| {
364                                chrono::Utc::now().timestamp_millis().max(0) as u64
365                            });
366
367                            emit_enrichment_nodes(
368                                source_id,
369                                event,
370                                &enrichment,
371                                &rpc_client,
372                                &mut enrichment_state,
373                                effective_from,
374                                base,
375                            )
376                            .await?;
377
378                            let event_entity_id = crate::mapping::derive_entity_id_pub(event);
379                            let change = map_event_to_change(source_id, event);
380                            debug!(
381                                "Dispatching gRPC event: entity_id={event_entity_id}, type={}",
382                                event.event_type
383                            );
384                            base.dispatch_source_change(change).await?;
385
386                            emit_enrichment_relationships(
387                                source_id,
388                                event,
389                                &enrichment,
390                                &event_entity_id,
391                                effective_from,
392                                base,
393                            )
394                            .await?;
395                        }
396                    } else {
397                        debug!("Checkpoint {seq}: no checkpoint data in response");
398                    }
399
400                    save_grpc_cursor(source_id, seq, &state_store).await?;
401                    last_checkpoint_seq = Some(seq);
402                }
403                Ok(None) => {
404                    warn!("gRPC checkpoint stream ended for '{source_id}', reconnecting...");
405                    break;
406                }
407                Err(err) => {
408                    consecutive_failures += 1;
409                    error!("gRPC stream error (attempt {consecutive_failures}): {err}");
410                    break;
411                }
412            }
413        }
414
415        if *shutdown_rx.borrow() {
416            break;
417        }
418
419        let backoff_ms = backoff_delay_ms(consecutive_failures);
420        warn!("Reconnecting gRPC stream in {backoff_ms}ms...");
421        if wait_for_poll_or_shutdown(shutdown_rx, backoff_ms).await {
422            break;
423        }
424    }
425
426    Ok(())
427}
428
/// Exponential backoff delay in milliseconds for the given failure count.
///
/// The delay is one second doubled once per failure (the exponent is clamped
/// to 15) and never exceeds thirty seconds.
fn backoff_delay_ms(failures: u32) -> u64 {
    const BASE_MS: u64 = 1000;
    const MAX_MS: u64 = 30_000;

    let exponent = failures.min(15);
    let multiplier = 2u64.saturating_pow(exponent);
    (BASE_MS * multiplier).min(MAX_MS)
}
435
436async fn load_grpc_cursor(
437    source_id: &str,
438    state_store: &Option<Arc<dyn StateStoreProvider>>,
439) -> Result<Option<u64>> {
440    let Some(store) = state_store else {
441        return Ok(None);
442    };
443
444    let Some(bytes) = store.get(source_id, GRPC_CURSOR_STATE_KEY).await? else {
445        return Ok(None);
446    };
447
448    match serde_json::from_slice::<u64>(&bytes) {
449        Ok(seq) => Ok(Some(seq)),
450        Err(err) => {
451            warn!("Failed to parse gRPC cursor for source '{source_id}': {err}. Clearing state.");
452            let _ = store.delete(source_id, GRPC_CURSOR_STATE_KEY).await;
453            Ok(None)
454        }
455    }
456}
457
458async fn save_grpc_cursor(
459    source_id: &str,
460    seq: u64,
461    state_store: &Option<Arc<dyn StateStoreProvider>>,
462) -> Result<()> {
463    let Some(store) = state_store else {
464        return Ok(());
465    };
466
467    let bytes = serde_json::to_vec(&seq)?;
468    store.set(source_id, GRPC_CURSOR_STATE_KEY, bytes).await?;
469    Ok(())
470}
471
472// ── JSON-RPC Polling (Legacy) ──
473
474async fn run_poll_loop(
475    source_id: &str,
476    config: SuiDeepBookSourceConfig,
477    base: &SourceBase,
478    state_store: Option<Arc<dyn StateStoreProvider>>,
479    shutdown_rx: &mut watch::Receiver<bool>,
480) -> Result<()> {
481    let rpc_client = SuiRpcClient::new(config.rpc_endpoint.clone())?;
482    // Use MoveModule filter to only fetch events from the DeepBook package.
483    // This is orders of magnitude faster than {"All":[]} on Sui mainnet.
484    let query_filter = serde_json::json!({
485        "MoveModule": {
486            "package": config.deepbook_package_id,
487            "module": "pool"
488        }
489    });
490
491    let mut cursor = load_cursor(source_id, &state_store).await?;
492
493    if cursor.is_none() && matches!(config.start_position, StartPosition::Now) {
494        cursor =
495            initialize_cursor_for_now(&rpc_client, &query_filter, config.request_limit).await?;
496        if let Some(cursor_ref) = cursor.as_ref() {
497            save_cursor(source_id, cursor_ref, &state_store).await?;
498        }
499    }
500
501    let timestamp_floor = match config.start_position {
502        StartPosition::Timestamp(value) => Some(value.max(0) as u64),
503        _ => None,
504    };
505
506    let enrichment = EnrichmentConfig {
507        enable_pool_nodes: config.enable_pool_nodes,
508        enable_trader_nodes: config.enable_trader_nodes,
509        enable_order_nodes: config.enable_order_nodes,
510    };
511    let mut enrichment_state = EnrichmentState {
512        seen_pools: HashSet::new(),
513        seen_traders: HashSet::new(),
514        seen_orders: HashSet::new(),
515    };
516
517    // ── Lookback phase ──
518    // Fetch recent historical events in descending order so the dashboard
519    // has data immediately on startup.
520    if config.lookback_events > 0 && cursor.is_some() {
521        info!(
522            "Fetching up to {} recent events for lookback",
523            config.lookback_events
524        );
525        match fetch_lookback_events(
526            source_id,
527            &config,
528            &rpc_client,
529            &query_filter,
530            &enrichment,
531            &mut enrichment_state,
532            base,
533        )
534        .await
535        {
536            Ok(count) => info!("Lookback phase complete: processed {count} events"),
537            Err(err) => warn!("Lookback phase failed (non-fatal, continuing): {err}"),
538        }
539    }
540
541    let mut consecutive_failures = 0usize;
542    loop {
543        if *shutdown_rx.borrow() {
544            info!("Received shutdown signal for Sui DeepBook source '{source_id}'");
545            break;
546        }
547
548        let query_result = match rpc_client
549            .query_events(
550                query_filter.clone(),
551                cursor.as_ref(),
552                config.request_limit,
553                false,
554            )
555            .await
556        {
557            Ok(result) => {
558                consecutive_failures = 0;
559                result
560            }
561            Err(err) => {
562                consecutive_failures += 1;
563                error!("Failed querying DeepBook events (attempt {consecutive_failures}): {err}");
564                if consecutive_failures >= 10 {
565                    return Err(err);
566                }
567                if wait_for_poll_or_shutdown(shutdown_rx, config.poll_interval_ms).await {
568                    break;
569                }
570                continue;
571            }
572        };
573
574        let mut latest_cursor = cursor.clone();
575        for event in query_result.data {
576            latest_cursor = Some(event.id.clone());
577
578            if event.package_id != config.deepbook_package_id {
579                continue;
580            }
581
582            if let Some(ts_floor) = timestamp_floor {
583                if event
584                    .timestamp_ms
585                    .is_some_and(|timestamp| timestamp < ts_floor)
586                {
587                    continue;
588                }
589            }
590
591            if !should_include_event(&event, &config.event_filters, &config.pools) {
592                continue;
593            }
594
595            let effective_from = event
596                .timestamp_ms
597                .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().max(0) as u64);
598
599            // Emit enrichment nodes before the event node
600            emit_enrichment_nodes(
601                source_id,
602                &event,
603                &enrichment,
604                &rpc_client,
605                &mut enrichment_state,
606                effective_from,
607                base,
608            )
609            .await?;
610
611            // Emit the event node itself
612            let event_entity_id = crate::mapping::derive_entity_id_pub(&event);
613            let change = map_event_to_change(source_id, &event);
614            base.dispatch_source_change(change).await?;
615
616            // Emit relationships after both nodes exist
617            emit_enrichment_relationships(
618                source_id,
619                &event,
620                &enrichment,
621                &event_entity_id,
622                effective_from,
623                base,
624            )
625            .await?;
626        }
627
628        // Always advance cursor from next_cursor when present, even if all
629        // events on this page were filtered out.  Without this, the source
630        // would re-fetch the same page forever when no events match.
631        if let Some(next_cursor) = query_result.next_cursor {
632            latest_cursor = Some(next_cursor);
633        }
634
635        if latest_cursor != cursor {
636            cursor = latest_cursor;
637            if let Some(cursor_ref) = cursor.as_ref() {
638                save_cursor(source_id, cursor_ref, &state_store).await?;
639            }
640        }
641
642        if query_result.has_next_page {
643            continue;
644        }
645
646        if wait_for_poll_or_shutdown(shutdown_rx, config.poll_interval_ms).await {
647            break;
648        }
649    }
650
651    Ok(())
652}
653
/// Remembers which enrichment entities were already emitted by one ingestion task,
/// so each Pool, Trader and Order node is dispatched only once.
struct EnrichmentState {
    /// Pool ids for which a Pool node has been dispatched.
    seen_pools: HashSet<String>,
    /// Sender addresses for which a Trader node has been dispatched.
    seen_traders: HashSet<String>,
    /// Order ids for which an Order node has been dispatched.
    seen_orders: HashSet<String>,
}
660
661/// Emit Pool, Trader, and Order nodes if not already seen.
662async fn emit_enrichment_nodes(
663    source_id: &str,
664    event: &crate::rpc::SuiEvent,
665    enrichment: &EnrichmentConfig,
666    rpc_client: &SuiRpcClient,
667    state: &mut EnrichmentState,
668    effective_from: u64,
669    base: &SourceBase,
670) -> Result<()> {
671    // Pool node
672    if enrichment.enable_pool_nodes {
673        if let Some(pool_id) = event_pool_id(event) {
674            if state.seen_pools.insert(pool_id.clone()) {
675                let object_data = match rpc_client.get_object(&pool_id).await {
676                    Ok(data) => {
677                        debug!("Fetched pool metadata for {pool_id}");
678                        Some(data)
679                    }
680                    Err(err) => {
681                        warn!("Failed to fetch pool object for {pool_id}: {err}");
682                        None
683                    }
684                };
685                let pool_change =
686                    build_pool_node(source_id, &pool_id, object_data.as_ref(), effective_from);
687                base.dispatch_source_change(pool_change).await?;
688            }
689        }
690    }
691
692    // Trader node
693    if enrichment.enable_trader_nodes
694        && !event.sender.is_empty()
695        && state.seen_traders.insert(event.sender.clone())
696    {
697        let trader_change = build_trader_node(source_id, &event.sender, effective_from);
698        base.dispatch_source_change(trader_change).await?;
699    }
700
701    // Order node
702    if enrichment.enable_order_nodes {
703        if let Some(order_id) = event_order_id(event) {
704            if state.seen_orders.insert(order_id.clone()) {
705                let pool_id = event_pool_id(event);
706                let order_change =
707                    build_order_node(source_id, &order_id, pool_id.as_deref(), effective_from);
708                base.dispatch_source_change(order_change).await?;
709            }
710        }
711    }
712
713    Ok(())
714}
715
716/// Emit IN_POOL, SENT_BY, FOR_ORDER relationships after both nodes exist.
717async fn emit_enrichment_relationships(
718    source_id: &str,
719    event: &crate::rpc::SuiEvent,
720    enrichment: &EnrichmentConfig,
721    event_entity_id: &str,
722    effective_from: u64,
723    base: &SourceBase,
724) -> Result<()> {
725    // IN_POOL relationship
726    if enrichment.enable_pool_nodes {
727        if let Some(pool_id) = event_pool_id(event) {
728            let rel = build_relationship(
729                source_id,
730                "IN_POOL",
731                &format!("rel:in_pool:{event_entity_id}"),
732                event_entity_id,
733                &format!("pool_meta:{pool_id}"),
734                effective_from,
735            );
736            base.dispatch_source_change(rel).await?;
737        }
738    }
739
740    // SENT_BY relationship
741    if enrichment.enable_trader_nodes && !event.sender.is_empty() {
742        let rel = build_relationship(
743            source_id,
744            "SENT_BY",
745            &format!("rel:sent_by:{event_entity_id}"),
746            event_entity_id,
747            &format!("trader:{}", event.sender),
748            effective_from,
749        );
750        base.dispatch_source_change(rel).await?;
751    }
752
753    // FOR_ORDER relationship
754    if enrichment.enable_order_nodes {
755        if let Some(order_id) = event_order_id(event) {
756            let rel = build_relationship(
757                source_id,
758                "FOR_ORDER",
759                &format!("rel:for_order:{event_entity_id}"),
760                event_entity_id,
761                &format!("order_meta:{order_id}"),
762                effective_from,
763            );
764            base.dispatch_source_change(rel).await?;
765        }
766    }
767
768    Ok(())
769}
770
771async fn initialize_cursor_for_now(
772    rpc_client: &SuiRpcClient,
773    query_filter: &serde_json::Value,
774    request_limit: u16,
775) -> Result<Option<EventCursor>> {
776    let result = rpc_client
777        .query_events(query_filter.clone(), None, request_limit.min(10), true)
778        .await?;
779
780    if let Some(first) = result.data.first() {
781        return Ok(Some(first.id.clone()));
782    }
783    Ok(result.next_cursor)
784}
785
786/// Fetch the most recent `lookback_events` events in descending order,
787/// reverse them into chronological order, and process them as inserts.
788/// This lets the dashboard display data immediately on startup.
789async fn fetch_lookback_events(
790    source_id: &str,
791    config: &SuiDeepBookSourceConfig,
792    rpc_client: &SuiRpcClient,
793    query_filter: &serde_json::Value,
794    enrichment: &EnrichmentConfig,
795    enrichment_state: &mut EnrichmentState,
796    base: &SourceBase,
797) -> Result<usize> {
798    let mut all_events = Vec::new();
799    let mut page_cursor: Option<EventCursor> = None;
800    let remaining = config.lookback_events as usize;
801    let page_size = config.request_limit.min(50);
802
803    // Fetch pages in descending (most recent first) order
804    loop {
805        let result = rpc_client
806            .query_events(
807                query_filter.clone(),
808                page_cursor.as_ref(),
809                page_size,
810                true, // descending
811            )
812            .await?;
813
814        for event in result.data {
815            if event.package_id != config.deepbook_package_id {
816                continue;
817            }
818            if !should_include_event(&event, &config.event_filters, &config.pools) {
819                continue;
820            }
821            all_events.push(event);
822            if all_events.len() >= remaining {
823                break;
824            }
825        }
826
827        if all_events.len() >= remaining || !result.has_next_page {
828            break;
829        }
830        page_cursor = result.next_cursor;
831    }
832
833    // Reverse to chronological order (oldest first)
834    all_events.reverse();
835
836    let count = all_events.len();
837    debug!("Lookback: processing {count} events in chronological order");
838
839    for event in &all_events {
840        let effective_from = event
841            .timestamp_ms
842            .unwrap_or_else(|| chrono::Utc::now().timestamp_millis().max(0) as u64);
843
844        emit_enrichment_nodes(
845            source_id,
846            event,
847            enrichment,
848            rpc_client,
849            enrichment_state,
850            effective_from,
851            base,
852        )
853        .await?;
854
855        let event_entity_id = crate::mapping::derive_entity_id_pub(event);
856        let change = map_event_to_change(source_id, event);
857        base.dispatch_source_change(change).await?;
858
859        emit_enrichment_relationships(
860            source_id,
861            event,
862            enrichment,
863            &event_entity_id,
864            effective_from,
865            base,
866        )
867        .await?;
868    }
869
870    Ok(count)
871}
872
873async fn load_cursor(
874    source_id: &str,
875    state_store: &Option<Arc<dyn StateStoreProvider>>,
876) -> Result<Option<EventCursor>> {
877    let Some(store) = state_store else {
878        return Ok(None);
879    };
880
881    let Some(bytes) = store.get(source_id, CURSOR_STATE_KEY).await? else {
882        return Ok(None);
883    };
884
885    match serde_json::from_slice::<EventCursor>(&bytes) {
886        Ok(cursor) => Ok(Some(cursor)),
887        Err(err) => {
888            warn!(
889                "Failed to parse persisted DeepBook cursor for source '{source_id}': {err}. Clearing state."
890            );
891            let _ = store.delete(source_id, CURSOR_STATE_KEY).await;
892            Ok(None)
893        }
894    }
895}
896
897async fn save_cursor(
898    source_id: &str,
899    cursor: &EventCursor,
900    state_store: &Option<Arc<dyn StateStoreProvider>>,
901) -> Result<()> {
902    let Some(store) = state_store else {
903        return Ok(());
904    };
905
906    let bytes = serde_json::to_vec(cursor)?;
907    store.set(source_id, CURSOR_STATE_KEY, bytes).await?;
908    Ok(())
909}
910
911async fn wait_for_poll_or_shutdown(
912    shutdown_rx: &mut watch::Receiver<bool>,
913    poll_interval_ms: u64,
914) -> bool {
915    tokio::select! {
916        _ = tokio::time::sleep(Duration::from_millis(poll_interval_ms)) => false,
917        changed = shutdown_rx.changed() => {
918            changed.is_ok() && *shutdown_rx.borrow()
919        }
920    }
921}
922
923pub struct SuiDeepBookSourceBuilder {
924    id: String,
925    config: SuiDeepBookSourceConfig,
926    dispatch_mode: Option<DispatchMode>,
927    dispatch_buffer_capacity: Option<usize>,
928    bootstrap_provider: Option<Box<dyn drasi_lib::bootstrap::BootstrapProvider + 'static>>,
929    state_store: Option<Arc<dyn StateStoreProvider>>,
930    auto_start: bool,
931}
932
933impl SuiDeepBookSourceBuilder {
934    pub fn new(id: impl Into<String>) -> Self {
935        Self {
936            id: id.into(),
937            config: SuiDeepBookSourceConfig::default(),
938            dispatch_mode: None,
939            dispatch_buffer_capacity: None,
940            bootstrap_provider: None,
941            state_store: None,
942            auto_start: true,
943        }
944    }
945
946    pub fn with_rpc_endpoint(mut self, rpc_endpoint: impl Into<String>) -> Self {
947        self.config.rpc_endpoint = rpc_endpoint.into();
948        self
949    }
950
951    pub fn with_grpc_endpoint(mut self, grpc_endpoint: impl Into<String>) -> Self {
952        self.config.grpc_endpoint = Some(grpc_endpoint.into());
953        self
954    }
955
956    pub fn with_transport(mut self, transport: Transport) -> Self {
957        self.config.transport = transport;
958        self
959    }
960
961    pub fn with_deepbook_package_id(mut self, package_id: impl Into<String>) -> Self {
962        self.config.deepbook_package_id = package_id.into();
963        self
964    }
965
966    pub fn with_poll_interval_ms(mut self, poll_interval_ms: u64) -> Self {
967        self.config.poll_interval_ms = poll_interval_ms;
968        self
969    }
970
971    pub fn with_request_limit(mut self, request_limit: u16) -> Self {
972        self.config.request_limit = request_limit;
973        self
974    }
975
976    pub fn with_event_filters(mut self, event_filters: Vec<String>) -> Self {
977        self.config.event_filters = event_filters;
978        self
979    }
980
981    pub fn with_pools(mut self, pools: Vec<String>) -> Self {
982        self.config.pools = pools;
983        self
984    }
985
986    pub fn with_start_position(mut self, start_position: StartPosition) -> Self {
987        self.config.start_position = start_position;
988        self
989    }
990
991    pub fn with_start_from_beginning(mut self) -> Self {
992        self.config.start_position = StartPosition::Beginning;
993        self
994    }
995
996    pub fn with_start_from_now(mut self) -> Self {
997        self.config.start_position = StartPosition::Now;
998        self
999    }
1000
1001    pub fn with_start_from_timestamp(mut self, timestamp_ms: i64) -> Self {
1002        self.config.start_position = StartPosition::Timestamp(timestamp_ms);
1003        self
1004    }
1005
1006    /// Fetch up to `count` recent historical events on startup before
1007    /// entering the forward-polling loop.  Useful for populating dashboards
1008    /// with recent data immediately.
1009    pub fn with_lookback_events(mut self, count: u16) -> Self {
1010        self.config.lookback_events = count;
1011        self
1012    }
1013
1014    pub fn with_dispatch_mode(mut self, mode: DispatchMode) -> Self {
1015        self.dispatch_mode = Some(mode);
1016        self
1017    }
1018
1019    pub fn with_dispatch_buffer_capacity(mut self, capacity: usize) -> Self {
1020        self.dispatch_buffer_capacity = Some(capacity);
1021        self
1022    }
1023
1024    pub fn with_bootstrap_provider(
1025        mut self,
1026        provider: impl drasi_lib::bootstrap::BootstrapProvider + 'static,
1027    ) -> Self {
1028        self.bootstrap_provider = Some(Box::new(provider));
1029        self
1030    }
1031
1032    pub fn with_state_store(mut self, state_store: Arc<dyn StateStoreProvider>) -> Self {
1033        self.state_store = Some(state_store);
1034        self
1035    }
1036
1037    pub fn with_auto_start(mut self, auto_start: bool) -> Self {
1038        self.auto_start = auto_start;
1039        self
1040    }
1041
1042    pub fn with_config(mut self, config: SuiDeepBookSourceConfig) -> Self {
1043        self.config = config;
1044        self
1045    }
1046
1047    pub fn with_enable_pool_nodes(mut self, enable: bool) -> Self {
1048        self.config.enable_pool_nodes = enable;
1049        self
1050    }
1051
1052    pub fn with_enable_trader_nodes(mut self, enable: bool) -> Self {
1053        self.config.enable_trader_nodes = enable;
1054        self
1055    }
1056
1057    pub fn with_enable_order_nodes(mut self, enable: bool) -> Self {
1058        self.config.enable_order_nodes = enable;
1059        self
1060    }
1061
1062    pub fn build(self) -> Result<SuiDeepBookSource> {
1063        self.config.validate()?;
1064
1065        let mut params = SourceBaseParams::new(&self.id).with_auto_start(self.auto_start);
1066        if let Some(mode) = self.dispatch_mode {
1067            params = params.with_dispatch_mode(mode);
1068        }
1069        if let Some(capacity) = self.dispatch_buffer_capacity {
1070            params = params.with_dispatch_buffer_capacity(capacity);
1071        }
1072        if let Some(provider) = self.bootstrap_provider {
1073            params = params.with_bootstrap_provider(provider);
1074        }
1075
1076        let (shutdown_tx, shutdown_rx) = watch::channel(false);
1077
1078        Ok(SuiDeepBookSource {
1079            base: SourceBase::new(params)?,
1080            config: self.config,
1081            state_store: Arc::new(RwLock::new(self.state_store)),
1082            task_handle: Arc::new(RwLock::new(None)),
1083            shutdown_tx,
1084            shutdown_rx,
1085        })
1086    }
1087}
1088
#[cfg(test)]
mod tests {
    use super::*;

    /// A builder with no overrides yields the expected id, type name and
    /// default polling interval.
    #[test]
    fn test_builder_defaults() {
        let built = SuiDeepBookSource::builder("source-1").build().unwrap();

        assert_eq!(built.id(), "source-1");
        assert_eq!(built.type_name(), "sui-deepbook");
        assert_eq!(built.config.poll_interval_ms, 2_000);
    }

    /// Values supplied through the builder end up in the source's config.
    #[test]
    fn test_builder_with_custom_values() {
        let built = SuiDeepBookSource::builder("source-1")
            .with_rpc_endpoint("https://fullnode.testnet.sui.io:443")
            .with_deepbook_package_id("0xabc")
            .with_poll_interval_ms(500)
            .with_request_limit(25)
            .with_start_from_beginning()
            .build()
            .unwrap();
        let cfg = &built.config;

        assert_eq!(cfg.rpc_endpoint, "https://fullnode.testnet.sui.io:443");
        assert_eq!(cfg.deepbook_package_id, "0xabc");
        assert_eq!(cfg.poll_interval_ms, 500);
        assert_eq!(cfg.request_limit, 25);
        assert!(matches!(cfg.start_position, StartPosition::Beginning));
    }

    /// An empty RPC endpoint makes `build` fail.
    #[test]
    fn test_builder_rejects_invalid_config() {
        let outcome = SuiDeepBookSource::builder("source-1")
            .with_rpc_endpoint("")
            .build();

        assert!(outcome.is_err());
    }
}
1133
// Dynamic-plugin export: compiled only when the `dynamic-plugin` feature is
// enabled. Invokes `drasi_plugin_sdk::export_plugin!` with the plugin id
// "sui-deepbook-source", one source descriptor
// (`descriptor::SuiDeepBookSourceDescriptor`), and no reaction or bootstrap
// descriptors.
//
// All three version fields are taken from this crate's `CARGO_PKG_VERSION`
// at compile time.
// NOTE(review): `core_version` and `lib_version` presumably should track the
// Drasi core/lib versions; confirm that using this crate's own package
// version for them is intended.
1134#[cfg(feature = "dynamic-plugin")]
1135drasi_plugin_sdk::export_plugin!(
1136    plugin_id = "sui-deepbook-source",
1137    core_version = env!("CARGO_PKG_VERSION"),
1138    lib_version = env!("CARGO_PKG_VERSION"),
1139    plugin_version = env!("CARGO_PKG_VERSION"),
1140    source_descriptors = [descriptor::SuiDeepBookSourceDescriptor],
1141    reaction_descriptors = [],
1142    bootstrap_descriptors = [],
1143);