mod certs;
pub mod api {
    include!("api/google.pubsub.v1.rs");
}
use anyhow::Result;
use api::{subscriber_client::SubscriberClient, AcknowledgeRequest, PullRequest, ReceivedMessage};
use google_auth::DefaultCredentials;
use log::debug;
use std::collections::HashMap;
use std::collections::VecDeque;
use tonic::{
    metadata::MetadataValue,
    transport::{Certificate, Channel, ClientTlsConfig},
    Request,
};
const ENDPOINT: &str = "https://pubsub.googleapis.com";

pub struct Pubsub {
    project_id: String,
}

async fn create_subscriber_client() -> Result<SubscriberClient<Channel>> {
    let pem = certs::get_pem();
    let tls_config = ClientTlsConfig::new()
        .ca_certificate(Certificate::from_pem(&pem))
        .domain_name("pubsub.googleapis.com");
    let channel = Channel::from_static(ENDPOINT)
        .tls_config(tls_config)?
        .connect()
        .await?;

    let mut cred = DefaultCredentials::new();
    let token = cred.token().await?;
    let bearer_token = format!("Bearer {}", token.access_token);
    let header_value = MetadataValue::from_str(&bearer_token)?;
    let subscriber_client =
        SubscriberClient::with_interceptor(channel, move |mut req: Request<()>| {
            req.metadata_mut()
                .insert("authorization", header_value.clone());
            Ok(req)
        });
    Ok(subscriber_client)
}

pub struct Subscription {
    project_id: String,
    subscription_name: String,
    client: SubscriberClient<Channel>,
    buffer: VecDeque<ReceivedMessage>,
}

#[derive(Clone)]
pub struct Message {
    pub(crate) client: SubscriberClient<Channel>,
    pub(crate) data: Vec<u8>,
    pub(crate) attributes: HashMap<String, String>,
    pub(crate) ack_id: String,
    pub(crate) message_id: String,
    pub(crate) publish_time: chrono::NaiveDateTime,
    pub(crate) subscription_name: String,
}

impl Message {
    pub async fn ack(&mut self) -> Result<()> {
        let req = AcknowledgeRequest {
            subscription: self.subscription_name.clone(),
            ack_ids: vec![self.ack_id.clone()],
        };
        self.client.acknowledge(req).await?;
        Ok(())
    }
    pub fn data(&self) -> &[u8] {
        self.data.as_slice()
    }
    pub fn id(&self) -> &str {
        self.message_id.as_str()
    }
    pub fn attributes(&self) -> &HashMap<String, String> {
        &self.attributes
    }
}

impl Subscription {
    pub async fn pull(&mut self, max_messages: i32) -> Result<Vec<ReceivedMessage>> {
        let mut retry_counter: i32 = 0;
        let max_retry = 10;
        let response = loop {
            if retry_counter >= max_retry {
                debug!("retry exceed max_retry: {}", retry_counter);
                return Err(anyhow::anyhow!("gg"));
            }
            let mut pr = PullRequest::default();
            pr.subscription = format!(
                "projects/{}/subscriptions/{}",
                &self.project_id, &self.subscription_name
            );
            pr.max_messages = max_messages;
            let result = self.client.pull(pr).await;
            let res = match result {
                Ok(res) => res.into_inner(),
                Err(err) if err.code() == tonic::Code::Unauthenticated => {
                    retry_counter += 1;
                    debug!("unauthenticated: {}, retry ... #{}", err, retry_counter);
                    self.client = create_subscriber_client().await?;
                    continue;
                }
                Err(err) => {
                    retry_counter += 1;
                    debug!("error: {}, retry ... #{}", err, retry_counter);
                    continue;
                }
            };
            break res;
        };
        Ok(response.received_messages)
    }
    pub async fn receive(&mut self) -> Option<Message> {
        loop {
            if let Some(handle) = self.buffer.pop_front() {
                let pubsub_message = handle.message.unwrap();
                let timestamp = pubsub_message.publish_time.unwrap();
                let message = Message {
                    client: self.client.clone(),
                    subscription_name: self.subscription_name.clone(),
                    data: pubsub_message.data,
                    message_id: pubsub_message.message_id,
                    ack_id: handle.ack_id,
                    attributes: pubsub_message.attributes,
                    publish_time: chrono::NaiveDateTime::from_timestamp(
                        timestamp.seconds,
                        timestamp.nanos as u32,
                    ),
                };
                break Some(message);
            } else {
                if let Ok(messages) = self.pull(50).await {
                    if messages.is_empty() {
                        break None;
                    }
                    self.buffer.extend(messages)
                }
            }
        }
    }
}

impl Pubsub {
    pub fn new(project_id: &str) -> Self {
        Self {
            project_id: project_id.to_string(),
        }
    }
    pub async fn subscription(&self, subscription_name: &str) -> Result<Subscription> {
        Ok(Subscription {
            project_id: self.project_id.clone(),
            subscription_name: subscription_name.to_string(),
            client: create_subscriber_client().await?,
            buffer: VecDeque::new(),
        })
    }
    pub async fn pull(
        &self,
        subscription_name: &str,
        max_messages: i32,
    ) -> Result<Vec<ReceivedMessage>> {
        let mut retry_counter: i32 = 0;
        let max_retry = 10;
        let mut subscriber_client = create_subscriber_client().await?;
        let response = loop {
            if retry_counter >= max_retry {
                debug!("retry exceed max_retry: {}", retry_counter);
                return Err(anyhow::anyhow!("gg"));
            }
            let mut pr = PullRequest::default();
            pr.subscription = format!(
                "projects/{}/subscriptions/{}",
                &self.project_id, subscription_name
            );
            pr.max_messages = max_messages;
            let result = subscriber_client.pull(pr).await;
            let res = match result {
                Ok(res) => res.into_inner(),
                Err(err) if err.code() == tonic::Code::Unauthenticated => {
                    subscriber_client = create_subscriber_client().await?;
                    debug!("unauthenticated: {}", err);
                    retry_counter += 1;
                    continue;
                }
                Err(err) => {
                    debug!("error: {}", err);
                    retry_counter += 1;
                    continue;
                }
            };
            break res;
        };
        Ok(response.received_messages)
    }

    pub async fn ack(&mut self, ack_ids: Vec<String>, subscription_name: &str) -> Result<()> {
        let mut subscriber_client = create_subscriber_client().await?;
        subscriber_client
            .acknowledge(AcknowledgeRequest {
                ack_ids,
                subscription: format!(
                    "projects/{}/subscriptions/{}",
                    &self.project_id, subscription_name
                ),
            })
            .await?
            .into_inner();
        Ok(())
    }
    pub async fn create_subscription(
        &mut self,
        topic_name: &str,
        subscription_name: &str,
        filter: &str,
    ) -> Result<()> {
        let mut subscriber_client = create_subscriber_client().await?;
        let subs = api::Subscription {
            name: format!(
                "projects/{}/subscriptions/{}",
                &self.project_id, subscription_name
            ),
            ack_deadline_seconds: 10,
            dead_letter_policy: None,
            detached: false,
            enable_message_ordering: true,
            expiration_policy: None,
            filter: filter.to_string(),
            labels: HashMap::new(),
            message_retention_duration: None,
            push_config: None,
            retain_acked_messages: false,
            retry_policy: None,
            topic: format!("projects/{}/topics/{}", &self.project_id, topic_name),
        };
        let resp = subscriber_client.create_subscription(subs).await?;
        debug!("{:?}", resp);
        Ok(())
    }

    pub async fn delete_subscription(&mut self, subscription_name: &str) -> Result<()> {
        let mut subscriber_client = create_subscriber_client().await?;
        let req = api::DeleteSubscriptionRequest {
            subscription: format!(
                "projects/{}/subscriptions/{}",
                &self.project_id, subscription_name
            ),
        };
        let resp = subscriber_client.delete_subscription(req).await?;
        debug!("{:?}", resp);
        Ok(())
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    #[test]
    fn new_works() {
        let project_id = "test-123";
        Pubsub::new(project_id);
    }
}
// let received_messages = match pubsub.pull(&&subscription_name, 50).await {
//     Ok(rs) => {
//         retry_counter = 0;
//         rs
//     }
//     Err(err) => {
//         retry_counter += 1;
//         if retry_counter > 10 {
//             break;
//         }
//         let binary: u64 = 2;
//         let secs = if binary.pow(retry_counter) > max_wait_secs {
//             max_wait_secs
//         } else {
//             binary.pow(retry_counter)
//         };
//         debug!(
//             "pubsub pull messages error: {:#?}, #{} retry, waiting for retry in {} secs",
//             err, retry_counter, secs
//         );
//         tokio::time::sleep(tokio::time::Duration::from_secs(secs)).await;
//         continue;
//     }
// };