use crate::error::Error;
use crate::repo::paths::{block_path, filestem_to_block_cid};
use crate::repo::{BlockPut, BlockStore};
use crate::Block;
use async_trait::async_trait;
use futures::stream::{self, BoxStream};
use futures::{StreamExt, TryFutureExt, TryStreamExt};
use libipld::Cid;
use std::io::{self, ErrorKind, Read};
use std::path::PathBuf;
use std::sync::Arc;
use tokio::fs;
use tokio::sync::RwLock;
use tokio_stream::wrappers::ReadDirStream;
/// Block store that keeps blocks as files on the local filesystem.
///
/// All state lives in a [`FsBlockStoreInner`] behind an `Arc<RwLock<_>>`.
/// The read-only operations of the `BlockStore` impl take the read lock,
/// while `put`, `remove` and `remove_many` take the write lock.
#[derive(Debug)]
pub struct FsBlockStore {
    // Shared, lock-protected state; cloned into the stream returned by `remove_many`.
    inner: Arc<RwLock<FsBlockStoreInner>>,
}
/// Lock-protected state of a [`FsBlockStore`]; implements the actual filesystem operations.
#[derive(Debug)]
struct FsBlockStoreInner {
    /// Root directory of the store: created by `init`, scanned by `list_stream`,
    /// and passed to `block_path` to derive the file path of each block.
    path: PathBuf,
}
impl FsBlockStore {
    /// Builds a store rooted at `path`.
    ///
    /// Nothing is touched on disk here; the directory is only created once
    /// `init` is called.
    pub fn new(path: PathBuf) -> Self {
        Self {
            inner: Arc::new(RwLock::new(FsBlockStoreInner { path })),
        }
    }
}
// `BlockStore` facade: every method acquires the lock on the shared state and
// forwards to the method of the same name on `FsBlockStoreInner`. Readers take
// the read lock; `put`, `remove` and `remove_many` take the write lock.
#[async_trait]
impl BlockStore for FsBlockStore {
    /// Creates the root directory of the store, including any missing parents.
    async fn init(&self) -> Result<(), Error> {
        let guard = self.inner.read().await;
        fs::create_dir_all(&guard.path).await?;
        Ok(())
    }

    /// No-op: this store has nothing to open.
    async fn open(&self) -> Result<(), Error> {
        Ok(())
    }

    /// Reports whether a block file for `cid` exists.
    async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
        let guard = self.inner.read().await;
        guard.contains(cid).await
    }

    /// Loads the block for `cid`, or `None` when its file does not exist.
    async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error> {
        let guard = self.inner.read().await;
        guard.get(cid).await
    }

    /// Sums the on-disk size of the given blocks.
    async fn size(&self, cid: &[Cid]) -> Result<Option<usize>, Error> {
        let guard = self.inner.read().await;
        let summed = guard.size(cid).await;
        Ok(summed)
    }

    /// Sums the on-disk size of every block found in the store.
    async fn total_size(&self) -> Result<usize, Error> {
        let guard = self.inner.read().await;
        let summed = guard.total_size().await;
        Ok(summed)
    }

    /// Stores `block`, reporting whether it was newly written or already present.
    async fn put(&self, block: Block) -> Result<(Cid, BlockPut), Error> {
        let mut guard = self.inner.write().await;
        guard.put(block).await
    }

    /// Deletes the file backing `cid`.
    async fn remove(&self, cid: &Cid) -> Result<(), Error> {
        let mut guard = self.inner.write().await;
        guard.remove(cid).await
    }

    /// Deletes the blocks produced by `blocks` one at a time and yields the
    /// cid of each block whose file was actually removed; failed removals are
    /// silently skipped.
    ///
    /// The write lock is taken when the returned stream is first polled and
    /// is held until the stream is finished or dropped.
    async fn remove_many(&self, blocks: BoxStream<'static, Cid>) -> BoxStream<'static, Cid> {
        let shared = self.inner.clone();
        let removed = async_stream::stream! {
            let guard = shared.write().await;
            let root = guard.path.clone();
            for await cid in blocks {
                let file = block_path(root.clone(), &cid);
                if fs::remove_file(file).await.is_ok() {
                    yield cid;
                }
            }
        };
        removed.boxed()
    }

    /// Streams the cids of all blocks currently found on disk.
    async fn list(&self) -> BoxStream<'static, Cid> {
        let guard = self.inner.read().await;
        guard.list().await
    }
}
impl FsBlockStoreInner {
    async fn contains(&self, cid: &Cid) -> Result<bool, Error> {
        let path = block_path(self.path.clone(), cid);
        let metadata = match fs::metadata(path).await {
            Ok(m) => m,
            Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(false),
            Err(e) => return Err(e.into()),
        };
        Ok(metadata.is_file())
    }
    async fn get(&self, cid: &Cid) -> Result<Option<Block>, Error> {
        let path = block_path(self.path.clone(), cid);
        let cid = *cid;
        tokio::task::spawn_blocking(move || {
            let mut file = match std::fs::File::open(path) {
                Ok(file) => file,
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok(None),
                Err(e) => {
                    return Err(e.into());
                }
            };
            let len = file.metadata()?.len();
            let mut data = Vec::with_capacity(len as usize);
            file.read_to_end(&mut data)?;
            let block = Block::new(cid, data)?;
            Ok(Some(block))
        })
        .await?
    }
    async fn put(&mut self, block: Block) -> Result<(Cid, BlockPut), Error> {
        let target_path = block_path(self.path.clone(), block.cid());
        let cid = *block.cid();
        let je = tokio::task::spawn_blocking(move || {
            let sharded = target_path
                .parent()
                .expect("we already have at least the shard parent");
            std::fs::create_dir_all(sharded)?;
            let target = std::fs::OpenOptions::new()
                .write(true)
                .create_new(true)
                .open(&target_path)?;
            let temp_path = target_path.with_extension("tmp");
            match write_through_tempfile(target, &target_path, temp_path, block.data()) {
                Ok(()) => {
                    trace!("successfully wrote the block");
                    Ok::<_, std::io::Error>(Ok(block.data().len()))
                }
                Err(e) => {
                    match std::fs::remove_file(&target_path) {
                        Ok(_) => debug!("removed partially written {:?}", target_path),
                        Err(removal) => warn!(
                            "failed to remove partially written {:?}: {}",
                            target_path, removal
                        ),
                    }
                    Ok(Err(e))
                }
            }
        })
        .await
        .map_err(|e| {
            error!("blocking put task error: {}", e);
            e
        })?;
        match je {
            Ok(Ok(written)) => {
                trace!(bytes = written, "block writing succeeded");
                Ok((cid, BlockPut::NewBlock))
            }
            Ok(Err(e)) => {
                trace!("write failed but hopefully the target was removed");
                Err(Error::new(e))
            }
            Err(e) if e.kind() == ErrorKind::AlreadyExists => {
                trace!("block exist: {}", e);
                Ok((cid, BlockPut::Existed))
            }
            Err(e) => Err(Error::new(e)),
        }
    }
    async fn size(&self, cids: &[Cid]) -> Option<usize> {
        let mut block_sizes = 0;
        for cid in cids {
            let path = block_path(self.path.clone(), cid);
            if let Ok(size) = fs::metadata(path).await.map(|m| m.len() as usize) {
                block_sizes += size;
            }
        }
        Some(block_sizes)
    }
    async fn total_size(&self) -> usize {
        self.list_stream()
            .and_then(|blocks| async move {
                let list = blocks
                    .try_filter_map(|(_, path)| async move {
                        let meta = fs::metadata(path).await?;
                        Ok(Some(meta.len()))
                    })
                    .try_collect::<Vec<_>>()
                    .await
                    .unwrap_or_default();
                Ok(list.into_iter().sum::<u64>() as usize)
            })
            .await
            .unwrap_or_default()
    }
    async fn remove(&mut self, cid: &Cid) -> Result<(), Error> {
        let path = block_path(self.path.clone(), cid);
        trace!(cid = %cid, "removing block after synchronizing");
        fs::remove_file(path).await.map_err(anyhow::Error::from)
    }
    async fn list_stream(
        &self,
    ) -> Result<BoxStream<'static, Result<(Cid, PathBuf), io::Error>>, Error> {
        let stream = ReadDirStream::new(fs::read_dir(&self.path).await?);
        let path = self.path.clone();
        Ok(stream
            .try_filter_map(|d| async move {
                Ok(if d.file_type().await?.is_dir() {
                    Some(ReadDirStream::new(fs::read_dir(d.path()).await?))
                } else {
                    None
                })
            })
            .try_flatten()
            .try_filter_map(|d| {
                let name = d.file_name();
                let path: &std::path::Path = name.as_ref();
                futures::future::ready(if path.extension() != Some("data".as_ref()) {
                    Ok(None)
                } else {
                    let maybe_cid = filestem_to_block_cid(path.file_stem());
                    Ok(maybe_cid)
                })
            })
            .try_filter_map(move |cid| {
                let path = path.clone();
                async move {
                    let path = block_path(path, &cid);
                    Ok(Some((cid, path)))
                }
            })
            .boxed())
    }
    async fn list(&self) -> BoxStream<'static, Cid> {
        let stream = self.list_stream().await.unwrap_or(stream::empty().boxed());
        stream
            .try_filter_map(|(cid, _)| futures::future::ready(Ok(Some(cid))))
            .filter_map(|cid| futures::future::ready(cid.ok()))
            .boxed()
    }
}
/// Writes `data` to `temp_path`, syncs it to disk, then renames it over
/// `target_path`.
///
/// * `target` - already-open handle to the destination file; it is only kept
///   alive until just before the rename.
/// * `target_path` - final location of the data.
/// * `temp_path` - scratch file; created if missing and truncated if present.
/// * `data` - bytes to store.
///
/// # Errors
///
/// Returns the first I/O error from opening, writing, syncing or renaming.
/// On failure the temp file is removed on a best-effort basis so that no
/// partially written scratch file is left behind. The target file is left
/// alone; cleaning it up stays the caller's job.
fn write_through_tempfile(
    target: std::fs::File,
    target_path: impl AsRef<std::path::Path>,
    temp_path: impl AsRef<std::path::Path>,
    data: &[u8],
) -> Result<(), std::io::Error> {
    use std::io::Write;

    let temp_path = temp_path.as_ref();

    let outcome = (|| -> std::io::Result<()> {
        let mut temp = std::fs::OpenOptions::new()
            .write(true)
            .create(true)
            .truncate(true)
            .open(temp_path)?;
        temp.write_all(data)?;
        temp.flush()?;
        // Make sure the bytes are on disk before the rename publishes them.
        temp.sync_all()?;
        // Close both handles before renaming.
        drop(temp);
        drop(target);
        std::fs::rename(temp_path, target_path)
    })();

    if outcome.is_err() {
        // Best effort: the temp file may never have been created, so a
        // failure here is ignored and the original error is reported.
        let _ = std::fs::remove_file(temp_path);
    }
    outcome
}
// Tests for `FsBlockStore`. Each test works in its own fixed-name directory
// under the system temp dir and wipes it before starting.
#[cfg(test)]
mod tests {
    use super::*;
    use crate::Block;
    use hex_literal::hex;
    use libipld::{
        multihash::{Code, MultihashDigest},
        Cid, IpldCodec,
    };
    use std::convert::TryFrom;
    use std::env::temp_dir;
    use std::sync::Arc;
    // Full lifecycle of a single block: absent -> put -> present -> removed.
    #[tokio::test]
    async fn test_fs_blockstore() {
        let mut tmp = temp_dir();
        tmp.push("blockstore1");
        // Start from a clean directory; ignore the error when it does not exist.
        std::fs::remove_dir_all(tmp.clone()).ok();
        let store = FsBlockStore::new(tmp.clone());
        let data = b"1".to_vec();
        let cid = Cid::new_v1(IpldCodec::Raw.into(), Code::Sha2_256.digest(&data));
        let block = Block::new(cid, data).unwrap();
        store.init().await.unwrap();
        store.open().await.unwrap();
        // Nothing stored yet: lookups miss and removal must fail.
        let contains = store.contains(&cid).await.unwrap();
        assert!(!contains);
        let get = store.get(&cid).await.unwrap();
        assert_eq!(get, None);
        if store.remove(&cid).await.is_ok() {
            panic!("block should not be found")
        }
        // After the put the block is visible and reads back unchanged.
        let put = store.put(block.clone()).await.unwrap();
        assert_eq!(put.0, cid.to_owned());
        let contains = store.contains(&cid);
        assert!(contains.await.unwrap());
        let get = store.get(&cid);
        assert_eq!(get.await.unwrap(), Some(block.clone()));
        // After the removal the block is gone again.
        store.remove(&cid).await.unwrap();
        let contains = store.contains(&cid);
        assert!(!contains.await.unwrap());
        let get = store.get(&cid);
        assert_eq!(get.await.unwrap(), None);
        std::fs::remove_dir_all(tmp).ok();
    }
    // A block written through one store instance is visible through a second
    // instance created over the same directory.
    #[tokio::test]
    async fn test_fs_blockstore_open() {
        let mut tmp = temp_dir();
        tmp.push("blockstore2");
        std::fs::remove_dir_all(&tmp).ok();
        let data = b"1".to_vec();
        let cid = Cid::new_v1(IpldCodec::Raw.into(), Code::Sha2_256.digest(&data));
        let block = Block::new(cid, data).unwrap();
        let block_store = FsBlockStore::new(tmp.clone());
        block_store.init().await.unwrap();
        block_store.open().await.unwrap();
        assert!(!block_store.contains(block.cid()).await.unwrap());
        block_store.put(block.clone()).await.unwrap();
        // Fresh instance over the same path; `init` is deliberately not called again.
        let block_store = FsBlockStore::new(tmp.clone());
        block_store.open().await.unwrap();
        assert!(block_store.contains(block.cid()).await.unwrap());
        assert_eq!(block_store.get(block.cid()).await.unwrap().unwrap(), block);
        std::fs::remove_dir_all(&tmp).ok();
    }
    // `list` reports every stored block, and each listed cid is contained.
    #[tokio::test]
    async fn test_fs_blockstore_list() {
        let mut tmp = temp_dir();
        tmp.push("blockstore_list");
        std::fs::remove_dir_all(&tmp).ok();
        let block_store = FsBlockStore::new(tmp.clone());
        block_store.init().await.unwrap();
        block_store.open().await.unwrap();
        for data in &[b"1", b"2", b"3"] {
            let data_slice = data.to_vec();
            let cid = Cid::new_v1(IpldCodec::Raw.into(), Code::Sha2_256.digest(&data_slice));
            let block = Block::new(cid, data_slice).unwrap();
            block_store.put(block.clone()).await.unwrap();
        }
        let cids = block_store.list().await.collect::<Vec<_>>().await;
        assert_eq!(cids.len(), 3);
        for cid in cids.iter() {
            assert!(block_store.contains(cid).await.unwrap());
        }
    }
    // Ten concurrent puts of the same block: exactly one reports a new
    // block and the other nine report that it already existed.
    #[tokio::test]
    async fn race_to_insert_new() {
        let mut tmp = temp_dir();
        tmp.push("race_to_insert_new");
        std::fs::remove_dir_all(&tmp).ok();
        let single = FsBlockStore::new(tmp.clone());
        single.init().await.unwrap();
        let single = Arc::new(single);
        let cid = Cid::try_from("QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL").unwrap();
        let data = hex!("0a0d08021207666f6f6261720a1807");
        let block = Block::new(cid, data.into()).unwrap();
        let count = 10;
        let (writes, existing) = race_to_insert_scenario(count, block, &single).await;
        assert_eq!(writes, 1);
        assert_eq!(existing, count - 1);
    }
    // Spawns `count` tasks that all wait on a shared barrier and then put the
    // same `block`. Returns how many puts reported `NewBlock` and how many
    // reported `Existed`; puts that returned an error are logged and counted
    // in neither.
    async fn race_to_insert_scenario(
        count: usize,
        block: Block,
        blockstore: &Arc<FsBlockStore>,
    ) -> (usize, usize) {
        // The barrier releases all tasks at once so the puts actually race.
        let barrier = Arc::new(tokio::sync::Barrier::new(count));
        let join_handles = (0..count)
            .map(|_| {
                tokio::spawn({
                    let bs = Arc::clone(blockstore);
                    let barrier = Arc::clone(&barrier);
                    let block = block.clone();
                    async move {
                        barrier.wait().await;
                        bs.put(block).await
                    }
                })
            })
            .collect::<Vec<_>>();
        let mut writes = 0usize;
        let mut existing = 0usize;
        for jh in join_handles {
            let res = jh.await;
            match res {
                Ok(Ok((_, BlockPut::NewBlock))) => writes += 1,
                Ok(Ok((_, BlockPut::Existed))) => existing += 1,
                Ok(Err(e)) => tracing::error!("joinhandle err: {e}"),
                // A failed join means the spawned task panicked or was cancelled.
                _ => unreachable!("join error"),
            }
        }
        (writes, existing)
    }
    // A stored block shows up in `list` and disappears from it after `remove`.
    #[tokio::test]
    async fn remove() {
        let mut tmp = temp_dir();
        tmp.push("remove");
        std::fs::remove_dir_all(&tmp).ok();
        let single = FsBlockStore::new(tmp.clone());
        single.init().await.unwrap();
        let cid = Cid::try_from("QmRgutAxd8t7oGkSm4wmeuByG6M51wcTso6cubDdQtuEfL").unwrap();
        let data = hex!("0a0d08021207666f6f6261720a1807");
        let block = Block::new(cid, data.into()).unwrap();
        assert_eq!(single.list().await.collect::<Vec<_>>().await.len(), 0);
        single.put(block).await.unwrap();
        // Compare multihashes only rather than whole cids.
        assert_eq!(
            single.list().await.collect::<Vec<_>>().await[0].hash(),
            cid.hash()
        );
        single.remove(&cid).await.unwrap();
        assert_eq!(single.list().await.collect::<Vec<_>>().await.len(), 0);
    }
}