use async_trait::async_trait;
use botcore::engine::Engine;
use botcore::types::{Collector, CollectorStream, Executor, Strategy};
use botcore::Result;
use std::time::Duration;
use tokio::time;
use tracing::{error, info};
/// Event emitted by the block collector for every timer tick.
#[derive(Debug, Clone)]
struct BlockEvent {
    /// Sequence number of the simulated block.
    number: u64,
    /// Wall-clock time of the tick, in whole seconds since the Unix epoch.
    timestamp: u64,
}
/// Action produced by the trading strategy and consumed by the executor.
#[derive(Debug, Clone)]
struct TradeAction {
    /// Number of the block event that triggered this trade.
    block_number: u64,
    /// Quantity to trade, in the strategy's own units.
    amount: u64,
    /// Timestamp copied from the triggering block event.
    timestamp: u64,
}
/// Collector that fabricates block events on a fixed timer.
struct BlockCollector {
    /// Delay between two consecutive block events.
    interval: Duration,
}

impl BlockCollector {
    /// Creates a collector that emits one event per `interval`.
    fn new(interval: Duration) -> Self {
        BlockCollector { interval }
    }
}
#[async_trait]
impl Collector<BlockEvent> for BlockCollector {
    /// Returns a stream that yields one [`BlockEvent`] per timer tick.
    ///
    /// Block numbers come from a single process-wide counter, so they keep
    /// increasing across every stream created by any `BlockCollector`.
    ///
    /// # Panics
    ///
    /// The stream panics while producing an event if the system clock reports
    /// a time earlier than the Unix epoch. `tokio::time::interval` is also
    /// documented to panic when given a zero period.
    async fn get_event_stream(&self) -> Result<CollectorStream<'_, BlockEvent>> {
        // An atomic replaces the former `static mut`: streams may be polled on
        // different runtime threads, and unsynchronised writes to a
        // `static mut` are a data race (undefined behaviour).
        static BLOCK: std::sync::atomic::AtomicU64 = std::sync::atomic::AtomicU64::new(0);

        let ticks =
            tokio_stream::wrappers::IntervalStream::new(tokio::time::interval(self.interval));

        let stream = tokio_stream::StreamExt::map(ticks, |_| {
            // `fetch_add` returns the previous value, so add one to obtain the
            // freshly assigned block number (the first event is block 1).
            // Relaxed ordering is enough: only the counter value itself is
            // needed, no other memory is published through it.
            let number = BLOCK.fetch_add(1, std::sync::atomic::Ordering::Relaxed) + 1;
            let timestamp = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("system clock is set before the Unix epoch")
                .as_secs();
            BlockEvent { number, timestamp }
        });

        Ok(Box::pin(stream))
    }
}
/// Strategy that emits a fixed-size trade once every `block_interval` blocks.
struct TradeStrategy {
    /// Minimum number of blocks between two trades.
    block_interval: u64,
    /// Amount attached to every emitted trade.
    trade_amount: u64,
    /// Block number of the most recent trade; `0` until the first trade.
    last_trade_block: u64,
}

impl TradeStrategy {
    /// Creates a strategy that has not traded yet.
    fn new(block_interval: u64, trade_amount: u64) -> Self {
        TradeStrategy {
            last_trade_block: 0,
            trade_amount,
            block_interval,
        }
    }
}
#[async_trait]
impl Strategy<BlockEvent, TradeAction> for TradeStrategy {
    /// Nothing to synchronise for this strategy; always returns `Ok(())`.
    async fn sync_state(&mut self) -> Result<()> {
        Ok(())
    }

    /// Emits one [`TradeAction`] when at least `block_interval` blocks have
    /// passed since the last trade, and an empty vector otherwise.
    ///
    /// When a trade is emitted, `last_trade_block` is updated to the number of
    /// the triggering event.
    async fn process_event(&mut self, event: BlockEvent) -> Vec<TradeAction> {
        // `checked_add` avoids the overflow of `last_trade_block + block_interval`:
        // a plain `+` panics in debug builds and wraps in release builds, where
        // the wrapped threshold would let a trade fire on almost every block.
        // If the next trade block is not representable, no block can reach it.
        let due = match self.last_trade_block.checked_add(self.block_interval) {
            Some(next_trade_block) => event.number >= next_trade_block,
            None => false,
        };

        if !due {
            return Vec::new();
        }

        self.last_trade_block = event.number;
        vec![TradeAction {
            block_number: event.number,
            amount: self.trade_amount,
            timestamp: event.timestamp,
        }]
    }
}
struct TradeExecutor;
#[async_trait]
impl Executor<TradeAction> for TradeExecutor {
async fn execute(&self, action: TradeAction) -> Result<()> {
println!(
"Executing trade of {} units at block {} at {}",
action.amount, action.block_number, action.timestamp
);
Ok(())
}
}
#[tokio::main]
async fn main() -> Result<()> {
let mut engine = Engine::new()
.with_event_channel_capacity(1024)
.with_action_channel_capacity(1024);
engine.add_collector(Box::new(BlockCollector::new(Duration::from_secs(1))));
engine.add_strategy(Box::new(TradeStrategy::new(5, 100)));
engine.add_executor(Box::new(TradeExecutor));
match engine.run().await {
Ok(mut set) => {
while let Some(res) = set.join_next().await {
match res {
Ok(res) => {
info!("res: {:?}", res);
}
Err(e) => {
info!("error: {:?}", e);
}
}
}
}
Err(e) => {
error!("Engine run error: {:?}", e);
}
}
time::sleep(Duration::from_secs(30)).await;
Ok(())
}