use std::{
collections::BTreeMap,
fmt::{self, Display},
pin::Pin,
sync::{
mpsc::{Receiver, Sender},
Arc, Mutex,
},
};
use bytes::Bytes;
use ethers_core::types::{Block, BlockId, Transaction, H256};
use eyre::Result;
use crate::errors::ChannelManagerError;
/// State holder for the channel manager.
///
/// Keeps the L2 blocks accepted so far, the hash of the most recent one, the
/// optional channel endpoints used by the background task, and two
/// transaction-tracking maps.
#[derive(Debug, Default)]
pub struct ChannelManager {
    /// Blocks accepted through `push_l2_block`, in the order they were pushed.
    blocks: Vec<Block<Transaction>>,
    /// Hash of the last pushed block; `None` until a block with a hash is pushed
    /// (and again after `clear`).
    tip: Option<H256>,
    /// Outbound endpoint for transaction payloads; set by `with_sender` and
    /// required by `spawn`.
    sender: Option<Sender<Pin<Box<Bytes>>>>,
    /// Inbound endpoint for block ids; set by `with_receiver` and required by
    /// `spawn`.
    receiver: Option<Receiver<Pin<Box<BlockId>>>>,
    /// Transaction payloads keyed by id.
    // NOTE(review): presumably payloads submitted but not yet confirmed; nothing
    // in this file inserts into this map yet - confirm intended usage.
    pending_txs: BTreeMap<TransactionID, Bytes>,
    /// Block ids keyed by transaction id.
    // NOTE(review): presumably the block a transaction was confirmed in; nothing
    // in this file inserts into this map yet - confirm intended usage.
    confirmed_txs: BTreeMap<TransactionID, BlockId>,
}
/// Placeholder for a channel that is still being assembled.
///
/// The type currently carries no data. It derives the full equality/ordering
/// set (`Eq` and `Ord` in addition to the partial variants) so that it can be
/// used as a key in hash- and tree-based collections, matching the other value
/// types declared in this file.
#[derive(Debug, Clone, Hash, PartialEq, Eq, PartialOrd, Ord)]
pub struct PendingChannel {}
impl ChannelManager {
    /// Creates a manager with no blocks, no tip, no channel endpoints and empty
    /// transaction maps (the derived `Default`).
    pub fn new() -> Self {
        Self::default()
    }

    /// Stores the outbound `sender` and returns `self` so calls can be chained.
    pub fn with_sender(&mut self, sender: Sender<Pin<Box<Bytes>>>) -> &mut Self {
        self.sender = Some(sender);
        self
    }

    /// Stores the inbound `receiver` and returns `self` so calls can be chained.
    pub fn with_receiver(&mut self, receiver: Receiver<Pin<Box<BlockId>>>) -> &mut Self {
        self.receiver = Some(receiver);
        self
    }

    /// Builds the transaction payload and its id for `block_id`.
    ///
    /// # Errors
    ///
    /// Not implemented yet: after emitting a debug log line this always returns
    /// `ChannelManagerError::NotImplemented`.
    pub fn tx_data(block_id: BlockId) -> Result<(Bytes, TransactionID)> {
        tracing::debug!(target: "archon::channels", "channel manager constructing tx data with block id: {:?}...", block_id);
        Err(ChannelManagerError::NotImplemented.into())
    }

    /// Runs the receive -> build -> send loop until an error occurs.
    ///
    /// Each iteration locks `receiver`, waits for the next block id, builds the
    /// transaction data with [`Self::tx_data`], locks `sender`, forwards the
    /// payload and records it in a loop-local map keyed by transaction id.
    /// The receiver lock is held for the whole iteration.
    ///
    /// # Errors
    ///
    /// The function never returns `Ok`; it returns the first error hit:
    /// - `ChannelManagerError::ReceiverLock` / `ChannelManagerError::SenderLock`
    ///   when the corresponding mutex cannot be locked,
    /// - `ChannelManagerError::ChannelClosed` when `recv` fails,
    /// - whatever [`Self::tx_data`] returns (currently always `NotImplemented`,
    ///   so the loop ends after the first received block id),
    /// - the channel's send error when the payload cannot be sent.
    pub async fn execute(
        receiver: Arc<Mutex<Receiver<Pin<Box<BlockId>>>>>,
        sender: Arc<Mutex<Sender<Pin<Box<Bytes>>>>>,
    ) -> Result<()> {
        // NOTE(review): this map is only written to and is dropped when the
        // function returns; confirm whether it should feed `self.pending_txs`.
        let mut pending_txs = BTreeMap::new();
        loop {
            let locked_receiver = receiver
                .lock()
                .map_err(|_| ChannelManagerError::ReceiverLock)?;
            // NOTE(review): `std::sync::mpsc::Receiver::recv` is a blocking call
            // and there is no `.await` in this loop, so the executor thread
            // polling this future is occupied while waiting - confirm this is
            // acceptable or move to an async channel / `spawn_blocking`.
            let block_id = locked_receiver
                .recv()
                .map_err(|_| ChannelManagerError::ChannelClosed)?;
            let (tx_data, tx_id) = Self::tx_data(*block_id)?;
            let locked_sender = sender.lock().map_err(|_| ChannelManagerError::SenderLock)?;
            locked_sender.send(Box::pin(tx_data.clone()))?;
            pending_txs.insert(tx_id, tx_data);
        }
    }

    /// Consumes the manager and starts [`Self::execute`] as a Tokio task.
    ///
    /// The configured receiver and sender are each wrapped in `Arc<Mutex<_>>`
    /// and moved into the task. The returned handle resolves to the result of
    /// [`Self::execute`].
    ///
    /// # Errors
    ///
    /// Returns an error if the receiver (checked first) or the sender has not
    /// been configured via `with_receiver` / `with_sender`.
    ///
    /// # Panics
    ///
    /// `tokio::spawn` must be called from within a Tokio runtime context.
    pub fn spawn(self) -> Result<tokio::task::JoinHandle<Result<()>>> {
        // `ok_or_else` builds the error report only when the endpoint is
        // actually missing instead of on every call.
        let receiver = self
            .receiver
            .ok_or_else(|| eyre::eyre!("ChannelManager missing receiver!"))?;
        let receiver = Arc::new(Mutex::new(receiver));
        let sender = self
            .sender
            .ok_or_else(|| eyre::eyre!("ChannelManager missing sender!"))?;
        let sender = Arc::new(Mutex::new(sender));
        let channel_manager_handle = tokio::spawn(async move {
            tracing::info!(target: "archon::channels", "Spawning channel manager in new thread...");
            ChannelManager::execute(receiver, sender).await
        });
        Ok(channel_manager_handle)
    }

    /// Resets the manager: drops all stored blocks, unsets the tip and clears
    /// both transaction maps. The channel endpoints are left untouched.
    pub fn clear(&mut self) -> Result<()> {
        self.blocks.clear();
        self.tip = None;
        self.clear_pending_channels()?;
        Ok(())
    }

    /// Empties the pending and confirmed transaction maps. Always returns `Ok`.
    pub fn clear_pending_channels(&mut self) -> Result<()> {
        self.pending_txs.clear();
        self.confirmed_txs.clear();
        Ok(())
    }

    /// Builds a [`PendingChannel`] from the current state.
    ///
    /// # Errors
    ///
    /// Not implemented yet: always returns `ChannelManagerError::NotImplemented`.
    pub fn construct_pending_channel(&self) -> Result<PendingChannel> {
        Err(ChannelManagerError::NotImplemented.into())
    }

    /// Appends `block` to the stored blocks and makes its hash the new tip.
    ///
    /// # Errors
    ///
    /// Returns `ChannelManagerError::L1Reorg` when a tip is set and it differs
    /// from `block.parent_hash`; in that case nothing is modified.
    pub fn push_l2_block(&mut self, block: Block<Transaction>) -> Result<()> {
        // Only enforce parent linkage once a tip is known.
        if matches!(self.tip, Some(tip) if tip != block.parent_hash) {
            return Err(ChannelManagerError::L1Reorg.into());
        }
        // NOTE(review): `block.hash` is optional; a block without a hash resets
        // the tip to `None`, so the next push skips the parent check - confirm
        // hashless blocks cannot reach this point.
        self.tip = block.hash;
        self.blocks.push(block);
        Ok(())
    }
}
/// Identifier of a transaction: a channel id plus a frame number.
///
/// The derived ordering compares `channel_id` first and `frame_number` second,
/// following the field declaration order.
#[derive(Debug, Hash, Clone, PartialEq, PartialOrd, Ord, Eq)]
pub struct TransactionID {
    /// Textual id of the channel.
    channel_id: String,
    /// Frame number within that channel.
    frame_number: u64,
}
impl Default for TransactionID {
fn default() -> Self {
Self {
channel_id: String::from("0:0"),
frame_number: 0,
}
}
}
impl Display for TransactionID {
    /// Writes the id as `<channel_id>:<frame_number>`.
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let Self {
            channel_id,
            frame_number,
        } = self;
        write!(f, "{}:{}", channel_id, frame_number)
    }
}
/// A byte payload paired with a [`TransactionID`].
///
/// The derived ordering compares `data` first and `id` second, following the
/// field declaration order.
#[derive(Debug, Hash, Clone, PartialEq, PartialOrd, Ord, Eq)]
pub struct TaggedData {
    /// The raw payload bytes.
    data: Bytes,
    /// The transaction id attached to the payload.
    id: TransactionID,
}