Documentation
mod database;
mod utils;

use database::{KvCache, LruKvCache, SqliteKvCache};
use lazy_static::lazy_static;
use log::error;
use rayon::prelude::*;
use reqwest::{
    blocking::Client,
    header::{HeaderMap, HeaderValue, AUTHORIZATION},
    redirect::Policy,
    Identity,
};
use std::{
    convert::TryInto,
    iter::FromIterator,
    path::{Path, PathBuf},
    sync::{Arc, Mutex, RwLock},
};
use thiserror::Error;
use utils::get_hash;

const CHUNK_SIZE: usize = 1024 * 1024;

#[derive(Debug, Error)]
pub enum CfKvFsError {
    #[error("Data transfer error: {0}")]
    ReqwestError(#[from] reqwest::Error),
    #[error("Database error: {0}")]
    RusqliteError(#[from] rusqlite::Error),
    #[error("Database migration error: {0}")]
    RusqliteMigrationError(#[from] rusqlite_migration::Error),
    #[error("Data parse error: {0}")]
    IntParseConvertError(#[from] std::array::TryFromSliceError),
    #[error("Data invalid")]
    HashError,
}

type Reducer = Box<dyn Fn(Vec<u8>) -> Vec<u8> + Sync>;

pub struct CfKvFsBuilder {
    endpoint: String,
    prefix: String,
    header: Option<HeaderMap>,
    pem: Option<Vec<u8>>,
    reducer: Option<Reducer>,
    path: Option<PathBuf>,
    table: Option<String>,
}

impl CfKvFsBuilder {
    fn new<E, P>(endpoint: E, prefix: P) -> Self
    where
        E: Into<String>,
        P: Into<String>,
    {
        Self {
            endpoint: endpoint.into(),
            prefix: prefix.into(),
            header: None,
            pem: None,
            reducer: None,
            path: None,
            table: None,
        }
    }

    pub fn auth(self, auth: &str) -> Self {
        if let Ok(auth) = HeaderValue::from_str(auth) {
            let header = HeaderMap::from_iter([(AUTHORIZATION, auth)]);
            self.header(header)
        } else {
            self
        }
    }

    pub fn header(mut self, header: HeaderMap) -> Self {
        self.header = Some(header);
        self
    }

    pub fn pem(mut self, pem: Vec<u8>) -> Self {
        self.pem = Some(pem);
        self
    }

    pub fn reducer<R: 'static + Fn(Vec<u8>) -> Vec<u8> + Sync>(mut self, reducer: R) -> Self {
        self.reducer = Some(Box::new(reducer));
        self
    }

    pub fn path<P: AsRef<Path>>(mut self, path: P) -> Self {
        self.path = Some(path.as_ref().to_path_buf());
        self
    }

    pub fn table<S: Into<String>>(mut self, table: S) -> Self {
        self.table = Some(table.into());
        self
    }

    pub fn build(self) -> Option<CfKvFs> {
        CfKvFs::set_kv_cache(self.path, self.table);
        CfKvFs::inner_new(
            self.endpoint,
            self.prefix,
            self.header,
            self.pem,
            self.reducer,
        )
    }
}

pub struct CfKvFs {
    client: Client,
    endpoint: String,
    prefix: String,
    reducer: Option<Reducer>,
}

impl CfKvFs {
    pub fn builder<E, P>(endpoint: E, prefix: P) -> CfKvFsBuilder
    where
        E: Into<String>,
        P: Into<String>,
    {
        CfKvFsBuilder::new(endpoint, prefix)
    }

    pub fn new<E, P>(endpoint: E, prefix: P) -> Option<Self>
    where
        E: Into<String>,
        P: Into<String>,
    {
        Self::inner_new(endpoint, prefix, None, None, Some(Box::new(|data| data)))
    }

    fn inner_new<E, P>(
        endpoint: E,
        prefix: P,
        header: Option<HeaderMap>,
        pem: Option<Vec<u8>>,
        reducer: Option<Reducer>,
    ) -> Option<Self>
    where
        E: Into<String>,
        P: Into<String>,
    {
        let mut builder = Client::builder()
            .redirect(Policy::none())
            .no_proxy()
            .http2_prior_knowledge();
        if let Some(header) = header {
            builder = builder.default_headers(header);
        }
        if let Some(pem) = pem {
            if let Ok(identity) = Identity::from_pem(&pem) {
                builder = builder.identity(identity);
            }
        }
        if let Ok(client) = builder.build() {
            Some(Self {
                client,
                endpoint: endpoint.into(),
                prefix: prefix.into(),
                reducer,
            })
        } else {
            None
        }
    }

    fn set_kv_cache(
        path: Option<PathBuf>,
        name: Option<String>,
    ) -> Arc<Mutex<Box<dyn KvCache + Send + Sync>>> {
        lazy_static! {
            static ref KV_PATH: Arc<RwLock<PathBuf>> = Arc::new(RwLock::new("./cache.db".into()));
            static ref KV_TABLE: Arc<RwLock<String>> = Arc::new(RwLock::new("kv".into()));
            static ref KV_CACHE: Arc<Mutex<Box<dyn KvCache + Send + Sync>>> = Arc::new(Mutex::new(
                SqliteKvCache::new(&*KV_PATH.read().unwrap(), &*KV_TABLE.read().unwrap())
                    .unwrap_or_else(|_| LruKvCache::new())
            ));
        }
        if let Some(path) = path {
            *KV_PATH.write().unwrap() = path;
        }
        if let Some(name) = name {
            *KV_TABLE.write().unwrap() = name;
        }
        KV_CACHE.clone()
    }

    fn get_kv_cache() -> Arc<Mutex<Box<dyn KvCache + Send + Sync>>> {
        Self::set_kv_cache(None, None)
    }

    fn post_data(&self, name: &str, data: Vec<u8>, index: bool) -> i64 {
        let mut retry = 0;
        let data = if let (Some(reducer), false) = (&self.reducer, index) {
            reducer(data)
        } else {
            data
        };
        let hash = get_hash(&data);
        while let Err(err) = self
            .client
            .post(format!(
                "{}/{}/{}:{}",
                self.endpoint,
                self.prefix,
                name,
                if index {
                    "index".into()
                } else {
                    hash.to_string()
                }
            ))
            .body(data.clone())
            .send()
        {
            if retry > 3 {
                error!("Failed to save blob: {}", err);
                return 0;
            } else {
                retry += 1;
            }
        }
        hash
    }

    pub fn put_blob(&self, name: &str, data: Vec<u8>) {
        let chunked_vec = data.chunks(CHUNK_SIZE).collect::<Vec<_>>();
        let hash_list = chunked_vec
            .par_iter()
            .map(|chunk| self.post_data(name, chunk.to_vec(), false).to_le_bytes())
            .flatten()
            .collect::<Vec<_>>();
        self.post_data(name, hash_list, true);
    }

    fn get_data(&self, name: &str, hash: i64) -> Result<Vec<u8>, CfKvFsError> {
        let key = format!(
            "{}:{}",
            name,
            if hash == 0 {
                "index".into()
            } else {
                hash.to_string()
            }
        );
        if let Ok(Some(value)) = Self::get_kv_cache().lock().unwrap().get(key.clone()) {
            return if let (Some(reducer), false) = (&self.reducer, hash == 0) {
                Ok(reducer(value))
            } else {
                Ok(value)
            };
        }
        let mut retry = 0;
        let mut buf: Vec<u8> = vec![];
        while let Err(err) = self
            .client
            .get(format!("{}/{}/{}", self.endpoint, self.prefix, key))
            .send()
            .and_then(|mut resp| resp.copy_to(&mut buf))
            .map_err(CfKvFsError::ReqwestError)
            .and_then(|_| {
                if hash == 0 || get_hash(&buf) == hash {
                    Ok(())
                } else {
                    Err(CfKvFsError::HashError)
                }
            })
        {
            if retry > 3 {
                return Err(err);
            } else {
                retry += 1;
            }
        }
        let data = Self::get_kv_cache().lock().unwrap().put(key, buf)?;
        if let (Some(reducer), false) = (&self.reducer, hash == 0) {
            Ok(reducer(data))
        } else {
            Ok(data)
        }
    }

    pub fn get_blob(&self, name: &str) -> Result<Vec<u8>, CfKvFsError> {
        let data = self.get_data(name, 0)?;
        let hashes = data
            .chunks(8)
            .map(|hash| hash.try_into())
            .collect::<Result<Vec<[u8; 8]>, _>>()?;
        Ok(hashes
            .par_iter()
            .map(|hash| self.get_data(name, i64::from_le_bytes(*hash)))
            .collect::<Result<Vec<_>, CfKvFsError>>()?
            .iter()
            .flatten()
            .cloned()
            .collect())
    }
}

#[test]
fn test_upload() {
    let cache_handler = CfKvFs::builder("https://example.com", "path")
        .reducer(|a| a.iter().chain(std::iter::once(&0)).cloned().collect())
        .build()
        .unwrap();
    cache_handler.put_blob("test.bin", std::fs::read("test.bin").unwrap());
}

#[test]
fn test_download() {
    let cache_handler = CfKvFs::builder("https://example.com", "path")
        .auth("Bearer 12345")
        .table("test1")
        .build()
        .unwrap();
    let data = cache_handler.get_blob("test.bin").unwrap();
    std::fs::write("test1.bin", data).unwrap();
}