gcache 0.0.1

A cache group to accelerate remote data access
Documentation
use std::collections::HashMap;
use std::os::unix::ffi::OsStrExt;
use std::path::{Path, PathBuf};
use std::sync::Arc;

use bytes::Bytes;
use nix::errno::Errno;
use tokio::io::{self, AsyncReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::net::tcp::OwnedWriteHalf;
use tokio::net::{TcpListener, TcpStream};
use tokio::sync::{Mutex, RwLock};
use tracing::error;

use crate::{CacheStore, ConsistentHash, DataSource, Operation, Peer};
/// A node of the cache group.
///
/// `Node` is a thin handle around shared state: cloning it (see the `Clone` impl)
/// only bumps the `Arc` reference count, so clones handed to spawned tasks all
/// operate on the same peers, hash ring, data source, cache store and listener.
pub struct Node<S, C>(Arc<NodeInner<S, C>>);

/// Shared state behind a [`Node`] handle.
struct NodeInner<S, C> {
    /// Addresses of every node in the group, including this one.
    peer_addrs: Vec<String>,
    /// Position of this node's own address in `peer_addrs`.
    local_index: usize,
    /// Connections to every other node, keyed by address; filled by `start`.
    peers: RwLock<HashMap<String, Peer<Vec<u8>>>>,
    /// Consistent-hash ring over all addresses; decides which node owns a key.
    hash: RwLock<ConsistentHash<String>>,
    /// Origin of the data, consulted when the local cache misses.
    data_source: S,
    /// Local cache holding the keys this node owns.
    cache_store: C,
    /// Listener bound to `peer_addrs[local_index]` that accepts peer connections.
    listener: TcpListener,
}

impl<S, C> Node<S, C>
where
    S: 'static + DataSource<Path, Err = io::Error>,
    C: 'static + CacheStore<Path, Err = io::Error>,
{
    pub async fn new(
        peer_addrs: Vec<String>,
        local_index: usize,
        data_source: S,
        cache_store: C,
    ) -> anyhow::Result<Self> {
        let node = match peer_addrs.get(local_index) {
            None => return Err(anyhow::anyhow!("local index out of range")),
            Some(addr) => {
                let listener = TcpListener::bind(addr).await?;
                Self(Arc::new(NodeInner {
                    peer_addrs,
                    local_index,
                    peers: Default::default(),
                    hash: RwLock::new(ConsistentHash::new()),
                    data_source,
                    cache_store,
                    listener,
                }))
            }
        };
        let mirror = node.clone();
        let _ = tokio::spawn(async move {
            if let Err(err) = mirror.serve().await {
                error!("node serve: {}", err)
            }
        });
        Ok(node)
    }

    pub async fn get(
        &self,
        key: impl AsRef<Path>,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<Bytes> {
        let p = key.as_ref();
        let node_name = self.0.hash.read().await.pick(p).clone();
        let mut data = Vec::with_capacity(limit as usize);
        if node_name == self.0.peer_addrs[self.0.local_index] {
            let mut r = self.get_local(p, offset, limit).await?;
            io::copy(&mut r, &mut data).await?;
            return Ok(data.into());
        }
        let peer = self.0.peers.read().await;
        let node = match peer.get(&node_name) {
            None => return Err(anyhow::anyhow!("node({}) not found", &node_name)),
            Some(pe) => pe,
        };
        let resp = node
            .send_request(
                Operation::Read {
                    key: p.as_os_str().as_bytes().to_owned().into(),
                    offset,
                    limit,
                },
                data,
            )
            .await?;
        if resp.status != 0 {
            Err(anyhow::anyhow!(
                "cannot read {:?}: {}",
                p,
                Errno::from_i32(resp.status).desc()
            ))
        } else {
            Ok(resp.body.into())
        }
    }

    pub async fn start(&self) -> anyhow::Result<()> {
        let mut peers = self.0.peers.write().await;
        for (i, addr) in self.0.peer_addrs.iter().enumerate() {
            if i == self.0.local_index {
                continue;
            }
            peers.insert(addr.into(), Peer::new(addr).await?);
        }
        drop(peers);
        let mut hash = self.0.hash.write().await;
        for addr in self.0.peer_addrs.iter() {
            hash.add(addr.into(), 64)
        }
        Ok(())
    }

    async fn serve(self) -> io::Result<()> {
        loop {
            let (stream, _) = self.0.listener.accept().await?;
            let mirror = self.clone();
            let _ = tokio::spawn(async move {
                if let Err(err) = mirror.handle(stream).await {
                    error!("handle peer request: {}", err);
                }
            });
        }
    }

    async fn handle(self, stream: TcpStream) -> io::Result<()> {
        use Operation::*;
        let (reader, writer) = stream.into_split();
        let shared_writer = Arc::new(Mutex::new(BufWriter::new(writer)));
        let mut buf_reader = BufReader::new(reader);
        loop {
            let id = buf_reader.read_u64().await?;
            let op = Operation::decode(&mut buf_reader).await?;
            match op {
                Ping => {
                    let mut w = shared_writer.lock().await;
                    w.write_u64(id).await?;
                    w.write_i32(0).await?;
                    w.write_u64(0).await?;
                    continue;
                }
                Read { key, offset, limit } => {
                    let s = self.clone();
                    let w = shared_writer.clone();
                    _ = tokio::spawn(async move {
                        if let Err(err) = s.handle_read(w, id, key, offset, limit).await {
                            error!("handle read: {}", err)
                        }
                    })
                }
                Evict(key) => {
                    let s = self.clone();
                    let w = shared_writer.clone();
                    _ = tokio::spawn(async move {
                        if let Err(err) = s.handle_evict(w, id, key).await {
                            error!("handle evict: {}", err)
                        }
                    })
                }
            }
        }
    }

    async fn handle_read(
        self,
        writer: Arc<Mutex<BufWriter<OwnedWriteHalf>>>,
        id: u64,
        key: Bytes,
        offset: u64,
        limit: u64,
    ) -> anyhow::Result<()> {
        let p = String::from_utf8_lossy(&key).parse::<PathBuf>()?;
        let mut reader = self.get_local(&p, offset, limit).await?;
        let mut w = writer.lock().await;
        w.write_u64(id).await?;
        w.write_i32(0).await?;
        w.write_u64(limit).await?;
        io::copy(&mut reader, &mut *w).await?;
        Ok(())
    }

    async fn get_local(&self, p: &Path, offset: u64, limit: u64) -> anyhow::Result<C::Reader> {
        let mut reader = self.0.cache_store.get_reader(&p, offset, limit).await?;
        if reader.is_none() {
            let mut r = self.0.data_source.get(&p).await?;
            self.0.cache_store.fill(&p, &mut r).await?;
            reader = self.0.cache_store.get_reader(&p, offset, limit).await?;
        }
        reader.ok_or(anyhow::anyhow!("cannot find {:?}", p))
    }

    async fn handle_evict(
        self,
        writer: Arc<Mutex<BufWriter<OwnedWriteHalf>>>,
        id: u64,
        key: Bytes,
    ) -> anyhow::Result<()> {
        let p = String::from_utf8_lossy(&key).parse::<PathBuf>()?;
        self.0.cache_store.evict(&p).await?;
        let mut w = writer.lock().await;
        w.write_u64(id).await?;
        w.write_i32(0).await?;
        w.write_u64(0).await?;
        Ok(())
    }
}

impl<S, C> Clone for Node<S, C> {
    fn clone(&self) -> Self {
        Self(self.0.clone())
    }
}

#[cfg(test)]
mod tests {
    use std::path::PathBuf;
    use std::time::Duration;

    use tokio::fs::OpenOptions;
    use tokio::io::AsyncWriteExt;
    use tokio::{fs, io};

    use crate::source::file::FileSource;
    use crate::store::file::FileStore;
    use crate::Node;

    /// End-to-end test with four nodes on localhost sharing 10 000 origin files.
    ///
    /// Every ranged read through node 0 must return the matching bytes of the
    /// origin file. Afterwards the cached files must be spread roughly evenly across
    /// the four cache directories, with exactly one cached copy per file in total.
    #[tokio::test]
    async fn test_node() -> anyhow::Result<()> {
        // `init` panics if a global subscriber is already installed (e.g. by
        // another test in the same binary); `try_init` just reports it.
        let _ = tracing_subscriber::fmt::try_init();
        let base = "test-dir/".parse::<PathBuf>().unwrap();
        let origin = base.join("origin");
        // Also creates `base`; succeeds when the directories already exist.
        fs::create_dir_all(&origin).await?;
        for i in 0..10000 {
            let p = origin.join(format!("{}", i));
            let mut f = OpenOptions::new()
                .write(true)
                .truncate(true)
                .create(true)
                .open(p)
                .await?;

            f.write_all(format!("{}", i).repeat(1 << 10).as_bytes())
                .await?;
        }
        let source = FileSource::new(&origin);
        let addrs = [
            "127.0.0.1:8180",
            "127.0.0.1:8181",
            "127.0.0.1:8182",
            "127.0.0.1:8183",
        ]
        .iter()
        .map(|a| a.to_string())
        .collect::<Vec<_>>();
        let mut nodes = Vec::new();
        for i in 0..4 {
            let cache_dir = base.join(format!("cache{}", i));
            // Start every run from an empty cache directory.
            match fs::remove_dir_all(&cache_dir).await {
                Err(e) if e.kind() == io::ErrorKind::NotFound => {}
                other => other?,
            }
            fs::create_dir(&cache_dir).await?;
            let cache_store = FileStore::new(cache_dir);
            let node = Node::new(addrs.clone(), i, source.clone(), cache_store).await?;
            nodes.push(node);
        }
        // Give every node's accept loop time to start before peers connect.
        tokio::time::sleep(Duration::from_secs(1)).await;
        for n in &nodes {
            n.start().await?;
        }
        let node = &nodes[0];
        let mut entries = fs::read_dir(&origin).await?;
        while let Some(e) = entries.next_entry().await? {
            if e.file_type().await?.is_file() {
                let len = e.metadata().await?.len();
                let offset = len / 2;
                let limit = (len - offset) / 2;
                let ep = e.path();
                let key = ep.strip_prefix(&origin)?;
                let data = node.get(key, offset, limit).await?;
                let expect = fs::read(ep).await?;
                assert_eq!(
                    data,
                    expect.as_slice()[offset as usize..(offset + limit) as usize]
                );
            }
        }
        let mut total_count = 0;
        for i in 0..4 {
            let cache_dir = base.join(format!("cache{}", i));
            let mut entries = fs::read_dir(&cache_dir).await?;
            let mut count = 0;
            // Propagate directory-read errors instead of silently stopping the count.
            while entries.next_entry().await?.is_some() {
                count += 1;
            }
            assert!(count > 2200, "count cache{}: {}", i, count);
            assert!(count < 2800, "count cache{}: {}", i, count);
            total_count += count;
        }
        assert_eq!(total_count, 10000, "total count: {}", total_count);
        Ok(())
    }
}