use std::{
    fmt::{Display, Formatter},
    future,
    path::PathBuf,
    sync::{
        atomic::{AtomicUsize, Ordering},
        Arc,
    },
};
use async_trait::async_trait;
use bytes::Bytes;
use chrono::{NaiveDateTime, TimeZone, Utc};
use futures::stream::{BoxStream, StreamExt};
use hdfs_native::{client::FileStatus, file::FileWriter, Client, HdfsError, WriteOptions};
use object_store::{
    multipart::{PartId, PutPart, WriteMultiPart},
    path::Path,
    GetOptions, GetResult, GetResultPayload, ListResult, MultipartId, ObjectMeta, ObjectStore,
    Result,
};
use tokio::io::AsyncWrite;
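/// An `ObjectStore` implementation backed by a native HDFS `Client`.
///
/// Paths are interpreted as absolute paths in HDFS, and writes are staged in hidden
/// `.{name}.tmp` files that are renamed into place once fully written.
///
/// A minimal usage sketch, assuming the crate is published as `hdfs_native_object_store`,
/// that `hdfs_native::Client::new` accepts a NameNode URL, and that a NameNode is
/// reachable at that address:
///
/// ```no_run
/// # use hdfs_native::Client;
/// # use hdfs_native_object_store::HdfsObjectStore;
/// # use object_store::{path::Path, ObjectStore};
/// # async fn example() -> object_store::Result<()> {
/// let client = Client::new("hdfs://localhost:9000").expect("failed to create client");
/// let store = HdfsObjectStore::new(client);
/// let meta = store.head(&Path::from("tmp/example.txt")).await?;
/// println!("{} is {} bytes", meta.location, meta.size);
/// # Ok(())
/// # }
/// ```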
#[derive(Debug)]
pub struct HdfsObjectStore {
    client: Arc<Client>,
}
impl HdfsObjectStore {
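    /// Creates an `HdfsObjectStore` that wraps the given `Client`.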
    pub fn new(client: Client) -> Self {
        Self {
            client: Arc::new(client),
        }
    }
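    /// Copies `from` to `to` by streaming the source file back through the client, since
    /// HDFS has no server-side copy. If `overwrite` is false and `to` already exists, the
    /// copy fails with `AlreadyExists`.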
    async fn internal_copy(&self, from: &Path, to: &Path, overwrite: bool) -> Result<()> {
        let overwrite = match self.client.get_file_info(&make_absolute_file(to)).await {
            Ok(_) if overwrite => true,
            Ok(_) => Err(HdfsError::AlreadyExists(make_absolute_file(to))).to_object_store_err()?,
            Err(HdfsError::FileNotFound(_)) => false,
            Err(e) => Err(e).to_object_store_err()?,
        };
        let write_options = WriteOptions {
            overwrite,
            ..Default::default()
        };
        let file = self
            .client
            .read(&make_absolute_file(from))
            .await
            .to_object_store_err()?;
        let mut stream = file.read_range_stream(0, file.file_length()).boxed();
        let mut new_file = self
            .client
            .create(&make_absolute_file(to), write_options)
            .await
            .to_object_store_err()?;
        while let Some(bytes) = stream.next().await.transpose().to_object_store_err()? {
            new_file.write(bytes).await.to_object_store_err()?;
        }
        new_file.close().await.to_object_store_err()?;
        Ok(())
    }
}
impl Display for HdfsObjectStore {
    fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result {
        write!(f, "HdfsObjectStore")
    }
}
impl From<Client> for HdfsObjectStore {
    fn from(value: Client) -> Self {
        Self::new(value)
    }
}
#[async_trait]
impl ObjectStore for HdfsObjectStore {
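    /// Writes the bytes to a hidden `.{name}.tmp` file next to the target and renames it
    /// over the final path once the write is complete, so a partially written object is
    /// never visible at `location`.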
    async fn put(&self, location: &Path, bytes: Bytes) -> Result<()> {
        let final_file_path = make_absolute_file(location);
        let path_buf = PathBuf::from(&final_file_path);
        let file_name = path_buf
            .file_name()
            .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
            .to_object_store_err()?
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();
        let tmp_filename = path_buf
            .with_file_name(format!(".{}.tmp", file_name))
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();
        let overwrite = match self.client.get_file_info(&tmp_filename).await {
            Ok(_) => true,
            Err(HdfsError::FileNotFound(_)) => false,
            Err(e) => Err(e).to_object_store_err()?,
        };
        let write_options = WriteOptions {
            overwrite,
            ..Default::default()
        };
        let mut writer = self
            .client
            .create(&tmp_filename, write_options)
            .await
            .to_object_store_err()?;
        writer.write(bytes).await.to_object_store_err()?;
        writer.close().await.to_object_store_err()?;
        self.client
            .rename(&tmp_filename, &final_file_path, true)
            .await
            .to_object_store_err()?;
        Ok(())
    }
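    /// Starts a multipart upload that writes parts sequentially to a hidden `.{name}.tmp`
    /// file and renames it into place when the writer is shut down. The returned
    /// `MultipartId` is the temporary file's path.
    ///
    /// A minimal sketch of driving the returned writer, assuming only the `object_store`
    /// and `tokio` APIs shown; the path and buffer are illustrative:
    ///
    /// ```no_run
    /// # use object_store::{path::Path, ObjectStore};
    /// # use tokio::io::AsyncWriteExt;
    /// # async fn example(store: &impl ObjectStore) -> Result<(), Box<dyn std::error::Error>> {
    /// let (_id, mut writer) = store.put_multipart(&Path::from("tmp/large.bin")).await?;
    /// writer.write_all(&[0u8; 4096]).await?;
    /// writer.shutdown().await?;
    /// # Ok(())
    /// # }
    /// ```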
    async fn put_multipart(
        &self,
        location: &Path,
    ) -> Result<(MultipartId, Box<dyn AsyncWrite + Unpin + Send>)> {
        let final_file_path = make_absolute_file(location);
        let path_buf = PathBuf::from(&final_file_path);
        let file_name = path_buf
            .file_name()
            .ok_or(HdfsError::InvalidPath("path missing filename".to_string()))
            .to_object_store_err()?
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();
        let tmp_filename = path_buf
            .with_file_name(format!(".{}.tmp", file_name))
            .to_str()
            .ok_or(HdfsError::InvalidPath("path not valid unicode".to_string()))
            .to_object_store_err()?
            .to_string();
        let overwrite = match self.client.get_file_info(&tmp_filename).await {
            Ok(_) => true,
            Err(HdfsError::FileNotFound(_)) => false,
            Err(e) => Err(e).to_object_store_err()?,
        };
        let write_options = WriteOptions {
            overwrite,
            ..Default::default()
        };
        let writer = self
            .client
            .create(&tmp_filename, write_options)
            .await
            .to_object_store_err()?;
        Ok((
            tmp_filename.clone(),
            Box::new(WriteMultiPart::new(
                HdfsMultipartWriter::new(
                    Arc::clone(&self.client),
                    writer,
                    &tmp_filename,
                    &final_file_path,
                ),
                1,
            )),
        ))
    }
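    /// Aborts an in-progress multipart upload by deleting the temporary file named by the
    /// `MultipartId`.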
    async fn abort_multipart(&self, _location: &Path, multipart_id: &MultipartId) -> Result<()> {
        self.client
            .delete(multipart_id, false)
            .await
            .to_object_store_err()?;
        Ok(())
    }
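    /// Reads an object, optionally restricted to a byte range. ETag and modification-time
    /// preconditions are not supported and return `Error::NotImplemented`.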
    async fn get_opts(&self, location: &Path, options: GetOptions) -> Result<GetResult> {
        if options.if_match.is_some()
            || options.if_none_match.is_some()
            || options.if_modified_since.is_some()
            || options.if_unmodified_since.is_some()
        {
            return Err(object_store::Error::NotImplemented);
        }
        let meta = self.head(location).await?;
        let range = options.range.unwrap_or(0..meta.size);
        let reader = self
            .client
            .read(&make_absolute_file(location))
            .await
            .to_object_store_err()?;
        let stream = reader
            .read_range_stream(range.start, range.end - range.start)
            .map(|b| b.to_object_store_err())
            .boxed();
        let payload = GetResultPayload::Stream(stream);
        Ok(GetResult {
            payload,
            meta,
            range,
        })
    }
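    /// Returns metadata for the object by fetching its file status from the NameNode.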
    async fn head(&self, location: &Path) -> Result<ObjectMeta> {
        let status = self
            .client
            .get_file_info(&make_absolute_file(location))
            .await
            .to_object_store_err()?;
        Ok(ObjectMeta {
            location: location.clone(),
            // HDFS reports modification times in milliseconds since the epoch.
            last_modified: Utc.from_utc_datetime(
                &NaiveDateTime::from_timestamp_millis(status.modification_time as i64).unwrap(),
            ),
            size: status.length,
            e_tag: None,
        })
    }
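    /// Deletes the object, returning an error if HDFS reports that nothing was deleted.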
    async fn delete(&self, location: &Path) -> Result<()> {
        let result = self
            .client
            .delete(&make_absolute_file(location), false)
            .await
            .to_object_store_err()?;
        if !result {
            Err(HdfsError::OperationFailed(
                "failed to delete object".to_string(),
            ))
            .to_object_store_err()?
        }
        Ok(())
    }
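    /// Recursively lists all files under the prefix. Directories are skipped, and a missing
    /// prefix yields an empty stream rather than an error.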
    async fn list(&self, prefix: Option<&Path>) -> Result<BoxStream<'_, Result<ObjectMeta>>> {
        let status_stream = self
            .client
            .list_status_iter(
                &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
                true,
            )
            .into_stream()
            .filter(|res| {
                let result = match res {
                    Ok(status) => !status.isdir,
                    Err(HdfsError::FileNotFound(_)) => false,
                    _ => true,
                };
                future::ready(result)
            })
            .map(|res| res.map_or_else(|e| Err(e).to_object_store_err(), |s| get_object_meta(&s)));
        Ok(Box::pin(status_stream))
    }
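    /// Lists the entries directly under the prefix, returning directories as common
    /// prefixes and files as objects.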
    async fn list_with_delimiter(&self, prefix: Option<&Path>) -> Result<ListResult> {
        let mut status_stream = self
            .client
            .list_status_iter(
                &prefix.map(make_absolute_dir).unwrap_or("".to_string()),
                false,
            )
            .into_stream()
            .filter(|res| {
                // Ignore FileNotFound so listing a non-existent prefix yields an empty result.
                future::ready(!matches!(res, Err(HdfsError::FileNotFound(_))))
            });
        let mut statuses = Vec::<FileStatus>::new();
        while let Some(status) = status_stream.next().await {
            statuses.push(status.to_object_store_err()?);
        }
        let mut dirs: Vec<Path> = Vec::new();
        for status in statuses.iter().filter(|s| s.isdir) {
            dirs.push(Path::parse(&status.path)?)
        }
        let mut files: Vec<ObjectMeta> = Vec::new();
        for status in statuses.iter().filter(|s| !s.isdir) {
            files.push(get_object_meta(status)?)
        }
        Ok(ListResult {
            common_prefixes: dirs,
            objects: files,
        })
    }
    async fn rename(&self, from: &Path, to: &Path) -> Result<()> {
        Ok(self
            .client
            .rename(&make_absolute_file(from), &make_absolute_file(to), true)
            .await
            .to_object_store_err()?)
    }
    async fn rename_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        Ok(self
            .client
            .rename(&make_absolute_file(from), &make_absolute_file(to), false)
            .await
            .to_object_store_err()?)
    }
    async fn copy(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, true).await
    }
    async fn copy_if_not_exists(&self, from: &Path, to: &Path) -> Result<()> {
        self.internal_copy(from, to, false).await
    }
}
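// Converts `hdfs_native::Result` into `object_store::Result`, mapping `FileNotFound` and
// `AlreadyExists` to their `object_store` equivalents and wrapping everything else in a
// `Generic` error. The trait is only public when the `integration-test` feature is enabled.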
#[cfg(feature = "integration-test")]
pub trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}
#[cfg(not(feature = "integration-test"))]
trait HdfsErrorConvert<T> {
    fn to_object_store_err(self) -> Result<T>;
}
impl<T> HdfsErrorConvert<T> for hdfs_native::Result<T> {
    fn to_object_store_err(self) -> Result<T> {
        self.map_err(|err| match err {
            HdfsError::FileNotFound(path) => object_store::Error::NotFound {
                path: path.clone(),
                source: Box::new(HdfsError::FileNotFound(path)),
            },
            HdfsError::AlreadyExists(path) => object_store::Error::AlreadyExists {
                path: path.clone(),
                source: Box::new(HdfsError::AlreadyExists(path)),
            },
            _ => object_store::Error::Generic {
                store: "HdfsObjectStore",
                source: Box::new(err),
            },
        })
    }
}
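/// Writes multipart buffers sequentially into a temporary HDFS file and renames it to the
/// final path on completion. HDFS only supports appending to a single open file, so parts
/// must arrive in order; `next_part` tracks the next expected index.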
struct HdfsMultipartWriter {
    client: Arc<Client>,
    inner: Arc<tokio::sync::Mutex<FileWriter>>,
    tmp_filename: String,
    final_filename: String,
    next_part: AtomicUsize,
}
impl HdfsMultipartWriter {
    fn new(
        client: Arc<Client>,
        inner: FileWriter,
        tmp_filename: &str,
        final_filename: &str,
    ) -> Self {
        Self {
            client,
            inner: Arc::new(tokio::sync::Mutex::new(inner)),
            tmp_filename: tmp_filename.to_string(),
            final_filename: final_filename.to_string(),
            next_part: AtomicUsize::new(0),
        }
    }
}
#[async_trait]
impl PutPart for HdfsMultipartWriter {
    async fn put_part(&self, buf: Vec<u8>, part_idx: usize) -> Result<PartId> {
        if part_idx != self.next_part.load(Ordering::SeqCst) {
            return Err(object_store::Error::NotSupported {
                source: "Part received out of order".to_string().into(),
            });
        }
        self.inner
            .lock()
            .await
            .write(buf.into())
            .await
            .to_object_store_err()?;
        self.next_part.fetch_add(1, Ordering::SeqCst);
        Ok(PartId {
            content_id: part_idx.to_string(),
        })
    }
    async fn complete(&self, _completed_parts: Vec<PartId>) -> Result<()> {
        self.inner
            .lock()
            .await
            .close()
            .await
            .to_object_store_err()?;
        self.client
            .rename(&self.tmp_filename, &self.final_filename, true)
            .await
            .to_object_store_err()?;
        Ok(())
    }
}
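/// Converts an `object_store` path, which has no leading slash, into an absolute HDFS path.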
fn make_absolute_file(path: &Path) -> String {
    format!("/{}", path.as_ref())
}
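/// Converts an `object_store` prefix into an absolute HDFS directory path with a trailing
/// slash, or "/" for an empty prefix.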
fn make_absolute_dir(path: &Path) -> String {
    if path.parts().count() > 0 {
        format!("/{}/", path.as_ref())
    } else {
        "/".to_string()
    }
}
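/// Builds an `ObjectMeta` from an HDFS `FileStatus`.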
fn get_object_meta(status: &FileStatus) -> Result<ObjectMeta> {
    Ok(ObjectMeta {
        location: Path::parse(&status.path)?,
        // HDFS reports modification times in milliseconds since the epoch.
        last_modified: Utc.from_utc_datetime(
            &NaiveDateTime::from_timestamp_millis(status.modification_time as i64).unwrap(),
        ),
        size: status.length,
        e_tag: None,
    })
}