use std::collections::HashMap;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use bytes::Bytes;
use nix::errno::Errno;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::tcp::OwnedWriteHalf;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{Mutex, RwLock};
use tracing::error;
use crate::{CacheStore, ConsistentHash, DataSource, Operation, Peer};
/// A node of the distributed cache cluster.
///
/// `S` is the data source consulted on a cache miss and `C` is the local cache
/// store. The type is a thin handle around shared state held in an [`Arc`]:
/// cloning a `Node` only bumps the reference count, so every clone refers to the
/// same listener, peer connections, hash ring and stores.
pub struct Node<S, C>(Arc<NodeInner<S, C>>);
/// Shared state behind a [`Node`] handle.
struct NodeInner<S, C> {
    /// Addresses of every node in the cluster, this node's own included.
    peer_addrs: Vec<String>,
    /// Index of this node's own address within `peer_addrs`.
    local_index: usize,
    /// Connections to the other nodes, keyed by address; populated by `Node::start`.
    peers: RwLock<HashMap<String, Peer<Vec<u8>>>>,
    /// Consistent-hash ring over `peer_addrs`, used to pick the node that owns a key.
    hash: RwLock<ConsistentHash<String>>,
    /// Origin of the data; read when the local cache has no entry for a key.
    data_source: S,
    /// Local cache serving reads for keys owned by this node.
    cache_store: C,
    /// Listener bound to this node's own address, accepting peer connections.
    listener: TcpListener,
}
impl<S, C> Node<S, C>
where
S: 'static + DataSource<Path, Err = io::Error>,
C: 'static + CacheStore<Path, Err = io::Error>,
{
pub async fn new(
peer_addrs: Vec<String>,
local_index: usize,
data_source: S,
cache_store: C,
) -> anyhow::Result<Self> {
let node = match peer_addrs.get(local_index) {
None => return Err(anyhow::anyhow!("local index out of range")),
Some(addr) => {
let listener = TcpListener::bind(addr).await?;
Self(Arc::new(NodeInner {
peer_addrs,
local_index,
peers: Default::default(),
hash: RwLock::new(ConsistentHash::new()),
data_source,
cache_store,
listener,
}))
}
};
let mirror = node.clone();
let _ = tokio::spawn(async move {
if let Err(err) = mirror.serve().await {
error!("node serve: {}", err)
}
});
Ok(node)
}
pub async fn get(
&self,
key: impl AsRef<Path>,
offset: u64,
limit: u64,
) -> anyhow::Result<Bytes> {
let p = key.as_ref();
let node_name = self.0.hash.read().await.pick(p).clone();
let mut data = Vec::with_capacity(limit as usize);
if node_name == self.0.peer_addrs[self.0.local_index] {
let mut r = self.get_local(p, offset, limit).await?;
io::copy(&mut r, &mut data).await?;
return Ok(data.into());
}
let peer = self.0.peers.read().await;
let node = match peer.get(&node_name) {
None => return Err(anyhow::anyhow!("node({}) not found", &node_name)),
Some(pe) => pe,
};
let resp = node
.send_request(
Operation::Read {
key: p.as_os_str().as_bytes().to_owned().into(),
offset,
limit,
},
data,
)
.await?;
if resp.status != 0 {
Err(anyhow::anyhow!(
"cannot read {:?}: {}",
p,
Errno::from_i32(resp.status).desc()
))
} else {
Ok(resp.body.into())
}
}
pub async fn start(&self) -> anyhow::Result<()> {
let mut peers = self.0.peers.write().await;
for (i, addr) in self.0.peer_addrs.iter().enumerate() {
if i == self.0.local_index {
continue;
}
peers.insert(addr.into(), Peer::new(addr).await?);
}
drop(peers);
let mut hash = self.0.hash.write().await;
for addr in self.0.peer_addrs.iter() {
hash.add(addr.into(), 64)
}
Ok(())
}
async fn serve(self) -> io::Result<()> {
loop {
let (stream, _) = self.0.listener.accept().await?;
let mirror = self.clone();
let _ = tokio::spawn(async move {
if let Err(err) = mirror.handle(stream).await {
error!("handle peer request: {}", err);
}
});
}
}
async fn handle(self, stream: TcpStream) -> io::Result<()> {
use Operation::*;
let (reader, writer) = stream.into_split();
let shared_writer = Arc::new(Mutex::new(BufWriter::new(writer)));
let mut buf_reader = BufReader::new(reader);
loop {
let id = buf_reader.read_u64().await?;
let op = Operation::decode(&mut buf_reader).await?;
match op {
Ping => {
let mut w = shared_writer.lock().await;
w.write_u64(id).await?;
w.write_i32(0).await?;
w.write_u64(0).await?;
continue;
}
Read { key, offset, limit } => {
let s = self.clone();
let w = shared_writer.clone();
_ = tokio::spawn(async move {
if let Err(err) = s.handle_read(w, id, key, offset, limit).await {
error!("handle read: {}", err)
}
})
}
Evict(key) => {
let s = self.clone();
let w = shared_writer.clone();
_ = tokio::spawn(async move {
if let Err(err) = s.handle_evict(w, id, key).await {
error!("handle evict: {}", err)
}
})
}
}
}
}
async fn handle_read(
self,
writer: Arc<Mutex<BufWriter<OwnedWriteHalf>>>,
id: u64,
key: Bytes,
offset: u64,
limit: u64,
) -> anyhow::Result<()> {
let p = String::from_utf8_lossy(&key).parse::<PathBuf>()?;
let mut reader = self.get_local(&p, offset, limit).await?;
let mut w = writer.lock().await;
w.write_u64(id).await?;
w.write_i32(0).await?;
w.write_u64(limit).await?;
io::copy(&mut reader, &mut *w).await?;
Ok(())
}
async fn get_local(&self, p: &Path, offset: u64, limit: u64) -> anyhow::Result<C::Reader> {
let mut reader = self.0.cache_store.get_reader(&p, offset, limit).await?;
if reader.is_none() {
let mut r = self.0.data_source.get(&p).await?;
self.0.cache_store.fill(&p, &mut r).await?;
reader = self.0.cache_store.get_reader(&p, offset, limit).await?;
}
reader.ok_or(anyhow::anyhow!("cannot find {:?}", p))
}
async fn handle_evict(
self,
writer: Arc<Mutex<BufWriter<OwnedWriteHalf>>>,
id: u64,
key: Bytes,
) -> anyhow::Result<()> {
let p = String::from_utf8_lossy(&key).parse::<PathBuf>()?;
self.0.cache_store.evict(&p).await?;
let mut w = writer.lock().await;
w.write_u64(id).await?;
w.write_i32(0).await?;
w.write_u64(0).await?;
Ok(())
}
}
impl<S, C> Clone for Node<S, C> {
fn clone(&self) -> Self {
Self(self.0.clone())
}
}
#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::Duration;
    use tokio::fs::OpenOptions;
    use tokio::io::AsyncWriteExt;
    use tokio::{fs, io};
    use crate::source::file::FileSource;
    use crate::store::file::FileStore;
    use crate::Node;

    /// End-to-end check of a four-node cluster on localhost.
    ///
    /// Writes 10000 origin files, reads a sub-range of each through node 0 (which
    /// serves it locally or forwards it to the owning peer), compares the bytes with
    /// the file on disk, and finally checks how cache entries are spread across the
    /// four cache directories.
    #[tokio::test]
    async fn test_node() -> anyhow::Result<()> {
        // `init` panics if a global subscriber is already installed (e.g. by another
        // test in the same process); `try_init` tolerates that.
        let _ = tracing_subscriber::fmt::try_init();
        let base = PathBuf::from("test-dir");
        let origin = base.join("origin");
        // Creates `base` as well; succeeds if the directories already exist.
        fs::create_dir_all(&origin).await?;
        for i in 0..10000 {
            let mut f = OpenOptions::new()
                .write(true)
                .truncate(true)
                .create(true)
                .open(origin.join(i.to_string()))
                .await?;
            f.write_all(i.to_string().repeat(1 << 10).as_bytes())
                .await?;
            // Tokio files complete writes in the background; flush so the data is
            // written (and write errors surface) before the files are read back below.
            f.flush().await?;
        }
        let source = FileSource::new(&origin);
        let addrs = [
            "127.0.0.1:8180",
            "127.0.0.1:8181",
            "127.0.0.1:8182",
            "127.0.0.1:8183",
        ]
        .iter()
        .map(|a| a.to_string())
        .collect::<Vec<_>>();
        let mut nodes = Vec::with_capacity(addrs.len());
        for i in 0..addrs.len() {
            let cache_dir = base.join(format!("cache{}", i));
            // Start every run from an empty cache directory.
            match fs::remove_dir_all(&cache_dir).await {
                Err(e) if e.kind() == io::ErrorKind::NotFound => {}
                other => other?,
            }
            fs::create_dir(&cache_dir).await?;
            let cache_store = FileStore::new(cache_dir);
            let node = Node::new(addrs.clone(), i, source.clone(), cache_store).await?;
            nodes.push(node);
        }
        tokio::time::sleep(Duration::from_secs(1)).await;
        for n in &nodes {
            n.start().await?;
        }
        let node = &nodes[0];
        let mut entries = fs::read_dir(&origin).await?;
        while let Some(e) = entries.next_entry().await? {
            if !e.file_type().await?.is_file() {
                continue;
            }
            let len = e.metadata().await?.len();
            let offset = len / 2;
            let limit = (len - offset) / 2;
            let ep = e.path();
            let key = ep.strip_prefix(&origin)?;
            let data = node.get(key, offset, limit).await?;
            let expect = fs::read(ep).await?;
            assert_eq!(
                data,
                expect.as_slice()[offset as usize..(offset + limit) as usize]
            );
        }
        let mut total_count = 0;
        for i in 0..addrs.len() {
            let cache_dir = base.join(format!("cache{}", i));
            let mut entries = fs::read_dir(&cache_dir).await?;
            let mut count = 0;
            // Propagate listing errors rather than silently stopping the count early.
            while entries.next_entry().await?.is_some() {
                count += 1;
            }
            assert!(count > 2200, "count cache{}: {}", i, count);
            assert!(count < 2800, "count cache{}: {}", i, count);
            total_count += count;
        }
        assert_eq!(total_count, 10000, "total count: {}", total_count);
        Ok(())
    }
}