nautilus-blockchain 0.55.0

Blockchain and DeFi integration adapter for the Nautilus trading engine
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
// -------------------------------------------------------------------------------------------------
//  Copyright (C) 2015-2026 Nautech Systems Pty Ltd. All rights reserved.
//  https://nautechsystems.io
//
//  Licensed under the GNU Lesser General Public License Version 3.0 (the "License");
//  You may not use this file except in compliance with the License.
//  You may obtain a copy of the License at https://www.gnu.org/licenses/lgpl-3.0.en.html
//
//  Unless required by applicable law or agreed to in writing, software
//  distributed under the License is distributed on an "AS IS" BASIS,
//  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
//  See the License for the specific language governing permissions and
//  limitations under the License.
// -------------------------------------------------------------------------------------------------

use std::{cmp::max, collections::HashSet};

use alloy::primitives::Address;
use futures_util::StreamExt;
use nautilus_core::formatting::Separable;
use nautilus_model::defi::{
    SharedDex,
    amm::Pool,
    chain::SharedChain,
    reporting::{BlockchainSyncReportItems, BlockchainSyncReporter},
    token::Token,
};
use tokio_util::sync::CancellationToken;

use crate::{
    cache::BlockchainCache,
    config::BlockchainDataClientConfig,
    contracts::erc20::Erc20Contract,
    events::pool_created::PoolCreatedEvent,
    exchanges::extended::DexExtended,
    hypersync::{client::HyperSyncClient, helpers::extract_block_number},
};

/// Number of blocks processed between progress-report updates during a sync run.
const BLOCKS_PROCESS_IN_SYNC_REPORT: u64 = 50_000;
/// Number of buffered pool-creation events that triggers a batched database flush.
const POOL_DB_BATCH_SIZE: usize = 2000;

/// Sanitizes a string by removing null bytes and other invalid characters for PostgreSQL UTF-8.
///
/// Strips all Unicode control characters — C0 controls including the null byte (0x00),
/// DEL (U+007F), and C1 controls (U+0080..=U+009F) — which are invalid or problematic in
/// PostgreSQL's UTF-8 text fields. Common with malformed on-chain token metadata.
/// Preserves printable characters and common whitespace (space, tab, newline, carriage return).
fn sanitize_string(s: &str) -> String {
    s.chars()
        // `is_control` covers C0, DEL, and C1 ranges; explicitly keep common whitespace.
        .filter(|c| !c.is_control() || matches!(c, '\t' | '\n' | '\r'))
        .collect()
}

/// Service responsible for discovering DEX liquidity pools from blockchain events.
///
/// This service handles the synchronization of pool creation events from various DEXes,
/// managing token metadata fetching, buffering strategies, and database persistence.
///
/// Borrows the cache, ERC20 contract, and HyperSync client for lifetime `'a`;
/// the chain, cancellation token, and config are owned handles.
#[derive(Debug)]
pub struct PoolDiscoveryService<'a> {
    /// The blockchain network being synced.
    chain: SharedChain,
    /// Mutable cache for tokens, pools, and sync bookkeeping (in-memory + database).
    cache: &'a mut BlockchainCache,
    /// ERC20 contract interface used to batch-fetch token metadata over RPC.
    erc20_contract: &'a Erc20Contract,
    /// HyperSync client for streaming historical contract events.
    hypersync_client: &'a HyperSyncClient,
    /// Cancellation token for graceful shutdown of long-running syncs.
    cancellation_token: CancellationToken,
    /// Configuration for sync operations (e.g. multicall batch sizing).
    config: BlockchainDataClientConfig,
}

impl<'a> PoolDiscoveryService<'a> {
    /// Creates a new [`PoolDiscoveryService`] instance.
    ///
    /// The cache, ERC20 contract, and HyperSync client are borrowed for the
    /// service's lifetime; the remaining arguments are owned (shared) handles.
    #[must_use]
    pub const fn new(
        chain: SharedChain,
        cache: &'a mut BlockchainCache,
        erc20_contract: &'a Erc20Contract,
        hypersync_client: &'a HyperSyncClient,
        cancellation_token: CancellationToken,
        config: BlockchainDataClientConfig,
    ) -> Self {
        Self {
            chain,
            cache,
            erc20_contract,
            hypersync_client,
            cancellation_token,
            config,
        }
    }

    /// Synchronizes pools for a specific DEX within a given block range.
    ///
    /// Streams pool-creation events from the DEX factory contract, resolves token
    /// metadata via batched RPC multicalls, and persists tokens and pools in large
    /// database batches. Unless `reset` is true, syncing resumes one block past the
    /// last synced block recorded for this DEX in the cache.
    ///
    /// # Errors
    ///
    /// Returns an error if:
    /// - HyperSync streaming fails
    /// - Token RPC calls fail
    /// - Database operations fail
    /// - Sync is cancelled
    pub async fn sync_pools(
        &mut self,
        dex: &DexExtended,
        from_block: u64,
        to_block: Option<u64>,
        reset: bool,
    ) -> anyhow::Result<()> {
        // Determine effective sync range: on reset start from `from_block`,
        // otherwise resume one block past the last synced block (if any).
        let (last_synced_block, effective_from_block) = if reset {
            (None, from_block)
        } else {
            let last_synced_block = self.cache.get_dex_last_synced_block(&dex.dex.name).await?;
            let effective_from_block = last_synced_block
                .map_or(from_block, |last_synced| max(from_block, last_synced + 1));
            (last_synced_block, effective_from_block)
        };

        let to_block = match to_block {
            Some(block) => block,
            None => self.hypersync_client.current_block().await,
        };

        // Skip sync if already up to date
        if effective_from_block > to_block {
            log::info!(
                "DEX {} already synced to block {} (current: {}), skipping sync",
                dex.dex.name,
                last_synced_block.unwrap_or(0).separate_with_commas(),
                to_block.separate_with_commas()
            );
            return Ok(());
        }

        let total_blocks = to_block.saturating_sub(effective_from_block) + 1;
        log::info!(
            "Syncing DEX exchange pools from {} to {} (total: {} blocks){}",
            effective_from_block.separate_with_commas(),
            to_block.separate_with_commas(),
            total_blocks.separate_with_commas(),
            if let Some(last_synced) = last_synced_block {
                format!(
                    " - resuming from last synced block {}",
                    last_synced.separate_with_commas()
                )
            } else {
                String::new()
            },
        );
        log::info!(
            "Syncing {} pool creation events from factory contract {} on chain {}",
            dex.dex.name,
            dex.factory,
            self.chain.name
        );

        // Enable performance settings for sync operations
        if let Err(e) = self.cache.toggle_performance_settings(true).await {
            log::warn!("Failed to enable performance settings: {e}");
        }

        let mut metrics = BlockchainSyncReporter::new(
            BlockchainSyncReportItems::PoolCreatedEvents,
            effective_from_block,
            total_blocks,
            BLOCKS_PROCESS_IN_SYNC_REPORT,
        );

        let factory_address = &dex.factory;
        let pair_created_event_signature = dex.pool_created_event.as_ref();
        let pools_stream = self
            .hypersync_client
            .request_contract_events_stream(
                effective_from_block,
                Some(to_block),
                factory_address,
                vec![pair_created_event_signature],
            )
            .await;

        tokio::pin!(pools_stream);

        // LEVEL 1: RPC buffers (small, constrained by rate limits).
        // Guard against a zero batch size when `multicall_calls_per_rpc_request < 3`,
        // which would otherwise trigger an RPC flush attempt on every event.
        let token_rpc_batch_size =
            max(1, (self.config.multicall_calls_per_rpc_request / 3) as usize);
        let mut token_rpc_buffer: HashSet<Address> = HashSet::new();

        // LEVEL 2: DB buffers (large, optimize for throughput)
        let mut token_db_buffer: Vec<Token> = Vec::new();
        let mut pool_events_buffer: Vec<PoolCreatedEvent> = Vec::new();

        let mut last_block_saved = effective_from_block;

        // Tracking counters
        let mut total_discovered = 0;
        let mut total_skipped_exists = 0;
        let mut total_skipped_invalid_tokens = 0;
        let mut total_saved = 0;

        let cancellation_token = self.cancellation_token.clone();
        let sync_result = tokio::select! {
            () = cancellation_token.cancelled() => {
                log::info!("Exchange pool sync cancelled");
                Err(anyhow::anyhow!("Sync cancelled"))
            }

            result = async {
                while let Some(log) = pools_stream.next().await {
                    let block_number = extract_block_number(&log)?;
                    let blocks_progress = block_number - last_block_saved;
                    last_block_saved = block_number;

                    let pool = dex.parse_pool_created_event_hypersync(log)?;
                    total_discovered += 1;

                    if self.cache.get_pool(&pool.pool_identifier).is_some() {
                        // Pool is already initialized and cached.
                        total_skipped_exists += 1;
                        continue;
                    }

                    if self.cache.is_invalid_token(&pool.token0)
                        || self.cache.is_invalid_token(&pool.token1)
                    {
                        // Skip pools with invalid tokens as they cannot be properly processed or traded.
                        total_skipped_invalid_tokens += 1;
                        continue;
                    }

                    // Collect tokens needed for RPC fetch
                    if self.cache.get_token(&pool.token0).is_none() {
                        token_rpc_buffer.insert(pool.token0);
                    }

                    if self.cache.get_token(&pool.token1).is_none() {
                        token_rpc_buffer.insert(pool.token1);
                    }

                    // Buffer the pool for later processing
                    pool_events_buffer.push(pool);

                    // ==== RPC FLUSHING (small batches) ====
                    if token_rpc_buffer.len() >= token_rpc_batch_size {
                        let fetched_tokens = self
                            .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                            .await?;

                        // Accumulate for later DB write
                        token_db_buffer.extend(fetched_tokens);
                    }

                    // ==== DB FLUSHING (large batches) ====
                    // Process pools when buffer is full
                    if pool_events_buffer.len() >= POOL_DB_BATCH_SIZE {
                        // 1. Fetch any remaining tokens in RPC buffer (needed for pool construction)
                        if !token_rpc_buffer.is_empty() {
                            let fetched_tokens = self
                                .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                                .await?;
                            token_db_buffer.extend(fetched_tokens);
                        }

                        // 2. Flush ALL tokens to DB (satisfy foreign key constraints)
                        if !token_db_buffer.is_empty() {
                            self.cache
                                .add_tokens_batch(std::mem::take(&mut token_db_buffer))
                                .await?;
                        }

                        // 3. Now safe to construct and flush pools
                        let pools = self
                            .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
                            .await?;
                        total_saved += pools.len();
                        self.cache.add_pools_batch(pools).await?;
                    }

                    metrics.update(blocks_progress as usize);
                    // Log progress if needed
                    if metrics.should_log_progress(block_number, to_block) {
                        metrics.log_progress(block_number);
                    }
                }

                // ==== FINAL FLUSH (all remaining data) ====
                // 1. Fetch any remaining tokens
                if !token_rpc_buffer.is_empty() {
                    let fetched_tokens = self
                        .fetch_and_cache_tokens_in_memory(&mut token_rpc_buffer)
                        .await?;
                    token_db_buffer.extend(fetched_tokens);
                }

                // 2. Flush all tokens to DB
                if !token_db_buffer.is_empty() {
                    self.cache
                        .add_tokens_batch(std::mem::take(&mut token_db_buffer))
                        .await?;
                }

                // 3. Process and flush all pools
                if !pool_events_buffer.is_empty() {
                    let pools = self
                        .construct_pools_batch(&mut pool_events_buffer, &dex.dex)
                        .await?;
                    total_saved += pools.len();
                    self.cache.add_pools_batch(pools).await?;
                }

                metrics.log_final_stats();

                // Update the last synced block after successful completion.
                self.cache
                    .update_dex_last_synced_block(&dex.dex.name, to_block)
                    .await?;

                log::info!(
                    "Successfully synced DEX {} pools up to block {} | Summary: discovered={}, saved={}, skipped_exists={}, skipped_invalid_tokens={}",
                    dex.dex.name,
                    to_block.separate_with_commas(),
                    total_discovered,
                    total_saved,
                    total_skipped_exists,
                    total_skipped_invalid_tokens
                );

                Ok(())
            } => result
        };

        // Restore default safe settings regardless of the sync outcome, so a failed or
        // cancelled sync does not leave the database running with performance (less
        // safe) settings enabled.
        if let Err(e) = self.cache.toggle_performance_settings(false).await {
            log::warn!("Failed to restore default settings: {e}");
        }

        sync_result
    }

    /// Fetches token metadata via RPC and updates the in-memory cache immediately.
    ///
    /// Drains the pending address buffer, fetches token info with a batched multicall,
    /// inserts each valid token into the in-memory cache straight away (so pool
    /// construction can proceed), and returns the valid tokens for a later batch DB
    /// write. Addresses whose metadata cannot be fetched are recorded as invalid.
    ///
    /// # Errors
    ///
    /// Returns an error if the RPC multicall fails or database operations fail.
    async fn fetch_and_cache_tokens_in_memory(
        &mut self,
        token_buffer: &mut HashSet<Address>,
    ) -> anyhow::Result<Vec<Token>> {
        let addresses: Vec<Address> = token_buffer.drain().collect();
        let fetch_results = self
            .erc20_contract
            .batch_fetch_token_info(&addresses)
            .await?;

        let mut tokens_for_db = Vec::new();

        for (address, outcome) in fetch_results {
            let info = match outcome {
                Ok(info) => info,
                Err(fetch_error) => {
                    // Remember this address as invalid so future pools referencing it
                    // can be skipped, and persist the reason when a database exists.
                    self.cache.insert_invalid_token_in_memory(address);
                    if let Some(database) = &self.cache.database {
                        let reason = sanitize_string(&fetch_error.to_string());
                        database
                            .add_invalid_token(self.chain.chain_id, &address, &reason)
                            .await?;
                    }
                    continue;
                }
            };

            // Strip null bytes / invalid UTF-8 characters from on-chain metadata.
            let token = Token::new(
                self.chain.clone(),
                address,
                sanitize_string(&info.name),
                sanitize_string(&info.symbol),
                info.decimals,
            );

            // Make the token visible to pool construction right away; the DB write
            // for it happens later in a batch.
            self.cache.insert_token_in_memory(token.clone());
            tokens_for_db.push(token);
        }

        Ok(tokens_for_db)
    }

    /// Constructs multiple pools from pool creation events.
    ///
    /// Assumes all required tokens are already in the in-memory cache; events whose
    /// tokens cannot be resolved are skipped (with a warning when the token was not
    /// previously marked invalid). The event buffer is drained in the process.
    ///
    /// # Errors
    ///
    /// Logs errors for pools that cannot be constructed (missing tokens),
    /// but does not fail the entire batch.
    async fn construct_pools_batch(
        &self,
        pool_events: &mut Vec<PoolCreatedEvent>,
        dex: &SharedDex,
    ) -> anyhow::Result<Vec<Pool>> {
        let mut constructed = Vec::with_capacity(pool_events.len());

        for event in pool_events.drain(..) {
            // Resolve token0 from the in-memory cache, or skip this event.
            let token0 = if let Some(token) = self.cache.get_token(&event.token0) {
                token.clone()
            } else {
                if !self.cache.is_invalid_token(&event.token0) {
                    log::warn!(
                        "Skipping pool {}: Token0 {} not in cache and not marked as invalid",
                        event.pool_address,
                        event.token0
                    );
                }
                continue;
            };

            // Resolve token1 the same way.
            let token1 = if let Some(token) = self.cache.get_token(&event.token1) {
                token.clone()
            } else {
                if !self.cache.is_invalid_token(&event.token1) {
                    log::warn!(
                        "Skipping pool {}: Token1 {} not in cache and not marked as invalid",
                        event.pool_address,
                        event.token1
                    );
                }
                continue;
            };

            let mut pool = Pool::new(
                self.chain.clone(),
                dex.clone(),
                event.pool_address,
                event.pool_identifier,
                event.block_number,
                token0,
                token1,
                event.fee,
                event.tick_spacing,
                nautilus_core::UnixNanos::default(),
            );

            // Set hooks if available (UniswapV4)
            if let Some(hooks) = event.hooks {
                pool.set_hooks(hooks);
            }

            // Initialize pool with sqrt_price_x96 and tick if available (UniswapV4)
            if let (Some(sqrt_price_x96), Some(tick)) = (event.sqrt_price_x96, event.tick) {
                pool.initialize(sqrt_price_x96, tick);
            }

            constructed.push(pool);
        }

        Ok(constructed)
    }
}