// Documentation
// Copyright (c) 2025, Salesforce, Inc.,
// All rights reserved.
// For full license text, see the LICENSE.txt file

use std::{future, time::Duration};

use async_trait::async_trait;
use bollard::container::{LogOutput, LogsOptions};
use futures::StreamExt;

use crate::container::Container;
use crate::probe::Probe;
use crate::TestError;

/// Selects which container output stream(s) a [`MessageProbe`] requests
/// when it follows the container logs.
// NOTE(review): `dead_code` is presumably allowed because not every variant is
// constructed inside this crate — confirm before removing the attribute.
#[allow(dead_code)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum MessageSource {
    /// Request only the standard error stream.
    StdErr,
    /// Request only the standard output stream.
    StdOut,
    /// Request both standard output and standard error.
    Any,
}

/// Probe that succeeds once a given text has been seen in a container's logs
/// a required number of times within a time limit.
#[derive(Debug, Clone)]
pub struct MessageProbe {
    // Text searched for (substring match) in each log chunk.
    message: String,
    // Number of matching log chunks required for the probe to succeed.
    times: usize,
    // Maximum time to wait for the required matches.
    timeout: Duration,
    // Which output stream(s) are requested from the container logs.
    source: MessageSource,
}

#[allow(dead_code)]
impl MessageProbe {
    /// Get the message.
    pub fn message(&self) -> &str {
        &self.message
    }

    /// Get the times.
    pub fn times(&self) -> usize {
        self.times
    }

    /// Get the timeout.
    pub fn timeout(&self) -> Duration {
        self.timeout
    }

    /// Get the source.
    pub fn source(&self) -> MessageSource {
        self.source
    }

    /// Create a new message probe.
    pub fn new(message: String) -> Self {
        Self {
            message,
            timeout: Duration::from_secs(60),
            times: 1,
            source: MessageSource::StdOut,
        }
    }

    /// Create a new message probe builder.
    pub fn builder<T: Into<String>>(message: T) -> MessageProbeBuilder {
        MessageProbeBuilder::new(message)
    }
}

/// Builder for [`MessageProbe`]: wraps a probe whose fields are overridden
/// one at a time and handed back by `build`.
#[derive(Debug, Clone)]
pub struct MessageProbeBuilder {
    // Probe under construction; returned as-is by `build`.
    probe: MessageProbe,
}

impl MessageProbeBuilder {
    fn new<T: Into<String>>(message: T) -> Self {
        Self {
            probe: MessageProbe::new(message.into()),
        }
    }

    /// Set the times.
    pub fn times(self, times: usize) -> Self {
        Self {
            probe: MessageProbe {
                times,
                ..self.probe
            },
        }
    }

    /// Set the source.
    pub fn source(self, source: MessageSource) -> Self {
        Self {
            probe: MessageProbe {
                source,
                ..self.probe
            },
        }
    }

    /// Set the timeout.
    pub fn timeout(self, timeout: Duration) -> Self {
        Self {
            probe: MessageProbe {
                timeout,
                ..self.probe
            },
        }
    }

    /// Build the message probe.
    pub fn build(self) -> MessageProbe {
        self.probe
    }
}

// `Probe` implementation for `MessageProbe`; it only forwards to the inherent
// `MessageProbe::probe` method defined in this file.
// `?Send` asks `async_trait` not to require the generated future to be `Send`.
#[async_trait(?Send)]
impl Probe for MessageProbe {
    async fn probe(&self, container: &Container) -> Result<(), TestError> {
        // The path `MessageProbe::probe` picks the inherent method (inherent
        // associated items take priority over trait items), so this call
        // delegates instead of recursing into this trait method.
        MessageProbe::probe(self, container).await
    }
}

impl MessageProbe {
    /// Follow the container's logs until `self.message` has been seen in
    /// `self.times` log chunks, or until `self.timeout` elapses.
    ///
    /// Only chunks from the stream(s) selected by `self.source` are requested.
    /// Each chunk is decoded as UTF-8 lossily, so invalid byte sequences in
    /// the container output never cause a panic; they simply cannot match.
    ///
    /// NOTE(review): matching is done per log chunk, so a message that is
    /// split across two chunks would presumably not be detected — confirm
    /// against how the Docker daemon frames log output.
    ///
    /// # Errors
    ///
    /// Returns [`TestError::Startup`] when
    /// * the log stream ends (or yields an error) before enough matching
    ///   chunks were seen, or
    /// * the timeout elapses first.
    async fn probe(&self, container: &Container) -> Result<(), TestError> {
        // Translate the requested source into the stdout/stderr flags.
        let (stdout, stderr) = match self.source {
            MessageSource::StdOut => (true, false),
            MessageSource::StdErr => (false, true),
            MessageSource::Any => (true, true),
        };
        let log_options = LogsOptions::<&str> {
            follow: true,
            stdout,
            stderr,
            ..Default::default()
        };

        // Get the (followed) log stream.
        let logs = container.docker.logs(container.id(), Some(log_options));

        let needle = self.message.as_str();

        // Count matching chunks, stopping at the first stream error or as soon
        // as the required number of matches has been reached.
        let task = logs
            .take_while(|chunk| future::ready(chunk.is_ok()))
            .filter(|chunk| {
                let matched = match chunk {
                    Ok(LogOutput::StdErr { message } | LogOutput::StdOut { message }) => {
                        // Lossy decoding: log output is arbitrary bytes and
                        // must not be able to panic the probe.
                        String::from_utf8_lossy(message).contains(needle)
                    }
                    // StdIn / Console chunks never count as a match.
                    _ => false,
                };
                future::ready(matched)
            })
            .take(self.times)
            .fold(0usize, |seen, _| future::ready(seen + 1));

        match tokio::time::timeout(self.timeout, task).await {
            Ok(seen) if seen == self.times => Ok(()),
            Ok(_) => {
                // The stream finished before enough matches were observed.
                let name = container.name();
                let err = format!("Container `{name}` ended log before message triggered");
                Err(TestError::Startup(err))
            }
            Err(_) => {
                log::warn!("Awaiting container message timed out");
                Err(TestError::Startup(
                    "Awaiting container message timed out".to_string(),
                ))
            }
        }
    }
}