//! # NEAR Lake Framework
//!
//! NEAR Lake Framework is a small library companion to [NEAR Lake](https://github.com/near/near-lake). It allows you to build
//! your own indexer that subscribes to the stream of blocks from the NEAR Lake data source and applies your own logic to process
//! the NEAR Protocol data.
//!
//! ## Example
//!
//! ```rust,no_run
//! use futures::StreamExt;
//! use near_lake_framework::LakeConfigBuilder;
//!
//! #[tokio::main]
//! async fn main() -> anyhow::Result<()> {
//!    // create a NEAR Lake Framework config
//!    let config = LakeConfigBuilder::default()
//!        .testnet()
//!        .start_block_height(82422587)
//!        .build()
//!        .expect("Failed to build LakeConfig");
//!
//!    // instantiate the NEAR Lake Framework Stream
//!    let (sender, stream) = near_lake_framework::streamer(config);
//!
//!    // read the stream events and pass them to a handler function with
//!    // concurrency 1
//!    let mut handlers = tokio_stream::wrappers::ReceiverStream::new(stream)
//!        .map(|streamer_message| handle_streamer_message(streamer_message))
//!        .buffer_unordered(1usize);
//!
//!    while let Some(_handle_message) = handlers.next().await {}
//!    drop(handlers); // close the channel so the sender will stop
//!
//!    // propagate errors from the sender
//!    match sender.await {
//!        Ok(Ok(())) => Ok(()),
//!        Ok(Err(e)) => Err(e),
//!        Err(e) => Err(anyhow::Error::from(e)), // JoinError
//!    }
//!}
//!
//! // The handler function to take the entire `StreamerMessage`
//! // and print the block height and number of shards
//! async fn handle_streamer_message(
//!    streamer_message: near_lake_framework::near_indexer_primitives::StreamerMessage,
//! ) {
//!    eprintln!(
//!        "{} / shards {}",
//!        streamer_message.block.header.height,
//!        streamer_message.shards.len()
//!    );
//!}
//!```
//!
//! ## Tutorials
//!
//! - <https://youtu.be/GsF7I93K-EQ>
//! - [Migrating to NEAR Lake Framework](https://near-indexers.io/tutorials/lake/migrating-to-near-lake-framework) from [NEAR Indexer Framework](https://near-indexers.io/docs/projects/near-indexer-framework)
//!
//! ### More examples
//!
//! - <https://github.com/near-examples/near-lake-raw-printer> simple example of a data printer built on top of NEAR Lake Framework
//! - <https://github.com/near-examples/near-lake-accounts-watcher> another simple example of an indexer built on top of NEAR Lake Framework for tutorial purposes
//! - <https://github.com/near-examples/indexer-tx-watcher-example-lake> an example of the indexer built on top of NEAR Lake Framework that watches for transactions related to specified account(s)
//! - <https://github.com/octopus-network/octopus-near-indexer-s3> a community-made project that uses NEAR Lake Framework
//!
//! ## How to use
//!
//! ### AWS S3 Credentials
//!
//! To get objects from the AWS S3 bucket you need to provide AWS credentials.
//!
//! #### Passing credentials to the config builder
//!
//! ```rust
//! use near_lake_framework::LakeConfigBuilder;
//!
//! # fn main() {
//! let credentials = aws_credential_types::Credentials::new(
//!     "AKIAIOSFODNN7EXAMPLE",
//!     "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY",
//!     None,
//!     None,
//!     "custom_credentials",
//! );
//! let s3_config = aws_sdk_s3::Config::builder()
//!     .credentials_provider(credentials)
//!     .build();
//!
//! let config = LakeConfigBuilder::default()
//!      .s3_config(s3_config)
//!      .s3_bucket_name("near-lake-data-custom")
//!      .start_block_height(1)
//!      .build()
//!      .expect("Failed to build LakeConfig");
//! # }
//! ```
//!
//! **Never hardcode your credentials: it is insecure. Use the approach described above to pass credentials that you read from CLI arguments**
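//!
//! A minimal sketch of reading the credentials from CLI arguments (the argument positions and the use of plain `std::env::args` are illustrative assumptions; a real indexer would likely use a CLI parser such as `clap`):
//!
//! ```rust
//! # fn example() {
//! // Illustrative only: expects `<access_key> <secret_key>` as the first
//! // two CLI arguments and builds the same credentials provider as above.
//! let mut args = std::env::args().skip(1);
//! let access_key = args.next().expect("missing AWS access key argument");
//! let secret_key = args.next().expect("missing AWS secret key argument");
//!
//! let credentials = aws_credential_types::Credentials::new(
//!     access_key,
//!     secret_key,
//!     None,
//!     None,
//!     "cli_credentials",
//! );
//! let s3_config = aws_sdk_s3::Config::builder()
//!     .credentials_provider(credentials)
//!     .build();
//! # }
//! ```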
//!
//! #### File-based AWS credentials
//!
//! The AWS default profile configuration created with `aws configure` looks similar to the following:
//!
//! `~/.aws/credentials`
//! ```text
//! [default]
//! aws_access_key_id=AKIAIOSFODNN7EXAMPLE
//! aws_secret_access_key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
//! ```
//!
//! [AWS docs: Configuration and credential file settings](https://docs.aws.amazon.com/cli/latest/userguide/cli-configure-files.html)
//!
//! ### Environment variables
//!
//! Alternatively, you can provide your AWS credentials via environment variables with the standard names:
//!
//! ```bash
//! $ export AWS_ACCESS_KEY_ID=AKIAIOSFODNN7EXAMPLE
//! $ export AWS_SECRET_ACCESS_KEY=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
//! $ export AWS_DEFAULT_REGION=eu-central-1
//! ```
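//!
//! With either the credentials file or the environment variables in place, you can rely on the default AWS credentials provider chain instead of passing the credentials explicitly. A minimal sketch using the same `aws_config::from_env()` call as the custom S3 storage example below:
//!
//! ```rust
//! use near_lake_framework::LakeConfigBuilder;
//!
//! # async fn example() {
//! // The default provider chain picks up the environment variables or
//! // `~/.aws/credentials`, whichever is available.
//! let aws_config = aws_config::from_env().load().await;
//! let s3_config = aws_sdk_s3::config::Builder::from(&aws_config).build();
//!
//! let config = LakeConfigBuilder::default()
//!     .s3_config(s3_config)
//!     .testnet()
//!     .start_block_height(82422587)
//!     .build()
//!     .expect("Failed to build LakeConfig");
//! # }
//! ```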
//!
//! ### Dependencies
//!
//! Add the following dependencies to your `Cargo.toml`:
//!
//! ```toml
//! ...
//! [dependencies]
//! anyhow = "1"
//! futures = "0.3.5"
//! itertools = "0.10.3"
//! tokio = { version = "1.1", features = ["sync", "time", "macros", "rt-multi-thread"] }
//! tokio-stream = { version = "0.1" }
//!
//! # NEAR Lake Framework
//! near-lake-framework = "0.6.1"
//! ```
//!
//! ### Custom S3 storage
//!
//! In case you want to run your own [near-lake](https://github.com/near/near-lake) instance and store data in an S3-compatible storage ([Minio](https://min.io/) or [Localstack](https://localstack.cloud/), for example),
//! you can override the default S3 API endpoint by passing a custom `aws_sdk_s3::config::Config` via the `s3_config` option:
//!
//! - run minio
//!
//! ```bash
//! $ mkdir -p /data/near-lake-custom && minio server /data
//! ```
//!
//! - pass custom `aws_sdk_s3::config::Config` to the [LakeConfigBuilder]
//!
//! ```rust
//! use aws_sdk_s3::Endpoint;
//! use http::Uri;
//! use near_lake_framework::LakeConfigBuilder;
//!
//! # async fn example() {
//! let aws_config = aws_config::from_env().load().await;
//! let s3_conf = aws_sdk_s3::config::Builder::from(&aws_config)
//!     .endpoint_resolver(
//!         Endpoint::immutable("http://0.0.0.0:9000".parse::<Uri>().unwrap()))
//!     .build();
//!
//! let config = LakeConfigBuilder::default()
//!     .s3_config(s3_conf)
//!     .s3_bucket_name("near-lake-data-custom")
//!     .start_block_height(1)
//!     .build()
//!     .expect("Failed to build LakeConfig");
//! # }
//! ```
//!
//! ## Configuration
//!
//! Everything should be configured before your indexer application starts, via the `LakeConfigBuilder` struct.
//!
//! Available parameters:
//!
//! * [`start_block_height(value: u64)`](LakeConfigBuilder::start_block_height) - block height to start the stream from
//! * *optional* [`s3_bucket_name(value: impl Into<String>)`](LakeConfigBuilder::s3_bucket_name) - provide the AWS S3 bucket name (you need to provide it if you use a custom S3-compatible service; otherwise you can use [LakeConfigBuilder::mainnet] and [LakeConfigBuilder::testnet])
//! * *optional* [`s3_region_name(value: impl Into<String>)`](LakeConfigBuilder::s3_region_name) - provide the AWS S3 region name (if you need to set a custom one)
//! * *optional* [`s3_config(value: aws_sdk_s3::config::Config)`](LakeConfigBuilder::s3_config) - provide a custom AWS SDK S3 Config (see the sketch below)
//! * *optional* [`blocks_preload_pool_size(value: usize)`](LakeConfigBuilder::blocks_preload_pool_size) - the number of blocks to prefetch concurrently (it also defines the capacity of the underlying stream channel)
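//!
//! For example, a minimal sketch combining several of these parameters (the bucket and region names are placeholders):
//!
//! ```rust
//! use near_lake_framework::LakeConfigBuilder;
//!
//! # fn example() {
//! let config = LakeConfigBuilder::default()
//!     .s3_bucket_name("near-lake-data-custom")
//!     .s3_region_name("eu-central-1")
//!     .start_block_height(1)
//!     .build()
//!     .expect("Failed to build LakeConfig");
//! # }
//! ```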
//!
//! ## Cost estimates (Updated Mar 10, 2022 with more precise calculations)
//!
//! **TL;DR:** approximately $20 per month (for AWS S3 access, paid directly to AWS) for reading fresh blocks
//!
//! ### Historical indexing
//!
//! | Blocks | GET | LIST | Subtotal GET | Subtotal LIST | Total $ |
//! |---|---|---|---|---|---|
//! | 1000 | 5000 | 4 | 0.00215 | 0.0000216 | $0.00 |
//! | 86,400 | 432000 | 345.6 | 0.18576 | 0.00186624 | $0.19 |
//! | 2,592,000 | 12960000 | 10368 | 5.5728 | 0.0559872 | $5.63 |
//! | 77,021,059 | 385105295 | 308084.236 | 165.5952769 | 1.663654874 | $167.26 |
//!
//! **Note:** ~77M blocks was the total number of blocks at the moment of the calculation.
//!
//! **86,400 blocks is the approximate number of blocks per day** (1 block per second * 60 seconds * 60 minutes * 24 hours)
//!
//! **2,592,000 blocks is the approximate number of blocks per month** (86,400 blocks per day * 30 days)
//!
//! ### Tip of the network indexing
//!
//! | Blocks | GET | LIST | Subtotal GET | Subtotal LIST | Total $ |
//! |---|---|---|---|---|---|
//! | 1000 | 5000 | 1000 | 0.00215 | 0.0054 | $0.01 |
//! | 86,400 | 432000 | 86,400 | 0.18576 | 0.46656 | $0.65 |
//! | 2,592,000 | 12960000 | 2,592,000 | 5.5728 | 13.9968 | $19.57 |
//! | 77,021,059 | 385105295 | 77,021,059 | 165.5952769 | 415.9137186 | $581.51 |
//!
//! Explanation:
//!
//! Assume NEAR Protocol produces exactly 1 block per second (it does not; the average block production time is about 1.3s). A full day consists of 86,400 seconds, so that is the maximum number of blocks that can be produced per day.
//!
//! According to the [Amazon S3 prices](https://aws.amazon.com/s3/pricing/?nc1=h_ls), `list` requests are charged $0.0054 per 1000 requests and `get` requests are charged $0.00043 per 1000 requests.
//!
//! Calculations (assuming we are following the tip of the network all the time):
//!
//! ```text
//! 86400 blocks per day * 5 requests for each block / 1000 requests * $0.00043 per 1k requests = $0.19 per day * 30 days = $5.7
//! ```
//! **Note:** 5 requests for each block assumes the network has 4 shards (1 file for the common block data plus 1 file per shard)
//!
//! And a number of `list` requests we need to perform for 30 days:
//!
//! ```text
//! 86400 blocks per day / 1000 requests * $0.0054 per 1k list requests = $0.47 per day * 30 days = $14.1
//!
//! $5.7 + $14.1 = $19.8
//! ```
//!
//! The price depends on the number of shards; see the sketch below.
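//!
//! As a sanity check, here is a small sketch reproducing the arithmetic above (prices per 1,000 requests as quoted; the shard count is a parameter since the price depends on it):
//!
//! ```rust
//! /// Rough monthly S3 cost (USD) for following the tip of the network.
//! fn monthly_cost_usd(shards: usize) -> f64 {
//!     const BLOCKS_PER_DAY: f64 = 86_400.0; // assuming 1 block per second
//!     const GET_PRICE_PER_1K: f64 = 0.00043;
//!     const LIST_PRICE_PER_1K: f64 = 0.0054;
//!
//!     // 1 GET for the common block data plus 1 GET per shard
//!     let get_requests = BLOCKS_PER_DAY * (1.0 + shards as f64);
//!     // roughly 1 LIST request per block when following the tip
//!     let list_requests = BLOCKS_PER_DAY;
//!
//!     let daily = get_requests / 1_000.0 * GET_PRICE_PER_1K
//!         + list_requests / 1_000.0 * LIST_PRICE_PER_1K;
//!     daily * 30.0
//! }
//!
//! // matches the $19.57 figure from the table above
//! assert_eq!(format!("{:.2}", monthly_cost_usd(4)), "19.57");
//! ```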
//!
//! ## Future plans
//!
//! We use Milestones with clearly defined acceptance criteria:
//!
//! * [x] [MVP](https://github.com/near/near-lake-framework/milestone/1)
//! * [ ] [0.8 High-level update](https://github.com/near/near-lake-framework-rs/milestone/3)
//! * [ ] [1.0](https://github.com/near/near-lake-framework/milestone/2)

#[macro_use]
extern crate derive_builder;

use futures::stream::StreamExt;
use tokio::sync::mpsc;
use tokio::sync::mpsc::error::SendError;

pub use near_indexer_primitives;

pub use aws_credential_types::Credentials;
pub use types::{LakeConfig, LakeConfigBuilder};

pub mod s3_client;
pub mod s3_fetchers;
pub(crate) mod types;

pub(crate) const LAKE_FRAMEWORK: &str = "near_lake_framework";

/// Creates an `mpsc::channel` and returns the spawned task's `JoinHandle` along with the `receiver` to read the stream of `StreamerMessage`s
/// ```
/// use near_lake_framework::LakeConfigBuilder;
/// use tokio::sync::mpsc;
///
/// # async fn example() {
///    let config = LakeConfigBuilder::default()
///        .testnet()
///        .start_block_height(82422587)
///        .build()
///        .expect("Failed to build LakeConfig");
///
///     let (_, mut stream) = near_lake_framework::streamer(config);
///
///     while let Some(streamer_message) = stream.recv().await {
///         eprintln!("{:#?}", streamer_message);
///     }
/// # }
/// ```
pub fn streamer(
    config: LakeConfig,
) -> (
    tokio::task::JoinHandle<Result<(), anyhow::Error>>,
    mpsc::Receiver<near_indexer_primitives::StreamerMessage>,
) {
    let (sender, receiver) = mpsc::channel(config.blocks_preload_pool_size);
    (tokio::spawn(start(sender, config)), receiver)
}

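// Produces an endless stream of block heights found in the S3 bucket,
// starting from `start_from_block_height`: polls the bucket in a loop,
// sleeping for 2s when no newer heights are available and retrying
// after 1s on S3 errors.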
fn stream_block_heights<'a: 'b, 'b>(
    lake_s3_client: &'a dyn s3_client::S3Client,
    s3_bucket_name: &'a str,
    mut start_from_block_height: crate::types::BlockHeight,
) -> impl futures::Stream<Item = u64> + 'b {
    async_stream::stream! {
        loop {
            tracing::debug!(target: LAKE_FRAMEWORK, "Fetching a list of blocks from S3...");
            match s3_fetchers::list_block_heights(
                lake_s3_client,
                s3_bucket_name,
                start_from_block_height,
            )
            .await {
                Ok(block_heights) => {
                    if block_heights.is_empty() {
                        tracing::debug!(
                            target: LAKE_FRAMEWORK,
                            "There are no newer block heights than {} in bucket {}. Fetching again in 2s...",
                            start_from_block_height,
                            s3_bucket_name,
                        );
                        tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                        continue;
                    }
                    tracing::debug!(
                        target: LAKE_FRAMEWORK,
                        "Received {} newer block heights",
                        block_heights.len()
                    );

                    start_from_block_height = *block_heights.last().unwrap() + 1;
                    for block_height in block_heights {
                        tracing::debug!(target: LAKE_FRAMEWORK, "Yielding {} block height...", block_height);
                        yield block_height;
                    }
                }
                Err(err) => {
                    tracing::warn!(
                        target: LAKE_FRAMEWORK,
                        "Failed to get block heights from bucket {}: {}. Retrying in 1s...",
                        s3_bucket_name,
                        err,
                    );
                    tokio::time::sleep(std::time::Duration::from_secs(1)).await;
                }
            }
        }
    }
}

// The only consumer of the block heights stream produced by `stream_block_heights`.
// Polls up to `limit` block heights without blocking; if `await_for_at_least_one`
// is set and nothing is immediately available, awaits until at least one height arrives.
async fn prefetch_block_heights_into_pool(
    pending_block_heights: &mut std::pin::Pin<
        &mut impl tokio_stream::Stream<Item = crate::types::BlockHeight>,
    >,
    limit: usize,
    await_for_at_least_one: bool,
) -> anyhow::Result<Vec<crate::types::BlockHeight>> {
    let mut block_heights = Vec::with_capacity(limit);
    for remaining_limit in (0..limit).rev() {
        tracing::debug!(target: LAKE_FRAMEWORK, "Polling for the next block height without awaiting... (up to {} block heights are going to be fetched)", remaining_limit);
        match futures::poll!(pending_block_heights.next()) {
            std::task::Poll::Ready(Some(block_height)) => {
                block_heights.push(block_height);
            }
            std::task::Poll::Pending => {
                if await_for_at_least_one && block_heights.is_empty() {
                    tracing::debug!(target: LAKE_FRAMEWORK, "There were no block heights available immediatelly, and the prefetching blocks queue is empty, so we need to await for at least a single block height to be available before proceeding...");
                    match pending_block_heights.next().await {
                        Some(block_height) => {
                            block_heights.push(block_height);
                        }
                        None => {
                            return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite."));
                        }
                    }
                    continue;
                }
                tracing::debug!(target: LAKE_FRAMEWORK, "There were no block heights available immediatelly, so we should not block here and keep processing the blocks.");
                break;
            }
            std::task::Poll::Ready(None) => {
                return Err(anyhow::anyhow!("This state should be unreachable as the block heights stream should be infinite."));
            }
        }
    }
    Ok(block_heights)
}

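// The task spawned by `streamer`. Builds (or reuses) an S3 client, then loops:
// prefetches block heights, fetches the corresponding `StreamerMessage`s in
// order, verifies that each block's `prev_hash` matches the previously
// streamed block, and sends the messages to the channel until it is closed.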
#[allow(unused_labels)] // we use loop labels for code-readability
async fn start(
    streamer_message_sink: mpsc::Sender<near_indexer_primitives::StreamerMessage>,
    config: LakeConfig,
) -> anyhow::Result<()> {
    let mut start_from_block_height = config.start_block_height;

    let lake_s3_client: Box<dyn crate::s3_client::S3Client> =
        if let Some(s3_client) = config.s3_client {
            s3_client
        } else if let Some(config) = config.s3_config {
            Box::new(s3_fetchers::LakeS3Client::from_conf(config))
        } else {
            let aws_config = aws_config::from_env().load().await;
            let s3_config = aws_sdk_s3::config::Builder::from(&aws_config)
                .region(aws_types::region::Region::new(config.s3_region_name))
                .build();

            Box::new(s3_fetchers::LakeS3Client::from_conf(s3_config))
        };

    let mut last_processed_block_hash: Option<near_indexer_primitives::CryptoHash> = None;

    'main: loop {
        // At the beginning of the 'main' loop we create a block heights stream
        // and prefetch the initial data into the pool.
        // Later the 'stream' loop might exit back to this 'main' one to repeat the procedure.
        // This happens because the Lake Indexer that writes to the S3 bucket might,
        // in some cases, write block N+1 before it finishes writing block N.
        // We require blocks to be streamed consistently, so we need to try to load the block again.

        let pending_block_heights = stream_block_heights(
            &*lake_s3_client,
            &config.s3_bucket_name,
            start_from_block_height,
        );
        tokio::pin!(pending_block_heights);

        let mut streamer_messages_futures = futures::stream::FuturesOrdered::new();
        tracing::debug!(
            target: LAKE_FRAMEWORK,
            "Prefetching up to {} blocks...",
            config.blocks_preload_pool_size
        );

        streamer_messages_futures.extend(
            prefetch_block_heights_into_pool(
                &mut pending_block_heights,
                config.blocks_preload_pool_size,
                true,
            )
            .await?
            .into_iter()
            .map(|block_height| {
                s3_fetchers::fetch_streamer_message(
                    &*lake_s3_client,
                    &config.s3_bucket_name,
                    block_height,
                )
            }),
        );

        tracing::debug!(
            target: LAKE_FRAMEWORK,
            "Awaiting for the first prefetched block..."
        );
        'stream: while let Some(streamer_message_result) = streamer_messages_futures.next().await {
            let streamer_message = streamer_message_result.map_err(|err| {
                tracing::error!(
                    target: LAKE_FRAMEWORK,
                    "Failed to fetch StreamerMessage with error: \n{:#?}",
                    err,
                );
                err
            })?;

            tracing::debug!(
                target: LAKE_FRAMEWORK,
                "Received block #{} ({})",
                streamer_message.block.header.height,
                streamer_message.block.header.hash
            );
            // check if we have `last_processed_block_hash` (might be None only on start)
            if let Some(prev_block_hash) = last_processed_block_hash {
                // compare last_processed_block_hash` with `block.header.prev_hash` of the current
                // block (ensure we don't miss anything from S3)
                // retrieve the data from S3 if prev_hashes don't match and repeat the main loop step
                if prev_block_hash != streamer_message.block.header.prev_hash {
                    tracing::warn!(
                        target: LAKE_FRAMEWORK,
                        "`prev_hash` does not match, refetching the data from S3 in 200ms",
                    );
                    tokio::time::sleep(std::time::Duration::from_millis(200)).await;
                    break 'stream;
                }
            }

            // store current block info as `last_processed_block_*` for next iteration
            last_processed_block_hash = Some(streamer_message.block.header.hash);
            start_from_block_height = streamer_message.block.header.height + 1;

            tracing::debug!(
                target: LAKE_FRAMEWORK,
                "Prefetching up to {} blocks... (there are {} blocks in the prefetching pool)",
                config.blocks_preload_pool_size,
                streamer_messages_futures.len(),
            );
            tracing::debug!(
                target: LAKE_FRAMEWORK,
                "Streaming block #{} ({})",
                streamer_message.block.header.height,
                streamer_message.block.header.hash
            );
            let blocks_preload_pool_current_len = streamer_messages_futures.len();

            let prefetched_block_heights_future = prefetch_block_heights_into_pool(
                &mut pending_block_heights,
                config
                    .blocks_preload_pool_size
                    .saturating_sub(blocks_preload_pool_current_len),
                blocks_preload_pool_current_len == 0,
            );

            let streamer_message_sink_send_future = streamer_message_sink.send(streamer_message);

            let (prefetch_res, send_res): (
                Result<Vec<types::BlockHeight>, anyhow::Error>,
                Result<_, SendError<near_indexer_primitives::StreamerMessage>>,
            ) = futures::join!(
                prefetched_block_heights_future,
                streamer_message_sink_send_future,
            );

            if let Err(SendError(err)) = send_res {
                tracing::debug!(
                    target: LAKE_FRAMEWORK,
                    "Failed to send StreamerMessage (#{:0>12}) to the channel. Channel is closed, exiting \n{:?}",
                    start_from_block_height - 1,
                    err,
                );
                return Ok(());
            }

            streamer_messages_futures.extend(
                prefetch_res
                    .map_err(|err| {
                        tracing::error!(
                            target: LAKE_FRAMEWORK,
                            "Failed to prefetch block heights to the prefetching pool with error: \n{:#?}",
                            err
                        );
                        err
                    })?
                    .into_iter()
                    .map(|block_height| {
                        s3_fetchers::fetch_streamer_message(
                            &*lake_s3_client,
                            &config.s3_bucket_name,
                            block_height,
                        )
                    }
            ));
        }

        tracing::warn!(
            target: LAKE_FRAMEWORK,
            "Exited from the 'stream' loop. It may happen in two cases:\n\
             1. The blocks have ended (should be impossible; it might indicate an error on the Lake buckets),\n\
             2. Received a block whose `prev_hash` doesn't match the previously streamed block.\n\
             Will attempt to restart the stream from block #{}",
            start_from_block_height,
        );
    }
}