1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
// Copyright (c) Aptos
// SPDX-License-Identifier: Apache-2.0

#![forbid(unsafe_code)]

use aptos_types::{
    ledger_info::LedgerInfoWithSignatures,
    state_store::state_value::StateValueChunkWithProof,
    transaction::{TransactionListWithProof, TransactionOutputListWithProof, Version},
};
use async_trait::async_trait;
use itertools::Itertools;
use serde::{Deserialize, Serialize};
use std::fmt;
use storage_service::UnexpectedResponseError;
use storage_service_types::{self as storage_service, CompleteDataRange, Epoch};
use thiserror::Error;

/// The identifier type stored in `ResponseContext::id` for a request/response
/// pair.
pub type ResponseId = u64;

// Submodule declaration; its contents live outside this file.
pub mod aptosnet;

/// A `Result` alias whose error type defaults to the data client [`Error`]
/// defined in this file. A different error type can still be supplied
/// through the second type parameter.
pub type Result<T, E = Error> = ::std::result::Result<T, E>;

// TODO(philiphayes): an Error { kind: ErrorKind, inner: BoxError } would be more convenient
/// An error returned by the Aptos Data Client for failed API calls.
///
/// Every variant wraps a `String` that is interpolated into the variant's
/// display message (the `{0}` placeholder in its `#[error(...)]` attribute).
#[derive(Clone, Debug, Deserialize, Error, PartialEq, Serialize)]
pub enum Error {
    /// The requested data is unavailable and could not be found.
    #[error("The requested data is unavailable and cannot be found! Error: {0}")]
    DataIsUnavailable(String),
    /// The requested data is too large.
    #[error("The requested data is too large: {0}")]
    DataIsTooLarge(String),
    /// The request itself was invalid.
    #[error("Invalid request: {0}")]
    InvalidRequest(String),
    /// The response was invalid. This is also the variant produced when
    /// converting from an `UnexpectedResponseError`.
    #[error("Invalid response: {0}")]
    InvalidResponse(String),
    /// No response arrived before the wait timed out.
    #[error("Timed out waiting for a response: {0}")]
    TimeoutWaitingForResponse(String),
    /// Any other, unexpected failure.
    #[error("Unexpected error encountered: {0}")]
    UnexpectedErrorEncountered(String),
}

impl Error {
    /// Returns a summary label for the error
    pub fn get_label(&self) -> &'static str {
        match self {
            Self::DataIsUnavailable(_) => "data_is_unavailable",
            Self::DataIsTooLarge(_) => "data_is_too_large",
            Self::InvalidRequest(_) => "invalid_request",
            Self::InvalidResponse(_) => "invalid_response",
            Self::TimeoutWaitingForResponse(_) => "timeout_waiting_for_response",
            Self::UnexpectedErrorEncountered(_) => "unexpected_error_encountered",
        }
    }
}

// TODO(philiphayes): better error wrapping
impl From<UnexpectedResponseError> for Error {
    fn from(err: UnexpectedResponseError) -> Self {
        Self::InvalidResponse(err.0)
    }
}

/// The API offered by the Aptos Data Client.
///
/// Apart from `get_global_data_summary`, every method is `async` and returns
/// a `Result` wrapping a `Response`, so callers receive the payload together
/// with the `ResponseContext` needed to report a bad response.
#[async_trait]
pub trait AptosDataClient {
    /// Returns a global summary of the data currently available in the network.
    ///
    /// This API is intended to be relatively cheap to call, usually returning a
    /// cached view of this data client's available data.
    fn get_global_data_summary(&self) -> GlobalDataSummary;

    /// Returns a single account states chunk with proof, containing the accounts
    /// from `start_account_index` to `end_account_index` (inclusive) at the
    /// specified `version`. The proof version is the same as the specified
    /// version.
    async fn get_account_states_with_proof(
        &self,
        version: u64,
        start_account_index: u64,
        end_account_index: u64,
    ) -> Result<Response<StateValueChunkWithProof>>;

    /// Returns all epoch ending ledger infos between `start_epoch` and
    /// `expected_end_epoch` (inclusive). If the data cannot be fetched (e.g.,
    /// the number of epochs is too large), an error is returned.
    async fn get_epoch_ending_ledger_infos(
        &self,
        start_epoch: Epoch,
        expected_end_epoch: Epoch,
    ) -> Result<Response<Vec<LedgerInfoWithSignatures>>>;

    /// Returns a new transaction output list with proof. Versions start at
    /// `known_version + 1` and `known_epoch` (inclusive). The end version
    /// and proof version are specified by the server. If the data cannot be
    /// fetched, an error is returned.
    ///
    /// The payload is a tuple of the output list and a ledger info with
    /// signatures.
    async fn get_new_transaction_outputs_with_proof(
        &self,
        known_version: Version,
        known_epoch: Epoch,
    ) -> Result<Response<(TransactionOutputListWithProof, LedgerInfoWithSignatures)>>;

    /// Returns a new transaction list with proof. Versions start at
    /// `known_version + 1` and `known_epoch` (inclusive). The end version
    /// and proof version are specified by the server. If the data cannot be
    /// fetched, an error is returned.
    ///
    /// The payload is a tuple of the transaction list and a ledger info with
    /// signatures.
    // NOTE(review): `include_events` presumably has the same meaning as in
    // `get_transactions_with_proof` (events are included in the proof) — confirm.
    async fn get_new_transactions_with_proof(
        &self,
        known_version: Version,
        known_epoch: Epoch,
        include_events: bool,
    ) -> Result<Response<(TransactionListWithProof, LedgerInfoWithSignatures)>>;

    /// Returns the number of account states at the specified version.
    async fn get_number_of_account_states(&self, version: Version) -> Result<Response<u64>>;

    /// Returns a transaction output list with proof object, with transaction
    /// outputs from start to end versions (inclusive). The proof is relative to
    /// the specified `proof_version`. If the data cannot be fetched (e.g., the
    /// number of transaction outputs is too large), an error is returned.
    async fn get_transaction_outputs_with_proof(
        &self,
        proof_version: Version,
        start_version: Version,
        end_version: Version,
    ) -> Result<Response<TransactionOutputListWithProof>>;

    /// Returns a transaction list with proof object, with transactions from
    /// start to end versions (inclusive). The proof is relative to the specified
    /// `proof_version`. If `include_events` is true, events are included in the
    /// proof. If the data cannot be fetched (e.g., the number of transactions is
    /// too large), an error is returned.
    async fn get_transactions_with_proof(
        &self,
        proof_version: Version,
        start_version: Version,
        end_version: Version,
        include_events: bool,
    ) -> Result<Response<TransactionListWithProof>>;
}

/// A response error that users of the Aptos Data Client can use to notify
/// the Data Client about invalid or malformed responses.
///
/// The variants carry no payload; they only classify the problem. Values of
/// this type are passed to `ResponseCallback::notify_bad_response`.
#[derive(Clone, Debug, Deserialize, PartialEq, Serialize)]
pub enum ResponseError {
    // NOTE(review): presumably the response data itself was invalid — confirm with callers.
    InvalidData,
    // NOTE(review): presumably the payload was of a different type than expected — confirm.
    InvalidPayloadDataType,
    // NOTE(review): presumably a proof in the response failed verification — confirm.
    ProofVerificationError,
}

/// A callback that lets the consumer provide error feedback about a response.
/// Typically, this will contain a reference to the underlying data client and
/// any additional request context needed to update internal scoring.
///
/// This feedback mechanism is required because a Data Client is not always able
/// to fully verify that a given data response is valid (e.g., it is unable
/// to verify all proofs).
///
/// This trait provides a simple feedback mechanism for users of the Data Client
/// to alert it to bad responses so that the peers responsible for providing this
/// data can be penalized.
///
/// Implementors must also be `Debug`, `Send` and `'static`, which is what
/// allows a boxed callback to be stored inside `ResponseContext`.
pub trait ResponseCallback: fmt::Debug + Send + 'static {
    // TODO(philiphayes): ideally this would take a `self: Box<Self>`, i.e.,
    // consume the callback, which better communicates that you should only report
    // an error once. however, the current state-sync-v2 code makes this difficult...
    /// Reports that the response this callback belongs to was bad, classified
    /// by `error`.
    ///
    /// Because this takes `&self`, the type system does not stop a caller from
    /// invoking it more than once; per the TODO above, an error should only be
    /// reported once.
    fn notify_bad_response(&self, error: ResponseError);
}

/// The context that accompanies a response payload: an identifier for the
/// request/response pair and the callback used to report a bad response.
#[derive(Debug)]
pub struct ResponseContext {
    /// A unique identifier for this request/response pair. Intended mostly for
    /// debugging.
    pub id: ResponseId,
    /// A callback for notifying the data-client source about an error with this
    /// response.
    pub response_callback: Box<dyn ResponseCallback>,
}

/// A response from the Data Client for a single API call.
///
/// Pairs the payload of type `T` with the `ResponseContext` for the call.
#[derive(Debug)]
pub struct Response<T> {
    /// Additional context (response id and bad-response callback).
    pub context: ResponseContext,
    /// The actual response payload.
    pub payload: T,
}

impl<T> Response<T> {
    pub fn new(context: ResponseContext, payload: T) -> Self {
        Self { context, payload }
    }

    pub fn into_payload(self) -> T {
        self.payload
    }

    pub fn into_parts(self) -> (ResponseContext, T) {
        (self.context, self.payload)
    }

    pub fn map<U, F>(self, f: F) -> Response<U>
    where
        F: FnOnce(T) -> U,
    {
        let (context, payload) = self.into_parts();
        Response::new(context, f(payload))
    }
}

/// The different data client response payloads as an enum.
///
/// Each variant wraps the payload type returned by the correspondingly named
/// `AptosDataClient` method (e.g., `TransactionsWithProof` holds the payload
/// type of `get_transactions_with_proof`).
#[derive(Clone, Debug, Eq, PartialEq)]
pub enum ResponsePayload {
    /// An account states chunk with proof.
    AccountStatesWithProof(StateValueChunkWithProof),
    /// A list of epoch ending ledger infos.
    EpochEndingLedgerInfos(Vec<LedgerInfoWithSignatures>),
    /// A new transaction output list with proof, paired with a ledger info.
    NewTransactionOutputsWithProof((TransactionOutputListWithProof, LedgerInfoWithSignatures)),
    /// A new transaction list with proof, paired with a ledger info.
    NewTransactionsWithProof((TransactionListWithProof, LedgerInfoWithSignatures)),
    /// A number of account states.
    NumberOfAccountStates(u64),
    /// A transaction output list with proof.
    TransactionOutputsWithProof(TransactionOutputListWithProof),
    /// A transaction list with proof.
    TransactionsWithProof(TransactionListWithProof),
}

impl ResponsePayload {
    pub fn get_label(&self) -> &'static str {
        match self {
            Self::AccountStatesWithProof(_) => "account_states_with_proof",
            Self::EpochEndingLedgerInfos(_) => "epoch_ending_ledger_infos",
            Self::NewTransactionOutputsWithProof(_) => "new_transaction_outputs_with_proof",
            Self::NewTransactionsWithProof(_) => "new_transactions_with_proof",
            Self::NumberOfAccountStates(_) => "number_of_account_states",
            Self::TransactionOutputsWithProof(_) => "transaction_outputs_with_proof",
            Self::TransactionsWithProof(_) => "transactions_with_proof",
        }
    }
}

// Conversions from each payload type into the matching `ResponsePayload` variant

impl From<StateValueChunkWithProof> for ResponsePayload {
    fn from(inner: StateValueChunkWithProof) -> Self {
        Self::AccountStatesWithProof(inner)
    }
}

impl From<Vec<LedgerInfoWithSignatures>> for ResponsePayload {
    fn from(inner: Vec<LedgerInfoWithSignatures>) -> Self {
        Self::EpochEndingLedgerInfos(inner)
    }
}

impl From<(TransactionOutputListWithProof, LedgerInfoWithSignatures)> for ResponsePayload {
    fn from(inner: (TransactionOutputListWithProof, LedgerInfoWithSignatures)) -> Self {
        Self::NewTransactionOutputsWithProof(inner)
    }
}

impl From<(TransactionListWithProof, LedgerInfoWithSignatures)> for ResponsePayload {
    fn from(inner: (TransactionListWithProof, LedgerInfoWithSignatures)) -> Self {
        Self::NewTransactionsWithProof(inner)
    }
}

impl From<u64> for ResponsePayload {
    fn from(inner: u64) -> Self {
        Self::NumberOfAccountStates(inner)
    }
}
impl From<TransactionOutputListWithProof> for ResponsePayload {
    fn from(inner: TransactionOutputListWithProof) -> Self {
        Self::TransactionOutputsWithProof(inner)
    }
}

impl From<TransactionListWithProof> for ResponsePayload {
    fn from(inner: TransactionListWithProof) -> Self {
        Self::TransactionsWithProof(inner)
    }
}

/// A snapshot of the global state of data available in the Aptos network.
///
/// This is the type returned by `AptosDataClient::get_global_data_summary`.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct GlobalDataSummary {
    /// The data currently advertised in the network.
    pub advertised_data: AdvertisedData,
    /// The chunk sizes clients should use when requesting data.
    pub optimal_chunk_sizes: OptimalChunkSizes,
}

impl GlobalDataSummary {
    /// Builds a summary made of empty advertised data and empty optimal chunk
    /// sizes. Useful on startup, before the global state is known, or in
    /// tests.
    pub fn empty() -> Self {
        let advertised_data = AdvertisedData::empty();
        let optimal_chunk_sizes = OptimalChunkSizes::empty();
        Self {
            advertised_data,
            optimal_chunk_sizes,
        }
    }

    /// Returns true iff this summary compares equal to `Self::empty()`.
    pub fn is_empty(&self) -> bool {
        *self == Self::empty()
    }
}

/// The chunk sizes that clients are advised to use when requesting data.
/// Using them makes a request *more likely* to succeed.
#[derive(Clone, Debug, Eq, PartialEq)]
pub struct OptimalChunkSizes {
    /// Chunk size for account states requests.
    pub account_states_chunk_size: u64,
    /// Chunk size for epoch requests.
    pub epoch_chunk_size: u64,
    /// Chunk size for transaction requests.
    pub transaction_chunk_size: u64,
    /// Chunk size for transaction output requests.
    pub transaction_output_chunk_size: u64,
}

impl OptimalChunkSizes {
    /// Builds a value in which every chunk size is zero.
    pub fn empty() -> Self {
        let unset_chunk_size = 0;
        Self {
            account_states_chunk_size: unset_chunk_size,
            epoch_chunk_size: unset_chunk_size,
            transaction_chunk_size: unset_chunk_size,
            transaction_output_chunk_size: unset_chunk_size,
        }
    }
}

/// A summary of all data that is currently advertised in the network.
///
/// `Debug` is implemented by hand for this type rather than derived, so that
/// the synced ledger infos are rendered with their `Display` output.
#[derive(Clone, Eq, PartialEq)]
pub struct AdvertisedData {
    /// The ranges of account states advertised, e.g., if a range is
    /// (X,Y), it means all account states are held for every version X->Y
    /// (inclusive).
    pub account_states: Vec<CompleteDataRange<Version>>,

    /// The ranges of epoch ending ledger infos advertised, e.g., if a range
    /// is (X,Y), it means all epoch ending ledger infos for epochs X->Y
    /// (inclusive) are available.
    pub epoch_ending_ledger_infos: Vec<CompleteDataRange<Epoch>>,

    /// The ledger infos corresponding to the highest synced versions
    /// currently advertised.
    pub synced_ledger_infos: Vec<LedgerInfoWithSignatures>,

    /// The ranges of transactions advertised, e.g., if a range is
    /// (X,Y), it means all transactions for versions X->Y (inclusive)
    /// are available.
    pub transactions: Vec<CompleteDataRange<Version>>,

    /// The ranges of transaction outputs advertised, e.g., if a range
    /// is (X,Y), it means all transaction outputs for versions X->Y
    /// (inclusive) are available.
    pub transaction_outputs: Vec<CompleteDataRange<Version>>,
}

impl fmt::Debug for AdvertisedData {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let sync_lis = (&self.synced_ledger_infos)
            .iter()
            .map(|LedgerInfoWithSignatures::V0(ledger)| format!("{}", ledger))
            .join(", ");
        write!(
            f,
            "account_states: {:?}, epoch_ending_ledger_infos: {:?}, synced_ledger_infos: [{}], transactions: {:?}, transaction_outputs: {:?}",
            &self.account_states, &self.epoch_ending_ledger_infos, sync_lis, &self.transactions, &self.transaction_outputs
        )
    }
}

impl AdvertisedData {
    /// Returns an advertised data summary in which every list is empty.
    pub fn empty() -> Self {
        AdvertisedData {
            account_states: Vec::new(),
            epoch_ending_ledger_infos: Vec::new(),
            synced_ledger_infos: Vec::new(),
            transactions: Vec::new(),
            transaction_outputs: Vec::new(),
        }
    }

    /// Returns true iff all data items (`lowest` to `highest`, inclusive) can
    /// be found in the given `advertised_ranges`.
    ///
    /// The items may be spread over several (possibly overlapping) ranges. If
    /// `lowest > highest` there is nothing to look for and the result is
    /// `true`.
    ///
    /// The check walks over the ranges instead of over every individual item,
    /// so its cost depends on the number of advertised ranges and not on how
    /// many items lie between `lowest` and `highest`.
    pub fn contains_range(
        lowest: u64,
        highest: u64,
        advertised_ranges: &[CompleteDataRange<u64>],
    ) -> bool {
        // An empty request is vacuously satisfied.
        if lowest > highest {
            return true;
        }

        // The first item that has not yet been shown to be advertised.
        let mut next_item = lowest;
        loop {
            // Among the ranges holding `next_item`, find the one reaching furthest.
            let furthest_highest = advertised_ranges
                .iter()
                .filter(|advertised_range| advertised_range.contains(next_item))
                .map(|advertised_range| advertised_range.highest())
                .max();

            let covered_through = match furthest_highest {
                // `next_item` itself is covered even if a range were to report
                // a smaller upper bound, hence the clamp.
                Some(furthest_highest) => furthest_highest.max(next_item),
                // No range holds `next_item`: there is a gap.
                None => return false,
            };

            if covered_through >= highest {
                return true;
            }

            // `covered_through < highest <= u64::MAX`, so this cannot overflow,
            // and `next_item` strictly increases on every iteration.
            next_item = covered_through + 1;
        }
    }

    /// Returns the highest epoch for which an epoch ending ledger info is
    /// advertised in the network, or `None` if no epoch range is advertised.
    pub fn highest_epoch_ending_ledger_info(&self) -> Option<Epoch> {
        self.epoch_ending_ledger_infos
            .iter()
            .map(|epoch_range| epoch_range.highest())
            .max()
    }

    /// Returns a clone of the advertised synced ledger info with the highest
    /// version, or `None` if no synced ledger info is advertised.
    pub fn highest_synced_ledger_info(&self) -> Option<LedgerInfoWithSignatures> {
        self.synced_ledger_infos
            .iter()
            .map(|ledger_info_with_sigs| ledger_info_with_sigs.ledger_info().version())
            .position_max()
            .and_then(|highest_synced_position| {
                self.synced_ledger_infos
                    .get(highest_synced_position)
                    .cloned()
            })
    }

    /// Returns the lowest advertised version containing all account states
    pub fn lowest_account_states_version(&self) -> Option<Version> {
        get_lowest_version_from_range_set(&self.account_states)
    }

    /// Returns the lowest advertised transaction output version
    pub fn lowest_transaction_output_version(&self) -> Option<Version> {
        get_lowest_version_from_range_set(&self.transaction_outputs)
    }

    /// Returns the lowest advertised transaction version
    pub fn lowest_transaction_version(&self) -> Option<Version> {
        get_lowest_version_from_range_set(&self.transactions)
    }
}

/// Returns the smallest `lowest()` value found among the given data ranges,
/// or `None` when `data_ranges` is empty.
fn get_lowest_version_from_range_set(
    data_ranges: &[CompleteDataRange<Version>],
) -> Option<Version> {
    data_ranges.iter().fold(None, |lowest_so_far, data_range| {
        let candidate = data_range.lowest();
        match lowest_so_far {
            // Keep the running minimum when it is still the smaller one.
            Some(current) if current <= candidate => Some(current),
            _ => Some(candidate),
        }
    })
}