use crate::error::Error;
use crate::repo::paths::{filestem_to_pin_cid, pin_path};
use crate::repo::{DataStore, PinKind, PinMode, PinModeRequirement, PinStore, References};
use async_trait::async_trait;
use core::convert::TryFrom;
use futures::stream::{BoxStream, TryStreamExt};
use futures::StreamExt;
use libipld::Cid;
use std::collections::{HashMap, HashSet};
use std::path::{Path, PathBuf};
use std::sync::Arc;
use tokio::fs;
use tokio::sync::{RwLock, Semaphore};
use tokio_stream::{empty, wrappers::ReadDirStream};
use tokio_util::either::Either;
/// Filesystem-backed store that implements both `DataStore` and `PinStore`.
///
/// Key-value entries are kept below `<path>/data` and pin marker files below `<path>/pins`.
#[derive(Debug)]
pub struct FsDataStore {
    /// Root directory; the `data` and `pins` subdirectories are resolved relative to it.
    path: PathBuf,
    /// Single-permit semaphore acquired by the pin insert/remove operations before they touch
    /// the pin files, so only one of them runs at a time.
    lock: Arc<Semaphore>,
    /// Guard for the key-value side: `contains`/`get` take it shared, `put`/`remove` take it
    /// exclusively.
    ds_guard: Arc<RwLock<()>>,
}
impl FsDataStore {
    pub fn new(root: PathBuf) -> Self {
        FsDataStore {
            path: root,
            ds_guard: Arc::default(),
            lock: Arc::new(Semaphore::new(1)),
        }
    }
    fn key(&self, key: &[u8]) -> Option<(String, String)> {
        let key = String::from_utf8_lossy(key);
        let mut key_segments = key.split('/').collect::<Vec<_>>();
        let key_val = key_segments
            .pop()
            .map(PathBuf::from)
            .map(|path| path.with_extension("data"))
            .map(|path| path.to_string_lossy().to_string())?;
        let key_path_raw = key_segments.join("/");
        let key_path = match key_path_raw.starts_with('/') {
            true => key_path_raw[1..].to_string(),
            false => key_path_raw,
        };
        Some((key_path, key_val))
    }
    async fn write(&self, key: &[u8], val: &[u8]) -> std::io::Result<()> {
        let data_path = self.path.join("data");
        if !data_path.is_dir() {
            tokio::fs::create_dir_all(&data_path).await?;
        }
        let (path, key) = self
            .key(key)
            .ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;
        let path = data_path.join(path);
        if !path.is_dir() {
            tokio::fs::create_dir_all(&path).await?;
        }
        let path = path.join(key);
        if path.is_dir() {
            return Err(std::io::ErrorKind::Other.into());
        }
        tokio::fs::write(path, val).await
    }
    fn _contains(&self, key: &[u8]) -> bool {
        let data_path = self.path.join("data");
        let Some((path, key)) = self.key(key) else {
            return false;
        };
        let path = data_path.join(path);
        let path = path.join(key);
        path.is_file()
    }
    async fn delete(&self, key: &[u8]) -> std::io::Result<()> {
        let data_path = self.path.join("data");
        let (path, key) = self
            .key(key)
            .ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;
        let path = data_path.join(path);
        let path = path.join(key);
        tokio::fs::remove_file(path).await
    }
    async fn read(&self, key: &[u8]) -> std::io::Result<Option<Vec<u8>>> {
        let data_path = self.path.join("data");
        let (path, key) = self
            .key(key)
            .ok_or::<std::io::Error>(std::io::ErrorKind::NotFound.into())?;
        let path = data_path.join(path);
        let path = path.join(key);
        if path.is_dir() {
            return Ok(None);
        }
        tokio::fs::read(path).await.map(Some)
    }
}
/// Recursively walks `path` and yields `(key, value)` for every `*.data` file found.
///
/// The key is the file's path relative to `data_path` (including the leading separator) with
/// the `.data` suffix removed; the value is the file's content. Directories and files that
/// cannot be read are skipped silently, as are files that do not carry the `.data` suffix.
fn build_kv<R: AsRef<Path>, P: AsRef<Path>>(
    data_path: R,
    path: P,
) -> BoxStream<'static, (Vec<u8>, Vec<u8>)> {
    let data_path = data_path.as_ref().to_path_buf();
    let path = path.as_ref().to_path_buf();
    async_stream::stream! {
        // `read_dir` fails for anything that is not a readable directory (including plain
        // files), which ends the stream without yielding.
        let Ok(dir) = tokio::fs::read_dir(path).await else {
            return;
        };
        // Loop-invariant: every key is the entry path with this prefix cut off.
        let root_str = data_path.to_string_lossy().to_string();
        let st =
            ReadDirStream::new(dir).filter_map(|result| futures::future::ready(result.ok()));
        for await entry in st {
            let path = entry.path();
            // Async metadata lookup (follows symlinks like `Path::is_dir`) instead of a
            // blocking call inside the stream.
            let is_dir = tokio::fs::metadata(&path)
                .await
                .map(|meta| meta.is_dir())
                .unwrap_or(false);
            if is_dir {
                for await item in build_kv(&data_path, &path) {
                    yield item;
                }
                continue;
            }
            let path_str = path.to_string_lossy().to_string();
            // `strip_suffix` replaces the former `len() - 5` arithmetic, which underflowed for
            // short file names and cut arbitrary characters off files lacking the suffix.
            let Some(key) = path_str
                .strip_prefix(root_str.as_str())
                .and_then(|raw_key| raw_key.strip_suffix(".data"))
            else {
                continue;
            };
            if let Ok(bytes) = tokio::fs::read(&path).await {
                yield (key.as_bytes().to_vec(), bytes);
            }
        }
    }
    .boxed()
}
#[async_trait]
impl DataStore for FsDataStore {
    /// Ensures the `pins` and `data` directories exist below the store root.
    async fn init(&self) -> Result<(), Error> {
        for subdir in ["pins", "data"] {
            tokio::fs::create_dir_all(self.path.join(subdir)).await?;
        }
        Ok(())
    }

    /// Opening needs no work for this store.
    async fn open(&self) -> Result<(), Error> {
        Ok(())
    }

    /// Reports whether a value is stored under `key` (shared access).
    async fn contains(&self, key: &[u8]) -> Result<bool, Error> {
        let _shared = self.ds_guard.read().await;
        let present = self._contains(key);
        Ok(present)
    }

    /// Fetches the value stored under `key` (shared access).
    async fn get(&self, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
        let _shared = self.ds_guard.read().await;
        let value = self.read(key).await?;
        Ok(value)
    }

    /// Stores `value` under `key` (exclusive access).
    async fn put(&self, key: &[u8], value: &[u8]) -> Result<(), Error> {
        let _exclusive = self.ds_guard.write().await;
        self.write(key, value).await?;
        Ok(())
    }

    /// Deletes the value stored under `key` (exclusive access).
    async fn remove(&self, key: &[u8]) -> Result<(), Error> {
        let _exclusive = self.ds_guard.write().await;
        self.delete(key).await?;
        Ok(())
    }

    /// Streams every `(key, value)` pair found below the `data` directory.
    async fn iter(&self) -> BoxStream<'static, (Vec<u8>, Vec<u8>)> {
        let root = self.path.join("data");
        build_kv(root.as_path(), root.as_path())
    }
}
#[async_trait]
impl PinStore for FsDataStore {
    /// Returns whether `cid` is pinned directly, recursively, or indirectly.
    ///
    /// The pin file of `cid` itself is checked first; only when neither a direct nor a
    /// recursive pin file exists are the reference lists of all recursive pins searched.
    async fn is_pinned(&self, cid: &Cid) -> Result<bool, Error> {
        let own_pin = read_direct_or_recursive(pin_path(self.path.join("pins"), cid)).await?;
        if own_pin.is_some() {
            return Ok(true);
        }

        // Keep only the roots of recursive pins.
        let recursive_roots = self.list_pinfiles().await.try_filter_map(|(root, mode)| {
            futures::future::ready(Ok((mode == PinMode::Recursive).then_some(root)))
        });
        futures::pin_mut!(recursive_roots);

        while let Some(root) = TryStreamExt::try_next(&mut recursive_roots).await? {
            let (_, references) = read_recursively_pinned(self.path.join("pins"), root).await?;
            if references.contains(cid) {
                return Ok(true);
            }
        }
        Ok(false)
    }
    async fn insert_direct_pin(&self, target: &Cid) -> Result<(), Error> {
        let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
        let mut path = pin_path(self.path.join("pins"), target);
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _permit = permit;
            let _entered = span.enter();
            std::fs::create_dir_all(path.parent().expect("shard parent has to exist"))?;
            path.set_extension("recursive");
            if path.is_file() {
                return Err(anyhow::anyhow!("already pinned recursively"));
            }
            path.set_extension("direct");
            let f = std::fs::File::create(path)?;
            f.sync_all()?;
            Ok(())
        })
        .await??;
        Ok(())
    }
    async fn insert_recursive_pin(
        &self,
        target: &Cid,
        referenced: References<'_>,
    ) -> Result<(), Error> {
        let set = referenced
            .try_collect::<std::collections::BTreeSet<_>>()
            .await?;
        let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
        let mut path = pin_path(self.path.join("pins"), target);
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _permit = permit; let _entered = span.enter();
            std::fs::create_dir_all(path.parent().expect("shard parent has to exist"))?;
            let count = set.len();
            let cids = set.into_iter().map(|cid| cid.to_string());
            path.set_extension("recursive_temp");
            let file = std::fs::File::create(&path)?;
            match sync_write_recursive_pin(file, count, cids) {
                Ok(_) => {
                    let final_path = path.with_extension("recursive");
                    std::fs::rename(&path, final_path)?
                }
                Err(e) => {
                    let removed = std::fs::remove_file(&path);
                    match removed {
                        Ok(_) => debug!("cleaned up ok after botched recursive pin write"),
                        Err(e) => warn!("failed to cleanup temporary file: {}", e),
                    }
                    return Err(e);
                }
            }
            path.set_extension("direct");
            match std::fs::remove_file(&path) {
                Ok(_) => { }
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => { }
                Err(e) => {
                    warn!(
                        "failed to remove direct pin when adding recursive {:?}: {}",
                        path, e
                    );
                }
            }
            Ok::<_, Error>(())
        })
        .await??;
        Ok(())
    }
    async fn remove_direct_pin(&self, target: &Cid) -> Result<(), Error> {
        let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
        let mut path = pin_path(self.path.join("pins"), target);
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _permit = permit; let _entered = span.enter();
            path.set_extension("recursive");
            if path.is_file() {
                return Err(anyhow::anyhow!("is pinned recursively"));
            }
            path.set_extension("direct");
            match std::fs::remove_file(&path) {
                Ok(_) => {
                    trace!("direct pin removed");
                    Ok(())
                }
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    Err(anyhow::anyhow!("not pinned or pinned indirectly"))
                }
                Err(e) => Err(e.into()),
            }
        })
        .await??;
        Ok(())
    }
    async fn remove_recursive_pin(&self, target: &Cid, _: References<'_>) -> Result<(), Error> {
        let permit = Semaphore::acquire_owned(Arc::clone(&self.lock)).await?;
        let mut path = pin_path(self.path.join("pins"), target);
        let span = tracing::Span::current();
        tokio::task::spawn_blocking(move || {
            let _permit = permit; let _entered = span.enter();
            path.set_extension("direct");
            let mut any = false;
            match std::fs::remove_file(&path) {
                Ok(_) => {
                    trace!("direct pin removed");
                    any |= true;
                }
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    }
                Err(e) => return Err(Error::new(e)),
            }
            path.set_extension("recursive");
            match std::fs::remove_file(&path) {
                Ok(_) => {
                    trace!("recursive pin removed");
                    any |= true;
                }
                Err(e) if e.kind() == std::io::ErrorKind::NotFound => {
                    }
                Err(e) => return Err(e.into()),
            }
            if !any {
                Err(anyhow::anyhow!("not pinned or pinned indirectly"))
            } else {
                Ok(())
            }
        })
        .await??;
        Ok(())
    }
    /// Streams the pinned CIDs together with their pin mode, each CID at most once.
    ///
    /// `requirement` is converted into a `PinModeRequirement` that decides which modes are
    /// reported. The stream yields, in this order:
    ///
    /// 1. recursive pins, as they are encountered while scanning the pin files,
    /// 2. direct pins (buffered during the scan),
    /// 3. only if `requirement.is_indirect_or_any()`: the CIDs referenced by the recursive
    ///    pin files, reported as `PinMode::Indirect`.
    ///
    /// Errors from scanning or from reading a recursive pin file are yielded as `Err` items.
    async fn list(
        &self,
        requirement: Option<PinMode>,
    ) -> futures::stream::BoxStream<'static, Result<(Cid, PinMode), Error>> {
        let cids = self.list_pinfiles().await;
        let path = self.path.join("pins");
        let requirement = PinModeRequirement::from(requirement);
        let st = async_stream::try_stream! {
            // Everything yielded so far; used to suppress duplicates across the three phases.
            let mut returned: HashSet<Cid> = HashSet::default();
            // Roots of recursive pins, kept only when indirect pins will be expanded later.
            let mut recursive: HashSet<Cid> = HashSet::default();
            // Matching direct pins, yielded after the scan has finished.
            let mut direct: HashSet<Cid> = HashSet::default();
            let collect_recursive_for_indirect = requirement.is_indirect_or_any();
            futures::pin_mut!(cids);
            while let Some((cid, mode)) = TryStreamExt::try_next(&mut cids).await? {
                let matches = requirement.matches(&mode);
                if mode == PinMode::Recursive {
                    if collect_recursive_for_indirect {
                        recursive.insert(cid);
                    }
                    if matches && returned.insert(cid) {
                        yield (cid, mode);
                    }
                } else if mode == PinMode::Direct && matches {
                    direct.insert(cid);
                }
            }
            trace!(unique = returned.len(), "completed listing recursive");
            for cid in direct {
                if returned.insert(cid) {
                    yield (cid, PinMode::Direct)
                }
            }
            trace!(unique = returned.len(), "completed listing direct");
            if !collect_recursive_for_indirect {
                return;
            }
            // Read the reference lists of the recursive pins, up to four files concurrently.
            let mut recursive = futures::stream::iter(recursive.into_iter().map(Ok))
                .map_ok(move |cid| read_recursively_pinned(path.clone(), cid))
                .try_buffer_unordered(4);
            while let Some((_, next_batch)) = TryStreamExt::try_next(&mut recursive).await? {
                for indirect in next_batch {
                    if returned.insert(indirect) {
                        yield (indirect, PinMode::Indirect);
                    }
                }
                trace!(unique = returned.len(), "completed batch of indirect");
            }
        };
        Box::pin(st)
    }
    /// Looks up how each CID in `ids` is pinned, optionally restricted to one pin mode.
    ///
    /// * `Some(Direct)` / `Some(Recursive)`: only the CID's own pin file is consulted and it
    ///   must match the requested mode.
    /// * `Some(Indirect)`: only the reference lists of recursive pins are searched.
    /// * `None`: the own pin file is consulted first, then the reference lists.
    ///
    /// Returns one `(cid, kind)` entry per resolved id, in the order of `ids`.
    ///
    /// # Errors
    ///
    /// Fails with "`<cid>` is not pinned" as soon as one id cannot be resolved under the given
    /// requirement; I/O and parse errors from reading pin files are propagated as well.
    async fn query(
        &self,
        ids: Vec<Cid>,
        requirement: Option<PinMode>,
    ) -> Result<Vec<(Cid, PinKind<Cid>)>, Error> {
        // One slot per requested id, filled in as ids get resolved.
        let mut response = Vec::with_capacity(ids.len());
        for _ in 0..ids.len() {
            response.push(None);
        }
        // Unresolved ids mapped to their slot index in `response`.
        let mut remaining = HashMap::new();
        let (check_direct, searched_suffix, gather_indirect) = match requirement {
            Some(PinMode::Direct) => (true, Some(PinMode::Direct), false),
            Some(PinMode::Recursive) => (true, Some(PinMode::Recursive), false),
            Some(PinMode::Indirect) => (false, None, true),
            None => (true, None, true),
        };
        let searched_suffix = PinModeRequirement::from(searched_suffix);
        let (mut response, mut remaining) = if check_direct {
            let base = self.path.join("pins");
            // The pin file probes are synchronous filesystem calls, hence the blocking pool.
            tokio::task::spawn_blocking(move || {
                for (i, cid) in ids.into_iter().enumerate() {
                    let mut path = pin_path(base.clone(), &cid);
                    if let Some(mode) = sync_read_direct_or_recursive(&mut path) {
                        if searched_suffix.matches(&mode) {
                            response[i] = Some((
                                cid,
                                match mode {
                                    PinMode::Direct => PinKind::Direct,
                                    PinMode::Recursive => PinKind::Recursive(0),
                                    // sync_read_direct_or_recursive only returns the two
                                    // modes handled above.
                                    _ => unreachable!(),
                                },
                            ));
                            continue;
                        }
                    }
                    if !gather_indirect {
                        return Err(anyhow::anyhow!("{} is not pinned", cid));
                    }
                    // NOTE(review): a CID repeated in `ids` keeps only its first index here, so
                    // the slots of later duplicates stay `None` and are dropped by the final
                    // `flatten` — confirm this is intended.
                    remaining.entry(cid).or_insert(i);
                }
                Ok((response, remaining))
            })
            .await??
        } else {
            for (i, cid) in ids.into_iter().enumerate() {
                remaining.entry(cid).or_insert(i);
            }
            (response, remaining)
        };
        if !remaining.is_empty() {
            // Without `gather_indirect` the loop above has already returned an error for the
            // first unresolved id, so nothing can be remaining in that case.
            assert!(gather_indirect);
            trace!(
                remaining = remaining.len(),
                "query trying to find remaining indirect pins"
            );
            // Reference lists of all recursive pins, read up to four files concurrently.
            let recursives = self
                .list_pinfiles()
                .await
                .try_filter_map(|(cid, mode)| {
                    futures::future::ready(if mode == PinMode::Recursive {
                        Ok(Some(cid))
                    } else {
                        Ok(None)
                    })
                })
                .map_ok(|cid| read_recursively_pinned(self.path.join("pins"), cid))
                .try_buffer_unordered(4);
            futures::pin_mut!(recursives);
            'out: while let Some((referring, references)) =
                TryStreamExt::try_next(&mut recursives).await?
            {
                for cid in references {
                    if let Some(index) = remaining.remove(&cid) {
                        response[index] = Some((cid, PinKind::IndirectFrom(referring)));
                        // Stop reading further pin files once every id is resolved.
                        if remaining.is_empty() {
                            break 'out;
                        }
                    }
                }
            }
        }
        // Anything still unresolved is not pinned in any way that was searched.
        if let Some((cid, _)) = remaining.into_iter().next() {
            return Err(anyhow::anyhow!("{} is not pinned", cid));
        }
        Ok(response.into_iter().flatten().collect())
    }
}
impl FsDataStore {
    /// Streams `(cid, mode)` for every `.direct` / `.recursive` file found one directory level
    /// below `<root>/pins`.
    ///
    /// Entries of `pins` that are not directories contribute nothing, files with any other
    /// extension are skipped, and so are files whose stem `filestem_to_pin_cid` does not map to
    /// a CID. A failure to open `pins` itself is delivered as the stream's single `Err` item.
    async fn list_pinfiles(
        &self,
    ) -> impl futures::stream::Stream<Item = Result<(Cid, PinMode), Error>> + 'static {
        let pins_dir = self.path.join("pins");
        let shards = match tokio::fs::read_dir(pins_dir).await {
            Err(e) => Either::Right(futures::stream::once(futures::future::ready(Err(e)))),
            Ok(read_dir) => Either::Left(ReadDirStream::new(read_dir)),
        };

        shards
            .and_then(|shard| async move {
                let is_shard_dir = shard.file_type().await?.is_dir();
                Ok(if is_shard_dir {
                    Either::Left(ReadDirStream::new(fs::read_dir(shard.path()).await?))
                } else {
                    Either::Right(empty())
                })
            })
            .try_flatten()
            .map_err(Error::new)
            .try_filter_map(|entry| {
                let name = entry.file_name();
                let path: &std::path::Path = name.as_ref();

                let mode = match path.extension().and_then(|ext| ext.to_str()) {
                    Some("recursive") => Some(PinMode::Recursive),
                    Some("direct") => Some(PinMode::Direct),
                    _ => None,
                };
                // Parse the stem only for files that carry one of the pin extensions.
                let parsed = match mode {
                    Some(mode) => filestem_to_pin_cid(path.file_stem()).map(|cid| (cid, mode)),
                    None => None,
                };
                futures::future::ready(Ok(parsed))
            })
    }
}
/// Reads the `.recursive` pin file of `cid` below `path` and returns `cid` together with the
/// CIDs listed in it.
///
/// A missing file yields an empty list; any other read error, invalid JSON, or an unparsable
/// CID string is returned as an error.
async fn read_recursively_pinned(path: PathBuf, cid: Cid) -> Result<(Cid, Vec<Cid>), Error> {
    let pin_file = pin_path(path, &cid).with_extension("recursive");

    let raw = match tokio::fs::read(pin_file).await {
        Err(e) if e.kind() == std::io::ErrorKind::NotFound => return Ok((cid, Vec::new())),
        other => other?,
    };

    // The file holds a JSON array of CID strings.
    let encoded: Vec<&str> = serde_json::from_slice(&raw)?;
    let mut found = Vec::with_capacity(encoded.len());
    for text in encoded {
        found.push(Cid::try_from(text)?);
    }

    trace!(cid = %cid, count = found.len(), "read indirect pins");
    Ok((cid, found))
}
/// Async wrapper that runs `sync_read_direct_or_recursive` on the blocking thread pool.
///
/// The only error source is the blocking task failing to complete.
async fn read_direct_or_recursive(mut block_path: PathBuf) -> Result<Option<PinMode>, Error> {
    let probe = tokio::task::spawn_blocking(move || sync_read_direct_or_recursive(&mut block_path));
    let mode = probe.await?;
    Ok(mode)
}
/// Probes for the `.recursive` and then the `.direct` pin file of `block_path`.
///
/// Returns the mode of the first file that exists, or `None` when neither does. The extension
/// of `block_path` is overwritten in the process and left at the last one probed.
fn sync_read_direct_or_recursive(block_path: &mut PathBuf) -> Option<PinMode> {
    block_path.set_extension("recursive");
    if block_path.is_file() {
        return Some(PinMode::Recursive);
    }

    block_path.set_extension("direct");
    if block_path.is_file() {
        return Some(PinMode::Direct);
    }

    None
}
/// Writes `cids` to `file` as a JSON array of strings and syncs the file to disk.
///
/// `count` is passed to the serializer as the sequence length hint. The data goes through a
/// `BufWriter`, which is flushed explicitly before the underlying file is synced.
fn sync_write_recursive_pin(
    file: std::fs::File,
    count: usize,
    cids: impl Iterator<Item = String>,
) -> Result<(), Error> {
    use serde::{ser::SerializeSeq, Serializer};
    use std::io::{BufWriter, Write};

    let mut json = serde_json::ser::Serializer::new(BufWriter::new(file));

    let mut array = json.serialize_seq(Some(count))?;
    let mut cids = cids;
    cids.try_for_each(|cid| array.serialize_element(&cid))?;
    array.end()?;

    // Unwrap layer by layer: serializer -> buffered writer -> file.
    let mut buffered = json.into_inner();
    buffered.flush()?;
    let file = buffered.into_inner()?;
    file.sync_all()?;
    Ok(())
}
// Runs the crate's shared pin-store interface tests against `FsDataStore`, passing
// `FsDataStore::new` as the store constructor. Presumably the first argument names the
// generated test module — confirm against the macro definition.
#[cfg(test)]
crate::pinstore_interface_tests!(
    common_tests,
    crate::repo::datastore::flatfs::FsDataStore::new
);
#[cfg(test)]
mod test {
    use crate::repo::{datastore::flatfs::FsDataStore, DataStore};

    /// Exercises the key-value API end to end: absent key, put, get, remove, absent again.
    #[tokio::test]
    async fn test_kv_datastore() -> anyhow::Result<()> {
        // Use a private directory instead of the shared system temp dir itself, so that
        // leftovers of other runs (or other programs' `data`/`pins` directories) cannot
        // influence the assertions below.
        let tmp = std::env::temp_dir().join(format!("flatfs_kv_datastore_{}", std::process::id()));
        // Best effort: clear whatever a previous, aborted run with the same pid left behind.
        let _ = std::fs::remove_dir_all(&tmp);

        let store = FsDataStore::new(tmp.clone());
        let key = [1, 2, 3, 4];
        let value = [5, 6, 7, 8];
        store.init().await?;
        store.open().await?;

        // Nothing stored yet.
        let contains = store.contains(&key).await.unwrap();
        assert!(!contains);
        let get = store.get(&key).await.unwrap_or_default();
        assert_eq!(get, None);
        assert!(store.remove(&key).await.is_err());

        // Round trip.
        store.put(&key, &value).await.unwrap();
        let contains = store.contains(&key).await.unwrap();
        assert!(contains);
        let get = store.get(&key).await.unwrap();
        assert_eq!(get, Some(value.to_vec()));

        // Removal makes the key absent again.
        store.remove(&key).await.unwrap();
        let contains = store.contains(&key).await.unwrap();
        assert!(!contains);
        let get = store.get(&key).await.unwrap_or_default();
        assert_eq!(get, None);

        drop(store);
        std::fs::remove_dir_all(&tmp)?;
        Ok(())
    }
}