use anyhow::Result;
use async_std::net::TcpStream;
use async_std::sync::Arc;
use async_std::task;
use futures_lite::stream::StreamExt;
use log::*;
use std::collections::HashMap;
use std::convert::TryInto;
use std::env;
use hypercore_protocol::schema::*;
use hypercore_protocol::{
discovery_key, Channel, DiscoveryKey, Event, Key, Message, ProtocolBuilder,
};
mod util;
use util::{tcp_client, tcp_server};
fn usage() {
println!("usage: cargo run --example basic -- [client|server] [port] [key]");
std::process::exit(1);
}
fn main() {
util::init_logger();
if env::args().count() < 3 {
usage();
}
let mode = env::args().nth(1).unwrap();
let port = env::args().nth(2).unwrap();
let address = format!("127.0.0.1:{}", port);
let key = env::args().nth(3);
let key = key.map_or(None, |key| hex::decode(key).ok());
let key = key.map(|key| key.try_into().expect("Key must be a 32 byte hex string"));
let mut feedstore = FeedStore::new();
if let Some(key) = key {
feedstore.add(Feed::new(key));
} else {
let key = [9u8; 32];
feedstore.add(Feed::new(key.clone()));
println!("KEY={}", hex::encode(&key));
}
let feedstore = Arc::new(feedstore);
task::block_on(async move {
let result = match mode.as_ref() {
"server" => tcp_server(address, onconnection, feedstore).await,
"client" => tcp_client(address, onconnection, feedstore).await,
_ => panic!(usage()),
};
util::log_if_error(&result);
});
}
async fn onconnection(
stream: TcpStream,
is_initiator: bool,
feedstore: Arc<FeedStore>,
) -> Result<()> {
let mut protocol = ProtocolBuilder::new(is_initiator).connect(stream);
while let Some(event) = protocol.next().await {
let event = event?;
debug!("EVENT {:?}", event);
match event {
Event::Handshake(_) => {
if is_initiator {
for feed in feedstore.feeds.values() {
protocol.open(feed.key.clone()).await?;
}
}
}
Event::DiscoveryKey(dkey) => {
if let Some(feed) = feedstore.get(&dkey) {
protocol.open(feed.key.clone()).await?;
}
}
Event::Channel(mut channel) => {
if let Some(feed) = feedstore.get(channel.discovery_key()) {
let feed = feed.clone();
let mut state = FeedState::default();
task::spawn(async move {
while let Some(message) = channel.next().await {
onmessage(&*feed, &mut state, &mut channel, message).await;
}
});
}
}
_ => {}
}
}
Ok(())
}
struct FeedStore {
pub feeds: HashMap<String, Arc<Feed>>,
}
impl FeedStore {
pub fn new() -> Self {
let feeds = HashMap::new();
Self { feeds }
}
pub fn add(&mut self, feed: Feed) {
let hdkey = hex::encode(&feed.discovery_key);
self.feeds.insert(hdkey, Arc::new(feed));
}
pub fn get(&self, discovery_key: &[u8; 32]) -> Option<&Arc<Feed>> {
let hdkey = hex::encode(discovery_key);
self.feeds.get(&hdkey)
}
}
#[derive(Debug)]
struct Feed {
key: Key,
discovery_key: DiscoveryKey,
}
impl Feed {
pub fn new(key: Key) -> Self {
Feed {
discovery_key: discovery_key(&key),
key,
}
}
}
#[derive(Debug)]
struct FeedState {
remote_head: Option<u64>,
}
impl Default for FeedState {
fn default() -> Self {
FeedState { remote_head: None }
}
}
async fn onmessage(_feed: &Feed, state: &mut FeedState, channel: &mut Channel, message: Message) {
match message {
Message::Open(_) => {
let msg = Want {
start: 0,
length: None,
};
channel
.send(Message::Want(msg))
.await
.expect("failed to send");
}
Message::Want(_) => {
let msg = Have {
start: 0,
length: Some(3),
bitfield: None,
ack: None,
};
channel
.send(Message::Have(msg))
.await
.expect("failed to send");
}
Message::Have(msg) => {
let new_remote_head = msg.start + msg.length.unwrap_or(0);
if state.remote_head == None {
state.remote_head = Some(new_remote_head);
let msg = Request {
index: 0,
bytes: None,
hash: None,
nodes: None,
};
channel.send(Message::Request(msg)).await.unwrap();
} else if let Some(remote_head) = state.remote_head {
if remote_head < new_remote_head {
state.remote_head = Some(new_remote_head);
}
}
}
Message::Request(msg) => {
channel
.send(Message::Data(Data {
index: msg.index,
value: Some("Hello world".as_bytes().to_vec()),
nodes: vec![],
signature: None,
}))
.await
.unwrap();
}
Message::Data(msg) => {
debug!(
"receive data: idx {}, {} bytes (remote_head {:?})",
msg.index,
msg.value.as_ref().map_or(0, |v| v.len()),
state.remote_head
);
if let Some(value) = msg.value {
eprintln!("{} {}", msg.index, String::from_utf8(value).unwrap());
}
let next = msg.index + 1;
if let Some(remote_head) = state.remote_head {
if remote_head >= next {
let msg = Request {
index: next,
bytes: None,
hash: None,
nodes: None,
};
channel.send(Message::Request(msg)).await.unwrap();
}
}
}
_ => {}
}
}