use std::{
collections::{HashMap, HashSet},
fmt::Display,
};
use async_trait::async_trait;
use chrono::NaiveDateTime;
use thiserror::Error;
use crate::{
dto,
models::{
blockchain::{
Block, EntryPoint, EntryPointWithTracingParams, TracedEntryPoint, TracingParams,
TracingResult, Transaction,
},
contract::{Account, AccountBalance, AccountDelta},
protocol::{
ComponentBalance, ProtocolComponent, ProtocolComponentState,
ProtocolComponentStateDelta, QualityRange,
},
token::Token,
Address, BlockHash, Chain, ComponentId, ContractId, EntryPointId, ExtractionState,
PaginationParams, ProtocolSystem, ProtocolType, TxHash,
},
Bytes,
};
/// Identifies a block by chain and number, by hash, or as the latest block of a chain.
#[derive(Debug, Clone, PartialEq, Hash, Eq)]
pub enum BlockIdentifier {
/// A block addressed by a `(chain, block number)` pair.
Number((Chain, i64)),
/// A block addressed by its hash.
Hash(BlockHash),
/// Refers to the latest block of the given chain.
///
/// NOTE(review): presumably "latest" means the most recent block known to storage —
/// confirm against the gateway implementation.
Latest(Chain),
}
impl Display for BlockIdentifier {
    /// Renders the identifier using its derived `Debug` representation.
    ///
    /// Formatter flags (width, alternate, ...) passed by the caller are not forwarded:
    /// the value is always written with a plain `{:?}`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_fmt(format_args!("{:?}", self))
    }
}
/// Error type returned by every storage gateway method declared in this file.
#[derive(Error, Debug, PartialEq, Clone)]
pub enum StorageError {
/// An entity could not be found. Fields: entity kind (`{0}`) and its id (`{1}`).
#[error("Could not find {0} with id `{1}`!")]
NotFound(String, String),
/// An entity was already present. Fields: entity kind (`{0}`) and its id (`{1}`).
#[error("The entity {0} with id {1} was already present!")]
DuplicateEntry(String, String),
/// A related entity could not be found. Fields: related entity kind (`{0}`), the entity
/// kind it was looked up for (`{1}`) and that entity's id (`{2}`).
#[error("Could not find related {0} for {1} with id `{2}`!")]
NoRelatedEntity(String, String, String),
/// Decoding failed; the field carries the error description.
#[error("DecodeError: {0}")]
DecodeError(String),
/// Any storage failure not covered by another variant; the field carries the description.
#[error("Unexpected storage error: {0}")]
Unexpected(String),
/// The requested operation is currently not supported; the field names the operation.
#[error("Currently unsupported operation: {0}")]
Unsupported(String),
/// The write cache dropped its notification channel unexpectedly.
#[error("Write cache unexpectedly dropped notification channel!")]
WriteCacheGoneAway(),
/// An invalid block range was encountered.
#[error("Invalid block range encountered")]
InvalidBlockRange(),
}
/// Storage interface for chain-level entities: blocks and transactions.
///
/// Only signatures are declared here; the exact persistence semantics are defined by the
/// implementors. Every method reports failures as [`StorageError`].
#[async_trait]
pub trait ChainGateway {
/// Stores the given slice of [`Block`]s.
///
/// NOTE(review): the name suggests insert-or-update semantics — confirm with the implementation.
async fn upsert_block(&self, new: &[Block]) -> Result<(), StorageError>;
/// Returns the [`Block`] selected by `id`.
async fn get_block(&self, id: &BlockIdentifier) -> Result<Block, StorageError>;
/// Stores the given slice of [`Transaction`]s.
///
/// NOTE(review): the name suggests insert-or-update semantics — confirm with the implementation.
async fn upsert_tx(&self, new: &[Transaction]) -> Result<(), StorageError>;
/// Returns the [`Transaction`] with the given hash.
async fn get_tx(&self, hash: &TxHash) -> Result<Transaction, StorageError>;
/// Reverts stored state to the block identified by `to`.
///
/// NOTE(review): presumably discards everything newer than `to`; whether `to` itself is
/// kept cannot be told from this declaration — confirm with the implementation.
async fn revert_state(&self, to: &BlockIdentifier) -> Result<(), StorageError>;
}
/// Storage interface for loading and saving [`ExtractionState`] values.
#[async_trait]
pub trait ExtractionStateGateway {
/// Returns the [`ExtractionState`] selected by `name` and `chain`.
///
/// NOTE(review): `name` is presumably the extractor's name — confirm against callers.
async fn get_state(&self, name: &str, chain: &Chain) -> Result<ExtractionState, StorageError>;
/// Persists the given [`ExtractionState`].
///
/// NOTE(review): whether an existing state is overwritten is not visible here — confirm.
async fn save_state(&self, state: &ExtractionState) -> Result<(), StorageError>;
}
/// A point of reference expressed either as a block or as a timestamp.
#[derive(Debug, Clone, PartialEq, Hash, Eq)]
pub enum BlockOrTimestamp {
/// Reference given as a [`BlockIdentifier`].
Block(BlockIdentifier),
/// Reference given as a timezone-less date and time.
///
/// NOTE(review): the timezone the value is interpreted in (presumably UTC) is not visible
/// here — confirm.
Timestamp(NaiveDateTime),
}
// NOTE(review): `dto::VersionParam` (or one of the fields read below) is presumably marked
// deprecated; the allow keeps this conversion free of deprecation warnings. Confirm which
// item triggers the lint.
#[allow(deprecated)]
impl TryFrom<&dto::VersionParam> for BlockOrTimestamp {
    type Error = anyhow::Error;

    /// Converts a DTO version parameter into a [`BlockOrTimestamp`].
    ///
    /// Precedence rules:
    /// - If a block is present it is used, even when a timestamp is also present.
    /// - Within the block, the hash is preferred; otherwise both chain and number are needed.
    /// - Without a block, the timestamp is used.
    ///
    /// # Errors
    ///
    /// - `"Insufficient block information"` if a block is given without a hash and without
    ///   both a chain and a number.
    /// - `"Missing timestamp or block identifier"` if neither a block nor a timestamp is given.
    fn try_from(version: &dto::VersionParam) -> Result<Self, Self::Error> {
        match (&version.timestamp, &version.block) {
            (_, Some(block)) => {
                let block_identifier = match (&block.hash, &block.chain, &block.number) {
                    (Some(hash), _, _) => BlockIdentifier::Hash(hash.clone()),
                    (_, Some(chain), Some(number)) => {
                        BlockIdentifier::Number((Chain::from(*chain), *number))
                    }
                    // Static message: pass the literal directly, no `String` allocation needed.
                    _ => return Err(anyhow::format_err!("Insufficient block information")),
                };
                Ok(BlockOrTimestamp::Block(block_identifier))
            }
            (Some(timestamp), None) => Ok(BlockOrTimestamp::Timestamp(*timestamp)),
            (None, None) => Err(anyhow::format_err!("Missing timestamp or block identifier")),
        }
    }
}
/// Qualifier that accompanies a [`BlockOrTimestamp`] inside a [`Version`].
///
/// NOTE(review): presumably selects which of several state versions at the same
/// block/timestamp is meant — confirm against the gateway implementation.
#[derive(Debug, Clone, Default)]
pub enum VersionKind {
/// The default kind.
///
/// NOTE(review): presumably the last version at the referenced block/timestamp — confirm.
#[default]
Last,
/// NOTE(review): presumably the first version at the referenced block/timestamp — confirm.
First,
/// A version selected by an `i64` index.
///
/// NOTE(review): what the index counts (e.g. transaction index) is not visible here — confirm.
Index(i64),
}
/// A version reference: a [`BlockOrTimestamp`] (field `0`) paired with a [`VersionKind`]
/// (field `1`).
#[derive(Debug, Clone)]
pub struct Version(pub BlockOrTimestamp, pub VersionKind);
impl Version {
pub fn from_block_number(chain: Chain, number: i64) -> Self {
Self(BlockOrTimestamp::Block(BlockIdentifier::Number((chain, number))), VersionKind::Last)
}
pub fn from_ts(ts: NaiveDateTime) -> Self {
Self(BlockOrTimestamp::Timestamp(ts), VersionKind::Last)
}
}
/// Wraps a query result together with an optional total count.
///
/// Used as the return type of the paginated gateway queries in this file.
#[derive(Debug)]
pub struct WithTotal<T> {
/// The wrapped result.
pub entity: T,
/// Optional total.
///
/// NOTE(review): presumably the number of matching items irrespective of pagination, and
/// `None` when it was not computed — confirm against the implementation.
pub total: Option<i64>,
}
/// Storage interface for protocol-level data: components, component states, tokens,
/// component balances, token prices and TVL values.
///
/// Only signatures are declared here; filtering, ordering and pagination behaviour are
/// defined by the implementors. Every method reports failures as [`StorageError`].
#[async_trait]
pub trait ProtocolGateway {
/// Returns protocol components for `chain`, wrapped in [`WithTotal`].
///
/// `system`, `ids`, `min_tvl` and `pagination_params` are optional.
/// NOTE(review): presumably each acts as a filter and `None` means "no restriction" —
/// confirm with the implementation.
async fn get_protocol_components(
&self,
chain: &Chain,
system: Option<String>,
ids: Option<&[&str]>,
min_tvl: Option<f64>,
pagination_params: Option<&PaginationParams>,
) -> Result<WithTotal<Vec<ProtocolComponent>>, StorageError>;
/// Returns a map from address to a `(component id, bytes)` pair for the given `tokens`.
///
/// NOTE(review): presumably maps each token to a component owning it together with the
/// owned balance, with `min_balance` as a lower bound — confirm with the implementation.
async fn get_token_owners(
&self,
chain: &Chain,
tokens: &[Address],
min_balance: Option<f64>,
) -> Result<HashMap<Address, (ComponentId, Bytes)>, StorageError>;
/// Stores the given protocol components.
async fn add_protocol_components(&self, new: &[ProtocolComponent]) -> Result<(), StorageError>;
/// Deletes the given protocol components.
///
/// NOTE(review): `block_ts` is presumably the timestamp at which the deletion takes
/// effect; whether this is a soft delete is not visible here — confirm.
async fn delete_protocol_components(
&self,
to_delete: &[ProtocolComponent],
block_ts: NaiveDateTime,
) -> Result<(), StorageError>;
/// Stores the given protocol types.
async fn add_protocol_types(
&self,
new_protocol_types: &[ProtocolType],
) -> Result<(), StorageError>;
/// Returns protocol component states for `chain`, wrapped in [`WithTotal`].
///
/// `at`, `system`, `ids` and `pagination_params` are optional; `retrieve_balances` is a
/// flag.
/// NOTE(review): presumably `at = None` means the latest state, `system`/`ids` are
/// filters, and `retrieve_balances` controls whether balances are loaded — confirm.
async fn get_protocol_states(
&self,
chain: &Chain,
at: Option<Version>,
system: Option<String>,
ids: Option<&[&str]>,
retrieve_balances: bool,
pagination_params: Option<&PaginationParams>,
) -> Result<WithTotal<Vec<ProtocolComponentState>>, StorageError>;
/// Applies the given state deltas, each paired with a transaction hash.
///
/// NOTE(review): the hash is presumably the transaction that produced the delta — confirm.
async fn update_protocol_states(
&self,
new: &[(TxHash, ProtocolComponentStateDelta)],
) -> Result<(), StorageError>;
/// Returns tokens for `chain`, wrapped in [`WithTotal`].
///
/// Takes an optional address list, a [`QualityRange`], an optional `traded_n_days_ago`
/// timestamp and optional pagination parameters.
/// NOTE(review): presumably all of these act as filters, with `traded_n_days_ago` being a
/// cutoff for the last trade time — confirm with the implementation.
async fn get_tokens(
&self,
chain: Chain,
address: Option<&[&Address]>,
quality: QualityRange,
traded_n_days_ago: Option<NaiveDateTime>,
pagination_params: Option<&PaginationParams>,
) -> Result<WithTotal<Vec<Token>>, StorageError>;
/// Stores the given component balances.
async fn add_component_balances(
&self,
component_balances: &[ComponentBalance],
) -> Result<(), StorageError>;
/// Stores the given tokens.
async fn add_tokens(&self, tokens: &[Token]) -> Result<(), StorageError>;
/// Updates the given tokens.
///
/// NOTE(review): behaviour for tokens that are not yet stored is not visible here — confirm.
async fn update_tokens(&self, tokens: &[Token]) -> Result<(), StorageError>;
/// Returns protocol component state deltas for `chain` between two versions.
///
/// `start_version` is optional, `end_version` is required.
/// NOTE(review): the meaning of `start_version = None` and the inclusiveness of the bounds
/// are not visible here — confirm with the implementation.
async fn get_protocol_states_delta(
&self,
chain: &Chain,
start_version: Option<&BlockOrTimestamp>,
end_version: &BlockOrTimestamp,
) -> Result<Vec<ProtocolComponentStateDelta>, StorageError>;
/// Returns component balance changes for `chain` between two versions.
///
/// `start_version` is optional, `target_version` is required.
/// NOTE(review): the meaning of `start_version = None` and the inclusiveness of the bounds
/// are not visible here — confirm with the implementation.
async fn get_balance_deltas(
&self,
chain: &Chain,
start_version: Option<&BlockOrTimestamp>,
target_version: &BlockOrTimestamp,
) -> Result<Vec<ComponentBalance>, StorageError>;
/// Returns component balances for `chain` as a nested map.
///
/// NOTE(review): presumably the outer key is the component id and the inner key the token
/// address, with `ids`/`version` acting as optional filters — confirm.
async fn get_component_balances(
&self,
chain: &Chain,
ids: Option<&[&str]>,
version: Option<&Version>,
) -> Result<HashMap<String, HashMap<Bytes, ComponentBalance>>, StorageError>;
/// Returns token prices for `chain` as a map from bytes key to `f64`.
///
/// NOTE(review): the key is presumably the token address; the price denomination is not
/// visible here — confirm.
async fn get_token_prices(&self, chain: &Chain) -> Result<HashMap<Bytes, f64>, StorageError>;
/// Stores TVL values for `chain`, given as a map from string key to `f64`.
///
/// NOTE(review): the key is presumably the component id, and the name suggests
/// insert-or-update semantics — confirm.
async fn upsert_component_tvl(
&self,
chain: &Chain,
tvl_values: &HashMap<String, f64>,
) -> Result<(), StorageError>;
/// Returns protocol system names for `chain`, wrapped in [`WithTotal`].
async fn get_protocol_systems(
&self,
chain: &Chain,
pagination_params: Option<&PaginationParams>,
) -> Result<WithTotal<Vec<String>>, StorageError>;
/// Returns TVL values for `chain` as a map from string key to `f64`, wrapped in
/// [`WithTotal`].
///
/// NOTE(review): the key is presumably the component id, with `system`/`ids` acting as
/// optional filters — confirm.
async fn get_component_tvls(
&self,
chain: &Chain,
system: Option<String>,
ids: Option<&[&str]>,
pagination_params: Option<&PaginationParams>,
) -> Result<WithTotal<HashMap<String, f64>>, StorageError>;
}
/// Filter passed to the entry point queries of `EntryPointGateway`.
#[derive(Debug, Clone)]
pub struct EntryPointFilter {
/// The protocol system the filter applies to.
pub protocol_system: ProtocolSystem,
/// Optional list of component ids.
///
/// NOTE(review): presumably `None` means "all components of the protocol system" — confirm.
pub component_ids: Option<Vec<ComponentId>>,
}
impl EntryPointFilter {
pub fn new(protocol: ProtocolSystem) -> Self {
Self { protocol_system: protocol, component_ids: None }
}
pub fn with_component_ids(mut self, component_ids: Vec<ComponentId>) -> Self {
self.component_ids = Some(component_ids);
self
}
}
/// Storage interface for entry points, their tracing parameters and tracing results.
///
/// Only signatures are declared here; the exact persistence semantics are defined by the
/// implementors. Every method reports failures as [`StorageError`].
#[async_trait]
pub trait EntryPointGateway {
/// Stores entry points, given as a map from component id to a set of [`EntryPoint`]s.
///
/// NOTE(review): handling of already stored entry points is not visible here — confirm.
async fn insert_entry_points(
&self,
entry_points: &HashMap<ComponentId, HashSet<EntryPoint>>,
) -> Result<(), StorageError>;
/// Stores tracing parameters, given as a map from entry point id to a set of
/// `(tracing params, component id)` pairs.
async fn insert_entry_point_tracing_params(
&self,
entry_points_params: &HashMap<EntryPointId, HashSet<(TracingParams, ComponentId)>>,
) -> Result<(), StorageError>;
/// Returns entry points matching `filter`, grouped by component id and wrapped in
/// [`WithTotal`].
async fn get_entry_points(
&self,
filter: EntryPointFilter,
pagination_params: Option<&PaginationParams>,
) -> Result<WithTotal<HashMap<ComponentId, HashSet<EntryPoint>>>, StorageError>;
/// Returns entry points with their tracing parameters matching `filter`, grouped by
/// component id and wrapped in [`WithTotal`].
async fn get_entry_points_tracing_params(
&self,
filter: EntryPointFilter,
pagination_params: Option<&PaginationParams>,
) -> Result<WithTotal<HashMap<ComponentId, HashSet<EntryPointWithTracingParams>>>, StorageError>;
/// Stores the given traced entry points.
///
/// NOTE(review): the name suggests insert-or-update semantics — confirm with the implementation.
async fn upsert_traced_entry_points(
&self,
traced_entry_points: &[TracedEntryPoint],
) -> Result<(), StorageError>;
/// Returns tracing results for the given entry point ids, as a map from entry point id
/// to a map from [`TracingParams`] to [`TracingResult`].
///
/// NOTE(review): whether ids without stored results are omitted or cause an error is not
/// visible here — confirm.
async fn get_traced_entry_points(
&self,
entry_points: &HashSet<EntryPointId>,
) -> Result<HashMap<EntryPointId, HashMap<TracingParams, TracingResult>>, StorageError>;
}
/// Storage interface for contract accounts, their deltas and their balances.
///
/// Only signatures are declared here; the exact persistence semantics are defined by the
/// implementors. Every method reports failures as [`StorageError`].
#[async_trait]
pub trait ContractStateGateway {
/// Returns the [`Account`] selected by `id`.
///
/// `version` is optional; `include_slots` is a flag.
/// NOTE(review): presumably `version = None` means the latest state and `include_slots`
/// controls whether storage slots are loaded — confirm with the implementation.
async fn get_contract(
&self,
id: &ContractId,
version: Option<&Version>,
include_slots: bool,
) -> Result<Account, StorageError>;
/// Returns accounts for `chain`, wrapped in [`WithTotal`].
///
/// `addresses`, `version` and `pagination_params` are optional; `include_slots` is a flag.
/// NOTE(review): presumably `addresses = None` means all accounts and `version = None`
/// means the latest state — confirm with the implementation.
async fn get_contracts(
&self,
chain: &Chain,
addresses: Option<&[Address]>,
version: Option<&Version>,
include_slots: bool,
pagination_params: Option<&PaginationParams>,
) -> Result<WithTotal<Vec<Account>>, StorageError>;
/// Stores a new [`Account`].
async fn insert_contract(&self, new: &Account) -> Result<(), StorageError>;
/// Applies the given account deltas, each paired with a transaction hash.
///
/// NOTE(review): the hash is presumably the transaction that produced the delta — confirm.
async fn update_contracts(&self, new: &[(TxHash, AccountDelta)]) -> Result<(), StorageError>;
/// Deletes the contract selected by `id`, with reference to transaction `at_tx`.
///
/// NOTE(review): presumably `at_tx` marks the transaction at which the contract ceases to
/// exist; whether this is a soft delete is not visible here — confirm.
async fn delete_contract(&self, id: &ContractId, at_tx: &TxHash) -> Result<(), StorageError>;
/// Returns account deltas for `chain` between two versions.
///
/// `start_version` is optional, `end_version` is required.
/// NOTE(review): the meaning of `start_version = None` and the inclusiveness of the bounds
/// are not visible here — confirm with the implementation.
async fn get_accounts_delta(
&self,
chain: &Chain,
start_version: Option<&BlockOrTimestamp>,
end_version: &BlockOrTimestamp,
) -> Result<Vec<AccountDelta>, StorageError>;
/// Stores the given account balances.
async fn add_account_balances(
&self,
account_balances: &[AccountBalance],
) -> Result<(), StorageError>;
/// Returns account balances for `chain` as a nested map keyed by two addresses.
///
/// NOTE(review): presumably the outer key is the account address and the inner key the
/// token address, with `accounts`/`version` acting as optional filters — confirm.
async fn get_account_balances(
&self,
chain: &Chain,
accounts: Option<&[Address]>,
version: Option<&Version>,
) -> Result<HashMap<Address, HashMap<Address, AccountBalance>>, StorageError>;
}
/// Umbrella trait bundling all storage gateways into a single bound.
///
/// It declares no methods of its own: a type can only implement `Gateway` if it implements
/// every gateway trait listed below and is `Send + Sync`.
pub trait Gateway:
    ChainGateway
    + ContractStateGateway
    + ExtractionStateGateway
    + ProtocolGateway
    + EntryPointGateway
    + Send
    + Sync
{
}