use crate::codec::{encode_varint, MAX_VARINT_LENGTH};
use crate::{Application, Error, Result};
use bytes::BytesMut;
use std::collections::HashMap;
use std::sync::mpsc::{channel, Receiver, Sender};
use tendermint_proto::abci::{
Event, EventAttribute, RequestCheckTx, RequestDeliverTx, RequestInfo, RequestQuery,
ResponseCheckTx, ResponseCommit, ResponseDeliverTx, ResponseInfo, ResponseQuery,
};
use tracing::{debug, info};
/// Handle to the in-memory key/value store ABCI application.
///
/// The handle itself holds no store state: every operation is turned into a
/// `Command`, sent over `cmd_tx`, and answered by whoever owns the receiving
/// end of that channel (the `KeyValueStoreDriver` returned alongside this
/// handle by `KeyValueStoreApp::new`). Cloning the handle clones the sender,
/// so all clones feed the same command channel.
#[derive(Debug, Clone)]
pub struct KeyValueStoreApp {
// Sending half of the command channel consumed by the driver.
cmd_tx: Sender<Command>,
}
impl KeyValueStoreApp {
    /// Builds a new application handle together with the driver that serves it.
    ///
    /// The two halves are connected by a fresh command channel: the handle
    /// keeps the sender, the driver receives the receiver. Commands sent
    /// through the handle are only answered while the driver's `run` loop is
    /// executing.
    pub fn new() -> (Self, KeyValueStoreDriver) {
        let (cmd_tx, cmd_rx) = channel();
        let driver = KeyValueStoreDriver::new(cmd_rx);
        let app = Self { cmd_tx };
        (app, driver)
    }

    /// Looks up `key` in the store.
    ///
    /// Sends a `Command::Get` to the driver and blocks until the reply
    /// arrives. The reply is the driver's current height paired with the
    /// stored value, or `None` when the key is absent.
    ///
    /// # Errors
    ///
    /// Fails if the command cannot be sent or the reply cannot be received.
    pub fn get<K: AsRef<str>>(&self, key: K) -> Result<(i64, Option<String>)> {
        let (reply_tx, reply_rx) = channel();
        let command = Command::Get {
            key: key.as_ref().to_owned(),
            result_tx: reply_tx,
        };
        channel_send(&self.cmd_tx, command)?;
        channel_recv(&reply_rx)
    }

    /// Stores `value` under `key`.
    ///
    /// Sends a `Command::Set` to the driver and blocks until the reply
    /// arrives. The reply is the value previously stored under `key`, if any.
    ///
    /// # Errors
    ///
    /// Fails if the command cannot be sent or the reply cannot be received.
    pub fn set<K, V>(&self, key: K, value: V) -> Result<Option<String>>
    where
        K: AsRef<str>,
        V: AsRef<str>,
    {
        let (reply_tx, reply_rx) = channel();
        let command = Command::Set {
            key: key.as_ref().to_owned(),
            value: value.as_ref().to_owned(),
            result_tx: reply_tx,
        };
        channel_send(&self.cmd_tx, command)?;
        channel_recv(&reply_rx)
    }
}
impl Application for KeyValueStoreApp {
    /// Reports the application name/version and the last committed height and
    /// app hash, as obtained from the driver.
    ///
    /// # Panics
    ///
    /// Panics if the driver cannot be reached over the command channel; the
    /// trait signature leaves no way to report that failure to the caller.
    fn info(&self, request: RequestInfo) -> ResponseInfo {
        debug!(
            "Got info request. Tendermint version: {}; Block version: {}; P2P version: {}",
            request.version, request.block_version, request.p2p_version
        );

        let (result_tx, result_rx) = channel();
        channel_send(&self.cmd_tx, Command::GetInfo { result_tx })
            .expect("failed to send GetInfo command to the key/value store driver");
        let (last_block_height, last_block_app_hash) = channel_recv(&result_rx)
            .expect("failed to receive GetInfo reply from the key/value store driver");

        ResponseInfo {
            data: "kvstore-rs".to_string(),
            version: "0.1.0".to_string(),
            app_version: 1,
            last_block_height,
            last_block_app_hash,
        }
    }

    /// Looks up the key carried in `request.data`.
    ///
    /// The key must be valid UTF-8. On success the response has code 0, the
    /// log is either `"exists"` or `"does not exist"`, and `value` holds the
    /// stored bytes (empty when the key is absent). A key that is not valid
    /// UTF-8 is client input, so it is rejected with code 1 and an explanatory
    /// log message rather than crashing the application.
    ///
    /// # Panics
    ///
    /// Panics if the driver cannot be reached over the command channel.
    fn query(&self, request: RequestQuery) -> ResponseQuery {
        // Consume the request bytes directly; on failure the original bytes
        // are recovered from the error so they can be echoed back.
        let key = match String::from_utf8(request.data) {
            Ok(key) => key,
            Err(e) => {
                return ResponseQuery {
                    // Any non-zero code signals an error to the client.
                    code: 1,
                    log: format!("Failed to interpret key as UTF-8: {}", e),
                    info: "".to_string(),
                    index: 0,
                    key: e.into_bytes(),
                    value: vec![],
                    proof_ops: None,
                    height: 0,
                    codespace: "".to_string(),
                };
            }
        };
        debug!("Attempting to get key: {}", key);

        let (height, value_opt) = match self.get(key.as_str()) {
            Ok(found) => found,
            Err(e) => panic!("Failed to get key \"{}\": {:?}", key, e),
        };

        // The two outcomes differ only in the log text and the value bytes.
        let (log, value) = match value_opt {
            Some(value) => ("exists", value.into_bytes()),
            None => ("does not exist", vec![]),
        };

        ResponseQuery {
            code: 0,
            log: log.to_string(),
            info: "".to_string(),
            index: 0,
            key: key.into_bytes(),
            value,
            proof_ops: None,
            height,
            codespace: "".to_string(),
        }
    }

    /// Accepts every transaction without inspecting it.
    fn check_tx(&self, _request: RequestCheckTx) -> ResponseCheckTx {
        ResponseCheckTx {
            code: 0,
            data: vec![],
            log: "".to_string(),
            info: "".to_string(),
            gas_wanted: 1,
            gas_used: 0,
            events: vec![],
            codespace: "".to_string(),
        }
    }

    /// Applies a transaction to the store.
    ///
    /// A transaction containing exactly one `=` is split into `key=value`;
    /// any other transaction is stored with the whole text as both key and
    /// value. The response carries an `app` event describing the key.
    ///
    /// Transaction bytes are external input: if they are not valid UTF-8 the
    /// transaction is rejected with code 1 and nothing is written.
    ///
    /// # Panics
    ///
    /// Panics if the driver cannot be reached over the command channel.
    fn deliver_tx(&self, request: RequestDeliverTx) -> ResponseDeliverTx {
        let tx = match String::from_utf8(request.tx) {
            Ok(tx) => tx,
            Err(e) => {
                return ResponseDeliverTx {
                    // Any non-zero code marks the transaction as failed.
                    code: 1,
                    data: vec![],
                    log: format!("Failed to interpret transaction as UTF-8: {}", e),
                    info: "".to_string(),
                    gas_wanted: 0,
                    gas_used: 0,
                    events: vec![],
                    codespace: "".to_string(),
                };
            }
        };

        // Exactly two '='-separated parts means "key=value"; zero or several
        // separators fall back to using the full transaction text for both.
        let mut parts = tx.split('=');
        let (key, value) = match (parts.next(), parts.next(), parts.next()) {
            (Some(key), Some(value), None) => (key, value),
            _ => (tx.as_str(), tx.as_str()),
        };

        let _ = self
            .set(key, value)
            .expect("failed to store key/value pair via the key/value store driver");

        ResponseDeliverTx {
            code: 0,
            data: vec![],
            log: "".to_string(),
            info: "".to_string(),
            gas_wanted: 0,
            gas_used: 0,
            events: vec![Event {
                r#type: "app".to_string(),
                attributes: vec![
                    EventAttribute {
                        key: "key".as_bytes().to_owned(),
                        value: key.as_bytes().to_owned(),
                        index: true,
                    },
                    EventAttribute {
                        key: "index_key".as_bytes().to_owned(),
                        value: "index is working".as_bytes().to_owned(),
                        index: true,
                    },
                    EventAttribute {
                        key: "noindex_key".as_bytes().to_owned(),
                        value: "index is working".as_bytes().to_owned(),
                        index: false,
                    },
                ],
            }],
            codespace: "".to_string(),
        }
    }

    /// Asks the driver to commit and returns the resulting app hash.
    ///
    /// `retain_height` is set to one less than the height the driver reports
    /// for this commit.
    ///
    /// # Panics
    ///
    /// Panics if the driver cannot be reached over the command channel.
    fn commit(&self) -> ResponseCommit {
        let (result_tx, result_rx) = channel();
        channel_send(&self.cmd_tx, Command::Commit { result_tx })
            .expect("failed to send Commit command to the key/value store driver");
        let (height, app_hash) = channel_recv(&result_rx)
            .expect("failed to receive Commit reply from the key/value store driver");
        info!("Committed height {}", height);

        ResponseCommit {
            data: app_hash,
            retain_height: height - 1,
        }
    }
}
/// Owner of the key/value store state.
///
/// The driver holds the actual data and answers `Command`s received over
/// `cmd_rx`; see `KeyValueStoreDriver::run` for the processing loop.
#[derive(Debug)]
pub struct KeyValueStoreDriver {
// The key/value pairs written via `Command::Set`.
store: HashMap<String, String>,
// Starts at 0 and is incremented by one on every commit.
height: i64,
// Hash reported on info/commit; recomputed on every commit.
app_hash: Vec<u8>,
// Receiving half of the command channel fed by `KeyValueStoreApp` handles.
cmd_rx: Receiver<Command>,
}
impl KeyValueStoreDriver {
fn new(cmd_rx: Receiver<Command>) -> Self {
Self {
store: HashMap::new(),
height: 0,
app_hash: vec![0_u8; MAX_VARINT_LENGTH],
cmd_rx,
}
}
pub fn run(mut self) -> Result<()> {
loop {
let cmd = self
.cmd_rx
.recv()
.map_err(|e| Error::ChannelRecv(e.to_string()))?;
match cmd {
Command::GetInfo { result_tx } => {
channel_send(&result_tx, (self.height, self.app_hash.clone()))?
}
Command::Get { key, result_tx } => {
debug!("Getting value for \"{}\"", key);
channel_send(
&result_tx,
(self.height, self.store.get(&key).map(Clone::clone)),
)?;
}
Command::Set {
key,
value,
result_tx,
} => {
debug!("Setting \"{}\" = \"{}\"", key, value);
channel_send(&result_tx, self.store.insert(key, value))?;
}
Command::Commit { result_tx } => self.commit(result_tx)?,
}
}
}
fn commit(&mut self, result_tx: Sender<(i64, Vec<u8>)>) -> Result<()> {
let mut app_hash = BytesMut::with_capacity(MAX_VARINT_LENGTH);
encode_varint(self.store.len() as u64, &mut app_hash);
self.app_hash = app_hash.to_vec();
self.height += 1;
channel_send(&result_tx, (self.height, self.app_hash.clone()))
}
}
/// Requests sent from a `KeyValueStoreApp` handle to the `KeyValueStoreDriver`.
///
/// Every variant carries a `result_tx` sender on which the driver sends its
/// reply for that request.
#[derive(Debug, Clone)]
enum Command {
// Ask for the current height and app hash.
GetInfo { result_tx: Sender<(i64, Vec<u8>)> },
// Look up `key`; the reply is the current height and the value, if present.
Get {
key: String,
result_tx: Sender<(i64, Option<String>)>,
},
// Store `value` under `key`; the reply is the previously stored value, if any.
Set {
key: String,
value: String,
result_tx: Sender<Option<String>>,
},
// Commit the current state; the reply is the new height and app hash.
Commit { result_tx: Sender<(i64, Vec<u8>)> },
}
fn channel_send<T>(tx: &Sender<T>, value: T) -> Result<()> {
tx.send(value)
.map_err(|e| Error::ChannelSend(e.to_string()).into())
}
fn channel_recv<T>(rx: &Receiver<T>) -> Result<T> {
rx.recv()
.map_err(|e| Error::ChannelRecv(e.to_string()).into())
}