use std::collections::VecDeque;
use std::future::Future;
use bytes::{Bytes, BytesMut};
use bytestring::ByteString;
use futures::future::{ok, Either};
use ntex::channel::{condition, oneshot};
use ntex_amqp_codec::protocol::{
    Attach, DeliveryNumber, DeliveryState, Disposition, Error, Flow, MessageFormat,
    ReceiverSettleMode, Role, SenderSettleMode, SequenceNo, Target, TerminusDurability,
    TerminusExpiryPolicy, TransferBody,
};
use ntex_amqp_codec::Encode;
use crate::cell::Cell;
use crate::error::AmqpProtocolError;
use crate::session::{Session, SessionInner, TransferState};
use crate::{Delivery, Handle};
#[derive(Clone)]
pub struct SenderLink {
    pub(crate) inner: Cell<SenderLinkInner>,
}
impl std::fmt::Debug for SenderLink {
    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        fmt.debug_tuple("SenderLink")
            .field(&std::ops::Deref::deref(&self.inner.get_ref().name))
            .finish()
    }
}
pub(crate) struct SenderLinkInner {
    pub(crate) id: usize,
    idx: u32,
    name: ByteString,
    session: Session,
    remote_handle: Handle,
    delivery_count: SequenceNo,
    link_credit: u32,
    pending_transfers: VecDeque<PendingTransfer>,
    error: Option<AmqpProtocolError>,
    closed: bool,
    on_close: condition::Condition,
}
struct PendingTransfer {
    idx: u32,
    tag: Option<Bytes>,
    body: Option<TransferBody>,
    state: TransferState,
    settle: Option<bool>,
    message_format: Option<MessageFormat>,
}
impl SenderLink {
    pub(crate) fn new(inner: Cell<SenderLinkInner>) -> SenderLink {
        SenderLink { inner }
    }
    pub fn id(&self) -> u32 {
        self.inner.id as u32
    }
    pub fn name(&self) -> &ByteString {
        &self.inner.name
    }
    pub fn remote_handle(&self) -> Handle {
        self.inner.remote_handle
    }
    pub fn session(&self) -> &Session {
        &self.inner.get_ref().session
    }
    pub fn session_mut(&mut self) -> &mut Session {
        &mut self.inner.get_mut().session
    }
    pub fn send<T>(&self, body: T) -> impl Future<Output = Result<Disposition, AmqpProtocolError>>
    where
        T: Into<TransferBody>,
    {
        self.inner.get_mut().send(body, None)
    }
    pub fn send_with_tag<T>(
        &self,
        body: T,
        tag: Bytes,
    ) -> impl Future<Output = Result<Disposition, AmqpProtocolError>>
    where
        T: Into<TransferBody>,
    {
        self.inner.get_mut().send(body, Some(tag))
    }
    pub fn settle_message(&self, id: DeliveryNumber, state: DeliveryState) {
        self.inner.get_mut().settle_message(id, state)
    }
    pub fn close(&self) -> impl Future<Output = Result<(), AmqpProtocolError>> {
        self.inner.get_mut().close(None)
    }
    pub fn close_with_error<E>(
        &self,
        error: E,
    ) -> impl Future<Output = Result<(), AmqpProtocolError>>
    where
        Error: From<E>,
    {
        self.inner.get_mut().close(Some(error.into()))
    }
    pub fn on_close(&self) -> condition::Waiter {
        self.inner.get_ref().on_close.wait()
    }
}
impl SenderLinkInner {
    pub(crate) fn new(
        id: usize,
        name: ByteString,
        handle: Handle,
        delivery_count: SequenceNo,
        session: Cell<SessionInner>,
    ) -> SenderLinkInner {
        SenderLinkInner {
            id,
            name,
            delivery_count,
            idx: 0,
            session: Session::new(session),
            remote_handle: handle,
            link_credit: 0,
            pending_transfers: VecDeque::new(),
            error: None,
            closed: false,
            on_close: condition::Condition::new(),
        }
    }
    pub(crate) fn with(frame: &Attach, session: Cell<SessionInner>) -> SenderLinkInner {
        let mut name = None;
        if let Some(ref source) = frame.source {
            if let Some(ref addr) = source.address {
                name = Some(addr.clone());
            }
        }
        let delivery_count = frame.initial_delivery_count.unwrap_or(0);
        SenderLinkInner {
            delivery_count,
            id: 0,
            idx: 0,
            name: name.unwrap_or_else(ByteString::default),
            session: Session::new(session),
            remote_handle: frame.handle(),
            link_credit: 0,
            pending_transfers: VecDeque::new(),
            error: None,
            closed: false,
            on_close: condition::Condition::new(),
        }
    }
    pub(crate) fn id(&self) -> u32 {
        self.id as u32
    }
    pub(crate) fn remote_handle(&self) -> Handle {
        self.remote_handle
    }
    pub(crate) fn name(&self) -> &ByteString {
        &self.name
    }
    pub(crate) fn detached(&mut self, err: AmqpProtocolError) {
        trace!("Detaching sender link {:?} with error {:?}", self.name, err);
        
        for tr in self.pending_transfers.drain(..) {
            if let TransferState::First(tx) | TransferState::Only(tx) = tr.state {
                let _ = tx.send(Err(err.clone()));
            }
        }
        self.error = Some(err);
        self.on_close.notify();
    }
    pub(crate) fn close(
        &mut self,
        error: Option<Error>,
    ) -> impl Future<Output = Result<(), AmqpProtocolError>> {
        if self.closed {
            Either::Left(ok(()))
        } else {
            self.closed = true;
            self.on_close.notify();
            let (tx, rx) = oneshot::channel();
            self.session
                .inner
                .get_mut()
                .detach_sender_link(self.id, true, error, tx);
            Either::Right(async move {
                match rx.await {
                    Ok(Ok(_)) => Ok(()),
                    Ok(Err(e)) => Err(e),
                    Err(_) => Err(AmqpProtocolError::Disconnected),
                }
            })
        }
    }
    pub(crate) fn apply_flow(&mut self, flow: &Flow) {
        
        if let Some(credit) = flow.link_credit() {
            trace!(
                "Apply sender link {:?} flow, credit: {:?} flow count: {:?}, delivery count: {:?}",
                self.name,
                credit,
                flow.delivery_count.unwrap_or(0),
                self.delivery_count
            );
            let delta = flow
                .delivery_count
                .unwrap_or(0)
                .saturating_add(credit)
                .saturating_sub(self.delivery_count);
            self.link_credit += delta;
            let session = self.session.inner.get_mut();
            
            while self.link_credit > 0 {
                if let Some(transfer) = self.pending_transfers.pop_front() {
                    self.link_credit -= 1;
                    self.delivery_count = self.delivery_count.saturating_add(1);
                    session.send_transfer(
                        self.id as u32,
                        transfer.idx,
                        transfer.body,
                        transfer.state,
                        transfer.tag,
                        transfer.settle,
                        transfer.message_format,
                    );
                } else {
                    break;
                }
            }
        }
        if flow.echo() {
            
        }
    }
    pub(crate) fn send<T: Into<TransferBody>>(&mut self, body: T, tag: Option<Bytes>) -> Delivery {
        if let Some(ref err) = self.error {
            Delivery::Resolved(Err(err.clone()))
        } else {
            let body = body.into();
            let message_format = body.message_format();
            let (delivery_tx, delivery_rx) = oneshot::channel();
            let max_frame_size = self.session.inner.get_ref().max_frame_size();
            let max_frame_size = if max_frame_size > 2048 {
                max_frame_size - 2048
            } else if max_frame_size == 0 {
                usize::MAX
            } else {
                max_frame_size
            };
            
            if body.len() > max_frame_size {
                let mut body = match body {
                    TransferBody::Data(data) => data,
                    TransferBody::Message(msg) => {
                        let mut buf = BytesMut::with_capacity(msg.encoded_size());
                        msg.encode(&mut buf);
                        buf.freeze()
                    }
                };
                let chunk = body.split_to(std::cmp::min(max_frame_size, body.len()));
                self.send_inner(
                    chunk.into(),
                    tag,
                    TransferState::First(delivery_tx),
                    message_format,
                );
                loop {
                    let chunk = body.split_to(std::cmp::min(max_frame_size, body.len()));
                    
                    if body.is_empty() {
                        self.send_inner(chunk.into(), None, TransferState::Last, message_format);
                        break;
                    } else {
                        self.send_inner(
                            chunk.into(),
                            None,
                            TransferState::Continue,
                            message_format,
                        );
                    }
                }
            } else {
                self.send_inner(body, tag, TransferState::Only(delivery_tx), message_format);
            }
            Delivery::Pending(delivery_rx)
        }
    }
    fn send_inner(
        &mut self,
        body: TransferBody,
        tag: Option<Bytes>,
        state: TransferState,
        message_format: Option<MessageFormat>,
    ) {
        if self.link_credit == 0 {
            log::trace!(
                "Sender link credit is 0, push to pending queue hnd:{} {:?}, queue size: {}",
                self.id as u32,
                tag,
                self.pending_transfers.len()
            );
            self.pending_transfers.push_back(PendingTransfer {
                tag,
                state,
                message_format,
                settle: Some(false),
                body: Some(body),
                idx: self.idx,
            });
        } else {
            self.link_credit -= 1;
            self.delivery_count = self.delivery_count.saturating_add(1);
            self.session.inner.get_mut().send_transfer(
                self.id as u32,
                self.idx,
                Some(body),
                state,
                tag,
                None,
                message_format,
            );
        }
        self.idx = self.idx.saturating_add(1);
    }
    pub(crate) fn settle_message(&mut self, id: DeliveryNumber, state: DeliveryState) {
        let disp = Disposition {
            role: Role::Sender,
            first: id,
            last: None,
            settled: true,
            state: Some(state),
            batchable: false,
        };
        let _ = self.session.inner.get_mut().post_frame(disp.into());
    }
}
pub struct SenderLinkBuilder {
    frame: Attach,
    session: Cell<SessionInner>,
}
impl SenderLinkBuilder {
    pub(crate) fn new(name: ByteString, address: ByteString, session: Cell<SessionInner>) -> Self {
        let target = Target {
            address: Some(address),
            durable: TerminusDurability::None,
            expiry_policy: TerminusExpiryPolicy::SessionEnd,
            timeout: 0,
            dynamic: false,
            dynamic_node_properties: None,
            capabilities: None,
        };
        let frame = Attach {
            name,
            handle: 0_u32,
            role: Role::Sender,
            snd_settle_mode: SenderSettleMode::Mixed,
            rcv_settle_mode: ReceiverSettleMode::First,
            source: None,
            target: Some(target),
            unsettled: None,
            incomplete_unsettled: false,
            initial_delivery_count: None,
            max_message_size: Some(65536 * 4),
            offered_capabilities: None,
            desired_capabilities: None,
            properties: None,
        };
        SenderLinkBuilder { frame, session }
    }
    pub fn max_message_size(mut self, size: u64) -> Self {
        self.frame.max_message_size = Some(size);
        self
    }
    pub fn with_frame<F>(mut self, f: F) -> Self
    where
        F: FnOnce(&mut Attach),
    {
        f(&mut self.frame);
        self
    }
    pub async fn open(self) -> Result<SenderLink, AmqpProtocolError> {
        let result = self.session.get_mut().open_sender_link(self.frame).await;
        match result {
            Ok(Ok(link)) => Ok(link),
            Ok(Err(e)) => Err(e),
            Err(_) => Err(AmqpProtocolError::Disconnected),
        }
    }
}