Documentation
// Copyright (c) 2025, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

use std::collections::HashMap;
use std::rc::Rc;

use bollard::container::{CreateContainerOptions, LogOutput, LogsOptions, NetworkingConfig};
use bollard::service::{EndpointSettings, HostConfig};
use bollard::Docker;
use futures::StreamExt;
use tokio::{runtime::Handle, task::JoinHandle};

use crate::handle;
use crate::host::Mode;
use crate::network::Network;
use crate::port::Port;
use crate::runner::Test;
use crate::TestError;

use crate::config::{Config, ContainerConfig};
use crate::constants::{DOCKER_PLATFORM, LABEL_KEY, LABEL_VALUE};

enum ContainerStatus {
    Created,
    Started,
    Stopped,
    Removed,
    Broken,
}

/// Struct to manage the container.
pub struct Container {
    pub(crate) docker: Docker,
    test: Rc<Test>,
    id: String,
    name: String,
    status: ContainerStatus,
    config: ContainerConfig,
    sockets: HashMap<Port, String>,
    logging_task: Option<JoinHandle<Result<(), TestError>>>,
    upload: bool,
}

impl Container {
    /// Get the id of the container.
    pub fn id(&self) -> &str {
        &self.id
    }

    /// Get the container name.
    pub fn name(&self) -> &str {
        &self.name
    }

    /// Get the container config.
    pub fn config(&self) -> &ContainerConfig {
        &self.config
    }

    /// Get the container socket.
    pub fn socket(&self, port: Port) -> Option<&str> {
        self.sockets.get(&port).map(|s| s.as_str())
    }

    /// Get the container sockets.
    pub fn sockets(&self) -> &HashMap<Port, String> {
        &self.sockets
    }

    fn test(&self) -> &Test {
        &self.test
    }

    /// Initialize the container.
    pub(super) async fn initialized(
        docker: Docker,
        test: Rc<Test>,
        mode: Mode,
        network: &Network,
        config: &dyn Config,
    ) -> Result<Self, TestError> {
        let config = config.to_container_config()?;
        config.image().pull(&docker).await?;

        let mut container = Container::create(docker, test, mode, network, config).await?;

        container.ensure_upload().await?;

        container.start().await?;

        // Initialize logs
        container.start_logging().await?;

        if let Some(wait) = container.config.readiness() {
            wait.probe(&container).await?;
        }

        Ok(container)
    }

    /// Create the container (without starting it).
    pub(super) async fn create(
        docker: Docker,
        test: Rc<Test>,
        mode: Mode,
        network: &Network,
        config: ContainerConfig,
    ) -> Result<Self, TestError> {
        log::info!("Creating Docker container for host {}.", config.hostname());

        let handle = handle::for_container(config.hostname());

        let exposed_ports: HashMap<_, _> = config
            .ports()
            .iter()
            .map(|p| (format!("{}/tcp", p.port()), HashMap::new()))
            .collect();

        let upload = test.force_upload() || mode == Mode::Containerized;

        let (port_bindings, sockets) = config.ports_and_sockets(mode)?;
        let mounts = if !upload {
            config.docker_mounts()
        } else {
            Default::default()
        };

        let platform = std::env::var(DOCKER_PLATFORM).ok();
        let env = config.formatted_env();
        let id = docker
            .create_container(
                Some(CreateContainerOptions {
                    name: handle.as_str(),
                    platform: platform.as_deref(),
                }),
                bollard::container::Config {
                    image: Some(config.image().locator()),
                    hostname: Some(config.hostname().to_string()),
                    env: (!env.is_empty()).then_some(env),
                    network_disabled: Some(false),
                    exposed_ports: (!exposed_ports.is_empty()).then_some(exposed_ports),
                    attach_stdin: Some(true),
                    attach_stdout: Some(true),
                    attach_stderr: Some(true),
                    open_stdin: Some(true),
                    host_config: Some(HostConfig {
                        port_bindings: (!port_bindings.is_empty()).then_some(port_bindings),
                        mounts: (!mounts.is_empty()).then_some(mounts),
                        ..Default::default()
                    }),
                    networking_config: Some(NetworkingConfig {
                        endpoints_config: HashMap::from([(
                            network.id().to_string(),
                            EndpointSettings {
                                aliases: Some(vec![config.hostname().to_string()]),
                                ..Default::default()
                            },
                        )]),
                    }),
                    cmd: Some(config.cmd().to_vec()),
                    labels: Some(HashMap::from([(
                        LABEL_KEY.to_string(),
                        LABEL_VALUE.to_string(),
                    )])),
                    ..Default::default()
                },
            )
            .await?
            .id;

        log::info!(
            "Created container with id {id} for host {}.",
            config.hostname()
        );

        Ok(Self {
            id,
            test,
            name: handle,
            docker,
            config,
            status: ContainerStatus::Created,
            sockets,
            logging_task: None,
            upload,
        })
    }

    /// Start the container.
    pub(super) async fn start(&mut self) -> Result<(), TestError> {
        self.docker
            .start_container::<String>(&self.id, None)
            .await?;
        self.status = ContainerStatus::Started;
        Ok(())
    }

    async fn start_logging(&mut self) -> Result<(), TestError> {
        let filename = self
            .test()
            .target_dir()
            .join(self.config.hostname())
            .with_extension("log");

        let mut file = tokio::fs::File::create(filename).await?;

        let mut logs = self.docker.logs(
            &self.id,
            Some(LogsOptions::<&str> {
                follow: true,
                stdout: true,
                stderr: true,
                timestamps: true,
                ..Default::default()
            }),
        );

        let dump_task = async move {
            while let Some(log) = logs.next().await {
                // catch all messages
                let message: Vec<u8> = match log? {
                    LogOutput::StdErr { message } => message,
                    LogOutput::StdOut { message } => message,
                    LogOutput::StdIn { message } => message,
                    LogOutput::Console { message } => message,
                }
                .into();

                tokio::io::copy(&mut message.as_ref(), &mut file).await?;
            }

            Ok::<_, TestError>(())
        };

        self.logging_task = Some(tokio::spawn(dump_task));

        Ok(())
    }

    async fn stop(&mut self) {
        match self.docker.stop_container(self.name(), None).await {
            Ok(()) => {
                self.status = ContainerStatus::Stopped;
            }
            Err(e) => {
                self.status = ContainerStatus::Broken;
                log::error!("Unable to stop container `{}`: {e}.", self.name());
            }
        }
    }

    async fn dump_logs(&mut self) {
        log::info!("Dumping logs for host {}.", self.config.hostname());

        if let Some(logging_task) = self.logging_task.take() {
            let name = self.name();
            let result = match logging_task.await {
                Ok(result) => result,
                Err(e) => {
                    log::error!("Unable to join logging task for container `{name}`: {e}");
                    return;
                }
            };
            match result {
                Ok(()) => {}
                Err(e) => log::error!("Failed log dumping for container `{name}`: {e}"),
            }
        }
    }

    async fn remove(&mut self) {
        log::info!(
            "Removing Docker container for host {}.",
            self.config.hostname()
        );

        match self.docker.remove_container(&self.id, None).await {
            Ok(()) => {
                self.status = ContainerStatus::Removed;
            }
            Err(e) => {
                self.status = ContainerStatus::Broken;
                log::error!("Unable to remove container {}: {e}.", self.name());
            }
        }

        self.dump_logs().await;
    }

    /// Ensure the container is uploaded.
    pub async fn ensure_upload(&self) -> Result<(), TestError> {
        use bollard::container::UploadToContainerOptions;
        use flate2::write::GzEncoder;
        use flate2::Compression;
        use std::io::Read as _;
        use tempfile::NamedTempFile;

        if !self.upload {
            return Ok(());
        }

        log::info!("Uploading files to host {}.", self.config().hostname());

        for (src, base, dst) in self.config.mounts() {
            let write = NamedTempFile::new()?;
            let mut read = write.reopen()?;

            let enc = GzEncoder::new(write, Compression::default());

            let mut tar = tar::Builder::new(enc);

            tar.append_dir_all(dst, src)?;

            tar.into_inner()?.finish()?;

            let mut contents = Vec::new();

            read.read_to_end(&mut contents)?;

            self.docker
                .upload_to_container(
                    &self.id,
                    Some(UploadToContainerOptions {
                        path: base.clone(),
                        ..Default::default()
                    }),
                    contents.into(),
                )
                .await
                .map_err(|e| {
                    log::error!("Failed to load files for host {}.", self.config.hostname());

                    TestError::Startup(format!(
                        "Unable to upload file '{src}' -> '{base}/{dst}': {e:?}"
                    ))
                })?;
        }

        Ok(())
    }

    /// Dispose the container.
    pub(super) async fn dispose(&mut self) {
        match self.status {
            ContainerStatus::Created => {
                self.remove().await;
            }
            ContainerStatus::Started => {
                self.stop().await;
                self.remove().await;
            }
            ContainerStatus::Stopped => {
                self.remove().await;
            }
            ContainerStatus::Removed => {}
            ContainerStatus::Broken => {}
        }
    }
}

impl Drop for Container {
    fn drop(&mut self) {
        tokio::task::block_in_place(|| Handle::current().block_on(self.dispose()));
    }
}