Skip to main content

ntex_amqp/
session.rs

1use std::{cmp, collections::VecDeque, fmt, future::Future, mem};
2
3use ntex_bytes::{ByteString, Bytes};
4use ntex_util::channel::{condition, oneshot, pool};
5use ntex_util::{HashMap, future::Either, future::Ready};
6use slab::Slab;
7
8use ntex_amqp_codec::protocol::{
9    self as codec, Accepted, Attach, Begin, DeliveryNumber, DeliveryState, Detach, Disposition,
10    End, Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode,
11    Source, Transfer, TransferBody, TransferNumber,
12};
13use ntex_amqp_codec::{AmqpFrame, Encode};
14
15use crate::delivery::DeliveryInner;
16use crate::error::AmqpProtocolError;
17use crate::rcvlink::{
18    EstablishedReceiverLink, ReceiverLink, ReceiverLinkBuilder, ReceiverLinkInner,
19};
20use crate::sndlink::{EstablishedSenderLink, SenderLink, SenderLinkBuilder, SenderLinkInner};
21use crate::{ConnectionRef, ControlFrame, cell::Cell, types::Action};
22
23pub(crate) const INITIAL_NEXT_OUTGOING_ID: TransferNumber = 1;
24
25#[derive(Clone)]
26pub struct Session {
27    pub(crate) inner: Cell<SessionInner>,
28}
29
30#[derive(Debug)]
31pub(crate) struct SessionInner {
32    id: usize,
33    sink: ConnectionRef,
34    next_outgoing_id: TransferNumber,
35    flags: Flags,
36    begin: Begin,
37
38    remote_channel_id: u16,
39    next_incoming_id: TransferNumber,
40    remote_outgoing_window: u32,
41    remote_incoming_window: u32,
42
43    links: Slab<Either<SenderLinkState, ReceiverLinkState>>,
44    links_by_name: HashMap<ByteString, usize>,
45    remote_handles: HashMap<Handle, usize>,
46    error: Option<AmqpProtocolError>,
47    closed: condition::Condition,
48
49    pending_transfers: VecDeque<PendingTransfer>,
50    pub(crate) unsettled_snd_deliveries: HashMap<DeliveryNumber, DeliveryInner>,
51    pub(crate) unsettled_rcv_deliveries: HashMap<DeliveryNumber, DeliveryInner>,
52
53    pub(crate) pool_notify: pool::Pool<()>,
54    pub(crate) pool_credit: pool::Pool<Result<(), AmqpProtocolError>>,
55}
56
57impl fmt::Debug for Session {
58    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
59        fmt.debug_struct("Session").finish()
60    }
61}
62
63impl Session {
64    pub(crate) fn new(inner: Cell<SessionInner>) -> Session {
65        Session { inner }
66    }
67
68    #[inline]
69    /// Get begin frame reference
70    pub fn frame(&self) -> &Begin {
71        &self.inner.get_ref().begin
72    }
73
74    #[inline]
75    /// Get io tag for current connection
76    pub fn tag(&self) -> &'static str {
77        self.inner.get_ref().sink.tag()
78    }
79
80    #[inline]
81    pub fn connection(&self) -> &ConnectionRef {
82        &self.inner.get_ref().sink
83    }
84
85    #[inline]
86    pub fn local_channel_id(&self) -> u16 {
87        self.inner.get_ref().id()
88    }
89
90    #[inline]
91    pub fn remote_channel_id(&self) -> u16 {
92        self.inner.get_ref().remote_channel_id
93    }
94
95    #[inline]
96    /// Get remote window size
97    pub fn remote_window_size(&self) -> u32 {
98        self.inner.get_ref().remote_incoming_window
99    }
100
101    pub fn end(&self) -> impl Future<Output = Result<(), AmqpProtocolError>> {
102        let inner = self.inner.get_mut();
103
104        if inner.flags.contains(Flags::ENDED) {
105            Either::Left(Ready::Ok(()))
106        } else {
107            if !inner.flags.contains(Flags::ENDING) {
108                inner.sink.close_session(inner.remote_channel_id as usize);
109                inner.post_frame(Frame::End(End { error: None }));
110                inner.flags.insert(Flags::ENDING);
111                inner
112                    .sink
113                    .get_control_queue()
114                    .enqueue_frame(ControlFrame::new_kind(
115                        crate::ControlFrameKind::LocalSessionEnded(inner.get_all_links()),
116                    ));
117            }
118            let inner = self.inner.clone();
119            Either::Right(async move {
120                inner.closed.wait().await;
121                if let Some(err @ AmqpProtocolError::SessionEnded(Some(_))) = inner.error.clone() {
122                    Err(err)
123                } else {
124                    Ok(())
125                }
126            })
127        }
128    }
129
130    pub fn get_sender_link(&self, name: &str) -> Option<&SenderLink> {
131        let inner = self.inner.get_ref();
132
133        if let Some(id) = inner.links_by_name.get(name)
134            && let Some(Either::Left(SenderLinkState::Established(link))) = inner.links.get(*id)
135        {
136            return Some(link);
137        }
138        None
139    }
140
141    #[inline]
142    pub fn get_sender_link_by_local_handle(&self, hnd: Handle) -> Option<&SenderLink> {
143        self.inner.get_ref().get_sender_link_by_local_handle(hnd)
144    }
145
146    #[inline]
147    pub fn get_sender_link_by_remote_handle(&self, hnd: Handle) -> Option<&SenderLink> {
148        self.inner.get_ref().get_sender_link_by_remote_handle(hnd)
149    }
150
151    #[inline]
152    pub fn get_receiver_link_by_local_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
153        self.inner.get_ref().get_receiver_link_by_local_handle(hnd)
154    }
155
156    #[inline]
157    pub fn get_receiver_link_by_remote_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
158        self.inner.get_ref().get_receiver_link_by_remote_handle(hnd)
159    }
160
161    /// Open sender link
162    pub fn build_sender_link<T: Into<ByteString>, U: Into<ByteString>>(
163        &self,
164        name: U,
165        address: T,
166    ) -> SenderLinkBuilder {
167        let name = name.into();
168        let address = address.into();
169        SenderLinkBuilder::new(name, address, self.inner.clone())
170    }
171
172    /// Open receiver link
173    pub fn build_receiver_link<T: Into<ByteString>, U: Into<ByteString>>(
174        &self,
175        name: U,
176        address: T,
177    ) -> ReceiverLinkBuilder {
178        let name = name.into();
179        let address = address.into();
180        ReceiverLinkBuilder::new(name, address, self.inner.clone())
181    }
182
183    /// Detach receiver link
184    pub fn detach_receiver_link(
185        &self,
186        handle: Handle,
187        error: Option<Error>,
188    ) -> impl Future<Output = Result<(), AmqpProtocolError>> {
189        let (tx, rx) = oneshot::channel();
190
191        self.inner
192            .get_mut()
193            .detach_receiver_link(handle, false, error, tx);
194
195        async move {
196            match rx.await {
197                Ok(Ok(())) => Ok(()),
198                Ok(Err(e)) => {
199                    log::trace!("Cannot complete detach receiver link {e:?}");
200                    Err(e)
201                }
202                Err(_) => {
203                    log::trace!("Cannot complete detach receiver link, connection is gone");
204                    Err(AmqpProtocolError::Disconnected)
205                }
206            }
207        }
208    }
209
210    /// Detach sender link
211    pub fn detach_sender_link(
212        &self,
213        handle: Handle,
214        error: Option<Error>,
215    ) -> impl Future<Output = Result<(), AmqpProtocolError>> {
216        let (tx, rx) = oneshot::channel();
217
218        self.inner
219            .get_mut()
220            .detach_sender_link(handle, false, error, tx);
221
222        async move {
223            match rx.await {
224                Ok(Ok(())) => Ok(()),
225                Ok(Err(e)) => {
226                    log::trace!("Cannot complete detach sender link {e:?}");
227                    Err(e)
228                }
229                Err(_) => {
230                    log::trace!("Cannot complete detach sender link, connection is gone");
231                    Err(AmqpProtocolError::Disconnected)
232                }
233            }
234        }
235    }
236}
237
238#[derive(Debug)]
239enum SenderLinkState {
240    Established(EstablishedSenderLink),
241    OpeningRemote,
242    Opening(Option<oneshot::Sender<Result<Cell<SenderLinkInner>, AmqpProtocolError>>>),
243    Closing(Option<oneshot::Sender<Result<(), AmqpProtocolError>>>),
244}
245
246#[derive(Debug)]
247enum ReceiverLinkState {
248    Opening(Box<Option<(Cell<ReceiverLinkInner>, Option<Source>)>>),
249    OpeningLocal(
250        Option<(
251            Cell<ReceiverLinkInner>,
252            oneshot::Sender<Result<ReceiverLink, AmqpProtocolError>>,
253        )>,
254    ),
255    Established(EstablishedReceiverLink),
256    Closing(Option<oneshot::Sender<Result<(), AmqpProtocolError>>>),
257}
258
259impl SenderLinkState {
260    fn is_opening(&self) -> bool {
261        matches!(self, SenderLinkState::Opening(_))
262    }
263}
264
265impl ReceiverLinkState {
266    fn is_opening(&self) -> bool {
267        matches!(self, ReceiverLinkState::OpeningLocal(_))
268    }
269}
270
271bitflags::bitflags! {
272    #[derive(Copy, Clone, Debug)]
273    struct Flags: u8 {
274        const LOCAL =  0b0000_0001;
275        const ENDED =  0b0000_0010;
276        const ENDING = 0b0000_0100;
277    }
278}
279
280#[derive(Debug)]
281struct PendingTransfer {
282    tx: pool::Sender<Result<(), AmqpProtocolError>>,
283    link_handle: Handle,
284}
285
286impl SessionInner {
287    pub(crate) fn new(
288        id: usize,
289        local: bool,
290        sink: ConnectionRef,
291        remote_channel_id: u16,
292        begin: Begin,
293    ) -> SessionInner {
294        SessionInner {
295            next_incoming_id: begin.next_outgoing_id(),
296            remote_incoming_window: begin.incoming_window(),
297            remote_outgoing_window: begin.outgoing_window(),
298            flags: if local { Flags::LOCAL } else { Flags::empty() },
299            next_outgoing_id: INITIAL_NEXT_OUTGOING_ID,
300            unsettled_snd_deliveries: HashMap::default(),
301            unsettled_rcv_deliveries: HashMap::default(),
302            links: Slab::new(),
303            links_by_name: HashMap::default(),
304            remote_handles: HashMap::default(),
305            pending_transfers: VecDeque::new(),
306            error: None,
307            pool_notify: pool::new(),
308            pool_credit: pool::new(),
309            closed: condition::Condition::new(),
310            id,
311            sink,
312            begin,
313            remote_channel_id,
314        }
315    }
316
317    /// Local channel id
318    pub(crate) fn id(&self) -> u16 {
319        self.id as u16
320    }
321
322    pub(crate) fn tag(&self) -> &'static str {
323        self.sink.tag()
324    }
325
326    pub(crate) fn unsettled_deliveries(
327        &mut self,
328        sender: bool,
329    ) -> &mut HashMap<DeliveryNumber, DeliveryInner> {
330        if sender {
331            &mut self.unsettled_snd_deliveries
332        } else {
333            &mut self.unsettled_rcv_deliveries
334        }
335    }
336
337    /// Set error. New operations will return error.
338    pub(crate) fn set_error(&mut self, err: AmqpProtocolError) {
339        log::trace!(
340            "{}: Connection is failed, dropping state: {err:?}",
341            self.tag()
342        );
343
344        // drop pending transfers
345        for tr in self.pending_transfers.drain(..) {
346            let _ = tr.tx.send(Err(err.clone()));
347        }
348
349        // drop unsettled deliveries
350        for (_, mut promise) in self.unsettled_snd_deliveries.drain() {
351            promise.set_error(err.clone());
352        }
353        for (_, mut promise) in self.unsettled_rcv_deliveries.drain() {
354            promise.set_error(err.clone());
355        }
356
357        // drop links
358        self.links_by_name.clear();
359        for (_, st) in &mut self.links {
360            match st {
361                Either::Left(SenderLinkState::Established(link)) => {
362                    link.inner.get_mut().remote_detached(err.clone());
363                }
364                Either::Left(SenderLinkState::Closing(link)) => {
365                    if let Some(tx) = link.take() {
366                        let _ = tx.send(Err(err.clone()));
367                    }
368                }
369                Either::Right(ReceiverLinkState::Established(link)) => {
370                    link.remote_detached(None);
371                }
372                _ => (),
373            }
374        }
375        self.links.clear();
376
377        self.error = Some(err);
378        self.flags.insert(Flags::ENDED);
379        self.closed.notify_and_lock_readiness();
380    }
381
382    /// End session.
383    pub(crate) fn end(&mut self, err: AmqpProtocolError) -> Action {
384        log::trace!("{}: Session is ended: {err:?}", self.tag());
385
386        let links = self.get_all_links();
387        self.set_error(err);
388
389        Action::SessionEnded(links)
390    }
391
392    fn get_all_links(&self) -> Vec<Either<SenderLink, ReceiverLink>> {
393        self.links
394            .iter()
395            .filter_map(|(_, st)| match st {
396                Either::Left(SenderLinkState::Established(link)) => {
397                    Some(Either::Left((*link).clone()))
398                }
399                Either::Right(ReceiverLinkState::Established(link)) => {
400                    Some(Either::Right((*link).clone()))
401                }
402                _ => None,
403            })
404            .collect()
405    }
406
407    pub(crate) fn max_frame_size(&self) -> u32 {
408        self.sink.0.max_frame_size
409    }
410
411    /// Initialize creation of remote sender link
412    pub(crate) fn new_remote_sender(&mut self, attach: &Attach) -> (usize, Attach) {
413        let id = self
414            .links
415            .insert(Either::Left(SenderLinkState::OpeningRemote));
416
417        let attach = Attach(Box::new(codec::AttachInner {
418            name: attach.0.name.clone(),
419            handle: id as Handle,
420            role: Role::Sender,
421            snd_settle_mode: attach.snd_settle_mode(),
422            rcv_settle_mode: attach.rcv_settle_mode(),
423            source: attach.0.source.clone(),
424            target: attach.0.target.clone(),
425            unsettled: None,
426            incomplete_unsettled: false,
427            initial_delivery_count: Some(attach.initial_delivery_count().unwrap_or(0)),
428            max_message_size: None,
429            offered_capabilities: None,
430            desired_capabilities: None,
431            properties: None,
432        }));
433
434        (id, attach)
435    }
436
437    /// Open sender link
438    pub(crate) fn attach_local_sender_link(
439        &mut self,
440        mut frame: Attach,
441    ) -> oneshot::Receiver<Result<Cell<SenderLinkInner>, AmqpProtocolError>> {
442        let (tx, rx) = oneshot::channel();
443
444        let entry = self.links.vacant_entry();
445        let token = entry.key();
446        entry.insert(Either::Left(SenderLinkState::Opening(Some(tx))));
447        log::trace!(
448            "{}: Local sender link opening: {:?} hnd:{token:?}",
449            self.tag(),
450            frame.name(),
451        );
452
453        frame.0.handle = token as Handle;
454
455        self.links_by_name.insert(frame.0.name.clone(), token);
456        self.post_frame(Frame::Attach(frame));
457        rx
458    }
459
460    /// Register remote sender link
461    pub(crate) fn attach_remote_sender_link(
462        &mut self,
463        attach: &Attach,
464        mut response: Attach,
465        link: Cell<SenderLinkInner>,
466    ) -> SenderLink {
467        log::trace!(
468            "{}: Remote sender link attached: {:?}",
469            self.tag(),
470            attach.name()
471        );
472        let token = link.id;
473
474        if let Some(source) = attach.source()
475            && let Some(ref addr) = source.address
476        {
477            self.links_by_name.insert(addr.clone(), token);
478        }
479
480        self.remote_handles.insert(attach.handle(), token);
481        *self
482            .links
483            .get_mut(token)
484            .expect("new remote sender entry must exist") = Either::Left(
485            SenderLinkState::Established(EstablishedSenderLink::new(link.clone())),
486        );
487
488        *response.handle_mut() = token as Handle;
489        *response.max_message_size_mut() = link.max_message_size().map(u64::from);
490
491        self.post_frame(response.into());
492        SenderLink::new(link)
493    }
494
495    /// Detach sender link
496    pub(crate) fn detach_sender_link(
497        &mut self,
498        id: Handle,
499        closed: bool,
500        error: Option<Error>,
501        tx: oneshot::Sender<Result<(), AmqpProtocolError>>,
502    ) {
503        if let Some(Either::Left(link)) = self.links.get_mut(id as usize) {
504            match link {
505                SenderLinkState::Opening(_) => {
506                    let detach = Detach(Box::new(codec::DetachInner {
507                        handle: id,
508                        closed,
509                        error,
510                    }));
511                    *link = SenderLinkState::Closing(Some(tx));
512                    self.post_frame(detach.into());
513                }
514                SenderLinkState::Established(sender_link) => {
515                    let sender_link = sender_link.clone();
516                    let detach = Detach(Box::new(codec::DetachInner {
517                        handle: id,
518                        closed,
519                        error,
520                    }));
521
522                    *link = SenderLinkState::Closing(Some(tx));
523                    self.post_frame(detach.clone().into());
524                    self.sink
525                        .get_control_queue()
526                        .enqueue_frame(ControlFrame::new_kind(
527                            crate::ControlFrameKind::LocalDetachSender(detach, sender_link),
528                        ));
529                }
530                SenderLinkState::OpeningRemote => {
531                    let _ = tx.send(Ok(()));
532                    log::error!(
533                        "{}: Unexpected sender link state: opening remote - {id}",
534                        self.tag()
535                    );
536                }
537                SenderLinkState::Closing(_) => {
538                    let _ = tx.send(Ok(()));
539                    log::error!(
540                        "{}: Unexpected sender link state: closing - {id}",
541                        self.tag()
542                    );
543                }
544            }
545        } else {
546            let _ = tx.send(Ok(()));
547            log::debug!(
548                "{}: Sender link does not exist while detaching: {id}",
549                self.tag()
550            );
551        }
552    }
553
554    /// Detach unconfirmed sender link
555    pub(crate) fn detach_unconfirmed_sender_link(
556        &mut self,
557        attach: &Attach,
558        link: &Cell<SenderLinkInner>,
559        error: Option<Error>,
560    ) {
561        let token = link.id;
562
563        let attach = Attach(Box::new(codec::AttachInner {
564            name: attach.0.name.clone(),
565            handle: token as Handle,
566            role: attach.0.role,
567            snd_settle_mode: SenderSettleMode::Unsettled,
568            rcv_settle_mode: ReceiverSettleMode::First,
569            source: None,
570            target: None,
571            unsettled: None,
572            incomplete_unsettled: false,
573            initial_delivery_count: None,
574            max_message_size: None,
575            offered_capabilities: None,
576            desired_capabilities: None,
577            properties: None,
578        }));
579        self.post_frame(attach.into());
580
581        let detach = Detach(Box::new(codec::DetachInner {
582            handle: token as Handle,
583            closed: true,
584            error,
585        }));
586        self.post_frame(detach.into());
587
588        if self.links.contains(token) {
589            self.links.remove(token);
590        }
591    }
592
593    pub(crate) fn get_sender_link_by_local_handle(&self, hnd: Handle) -> Option<&SenderLink> {
594        if let Some(Either::Left(SenderLinkState::Established(link))) = self.links.get(hnd as usize)
595        {
596            Some(link)
597        } else {
598            None
599        }
600    }
601
602    pub(crate) fn get_sender_link_by_remote_handle(&self, hnd: Handle) -> Option<&SenderLink> {
603        if let Some(id) = self.remote_handles.get(&hnd)
604            && let Some(Either::Left(SenderLinkState::Established(link))) = self.links.get(*id)
605        {
606            return Some(link);
607        }
608        None
609    }
610
611    /// Register receiver link
612    pub(crate) fn attach_remote_receiver_link(
613        &mut self,
614        cell: Cell<SessionInner>,
615        attach: &Attach,
616    ) -> (Attach, ReceiverLink) {
617        let handle = attach.handle();
618        let entry = self.links.vacant_entry();
619        let token = entry.key();
620
621        let inner = Cell::new(ReceiverLinkInner::new(cell, token as u32, handle, attach));
622        entry.insert(Either::Right(ReceiverLinkState::Opening(Box::new(Some((
623            inner.clone(),
624            attach.source().cloned(),
625        ))))));
626        self.remote_handles.insert(handle, token);
627
628        let response = Attach(Box::new(codec::AttachInner {
629            name: attach.0.name.clone(),
630            handle: token as Handle,
631            role: Role::Receiver,
632            snd_settle_mode: attach.snd_settle_mode(),
633            rcv_settle_mode: ReceiverSettleMode::First,
634            source: attach.0.source.clone(),
635            target: attach.0.target.clone(),
636            unsettled: None,
637            incomplete_unsettled: false,
638            initial_delivery_count: Some(0),
639            offered_capabilities: None,
640            desired_capabilities: None,
641            max_message_size: None,
642            properties: None,
643        }));
644
645        (response, ReceiverLink::new(inner))
646    }
647
648    pub(crate) fn attach_local_receiver_link(
649        &mut self,
650        cell: Cell<SessionInner>,
651        mut frame: Attach,
652    ) -> oneshot::Receiver<Result<ReceiverLink, AmqpProtocolError>> {
653        let (tx, rx) = oneshot::channel();
654
655        let entry = self.links.vacant_entry();
656        let token = entry.key();
657
658        let inner = Cell::new(ReceiverLinkInner::new(
659            cell,
660            token as u32,
661            token as u32,
662            &frame,
663        ));
664        entry.insert(Either::Right(ReceiverLinkState::OpeningLocal(Some((
665            inner, tx,
666        )))));
667
668        frame.0.handle = token as Handle;
669
670        self.links_by_name.insert(frame.0.name.clone(), token);
671        self.post_frame(Frame::Attach(frame));
672        rx
673    }
674
675    pub(crate) fn confirm_receiver_link(
676        &mut self,
677        token: Handle,
678        mut response: Attach,
679        max_message_size: Option<u64>,
680    ) {
681        if let Some(Either::Right(link)) = self.links.get_mut(token as usize)
682            && let ReceiverLinkState::Opening(l) = link
683            && let Some((l, _)) = l.take()
684        {
685            *response.max_message_size_mut() = max_message_size;
686            *link = ReceiverLinkState::Established(EstablishedReceiverLink::new(l));
687            self.post_frame(response.into());
688            return;
689        }
690        // TODO: close session
691        log::error!("{}: Unexpected receiver link state", self.tag());
692    }
693
694    /// Detach receiver link
695    pub(crate) fn detach_receiver_link(
696        &mut self,
697        id: Handle,
698        closed: bool,
699        error: Option<Error>,
700        tx: oneshot::Sender<Result<(), AmqpProtocolError>>,
701    ) {
702        if let Some(Either::Right(link)) = self.links.get_mut(id as usize) {
703            match link {
704                ReceiverLinkState::Opening(inner) => {
705                    if let Some((inner, source)) = inner.take() {
706                        let attach = Attach(Box::new(codec::AttachInner {
707                            source,
708                            max_message_size: None,
709                            name: inner.name().clone(),
710                            handle: id,
711                            role: Role::Receiver,
712                            snd_settle_mode: SenderSettleMode::Mixed,
713                            rcv_settle_mode: ReceiverSettleMode::First,
714                            target: None,
715                            unsettled: None,
716                            incomplete_unsettled: false,
717                            initial_delivery_count: Some(0),
718                            offered_capabilities: None,
719                            desired_capabilities: None,
720                            properties: None,
721                        }));
722                        self.post_frame(attach.into());
723                    }
724                    let detach = Detach(Box::new(codec::DetachInner {
725                        closed,
726                        error,
727                        handle: id,
728                    }));
729                    self.post_frame(detach.into());
730                    let _ = tx.send(Ok(()));
731                    if self.links.contains(id as usize) {
732                        let _ = self.links.remove(id as usize);
733                    }
734                }
735                ReceiverLinkState::Established(receiver_link) => {
736                    let receiver_link = receiver_link.clone();
737                    let detach = Detach(Box::new(codec::DetachInner {
738                        handle: id,
739                        closed,
740                        error,
741                    }));
742                    *link = ReceiverLinkState::Closing(Some(tx));
743                    self.post_frame(detach.clone().into());
744                    self.sink
745                        .get_control_queue()
746                        .enqueue_frame(ControlFrame::new_kind(
747                            crate::ControlFrameKind::LocalDetachReceiver(detach, receiver_link),
748                        ));
749                }
750                ReceiverLinkState::Closing(_) => {
751                    let _ = tx.send(Ok(()));
752                    if self.links.contains(id as usize) {
753                        let _ = self.links.remove(id as usize);
754                    }
755                    log::error!(
756                        "{}: Unexpected receiver link state: closing - {id}",
757                        self.tag()
758                    );
759                }
760                ReceiverLinkState::OpeningLocal(_inner) => unimplemented!(),
761            }
762        } else {
763            let _ = tx.send(Ok(()));
764            log::error!(
765                "{}: Receiver link does not exist while detaching: {id}",
766                self.tag()
767            );
768        }
769    }
770
771    pub(crate) fn get_receiver_link_by_local_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
772        if let Some(Either::Right(ReceiverLinkState::Established(link))) =
773            self.links.get(hnd as usize)
774        {
775            Some(link)
776        } else {
777            None
778        }
779    }
780
781    pub(crate) fn get_receiver_link_by_remote_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
782        if let Some(id) = self.remote_handles.get(&hnd)
783            && let Some(Either::Right(ReceiverLinkState::Established(link))) = self.links.get(*id)
784        {
785            return Some(link);
786        }
787        None
788    }
789
790    pub(crate) fn handle_frame(&mut self, frame: Frame) -> Result<Action, AmqpProtocolError> {
791        if self.error.is_none() {
792            match frame {
793                Frame::Flow(flow) => {
794                    // apply link flow
795                    if let Some(Either::Left(link)) = flow
796                        .handle()
797                        .and_then(|h| self.remote_handles.get(&h).copied())
798                        .and_then(|h| self.links.get_mut(h))
799                    {
800                        if let SenderLinkState::Established(link) = link {
801                            return Ok(Action::Flow((*link).clone(), flow));
802                        }
803                        log::warn!("{}: Received flow frame", self.tag());
804                    }
805                    self.handle_flow(&flow, None);
806                    Ok(Action::None)
807                }
808                Frame::Disposition(disp) => {
809                    self.settle_deliveries(&disp);
810                    Ok(Action::None)
811                }
812                Frame::Transfer(transfer) => {
813                    let idx = if let Some(idx) = self.remote_handles.get(&transfer.handle()) {
814                        *idx
815                    } else {
816                        log::debug!(
817                            "{}: Transfer's link {:?} is unknown",
818                            self.tag(),
819                            transfer.handle()
820                        );
821                        return Err(AmqpProtocolError::UnknownLink(Frame::Transfer(transfer)));
822                    };
823
824                    if let Some(link) = self.links.get_mut(idx) {
825                        match link {
826                            Either::Left(_) => {
827                                log::debug!(
828                                    "{}: Got unexpected trasfer from sender link",
829                                    self.tag()
830                                );
831                                Err(AmqpProtocolError::Unexpected(Frame::Transfer(transfer)))
832                            }
833                            Either::Right(link) => match link {
834                                ReceiverLinkState::Opening(_)
835                                | ReceiverLinkState::OpeningLocal(_) => {
836                                    log::debug!(
837                                        "{}: Got transfer for opening link: {} -> {idx}",
838                                        self.tag(),
839                                        transfer.handle()
840                                    );
841                                    Err(AmqpProtocolError::UnexpectedOpeningState(Frame::Transfer(
842                                        transfer,
843                                    )))
844                                }
845                                ReceiverLinkState::Established(link) => {
846                                    // self.outgoing_window -= 1;
847                                    let _ = self.next_incoming_id.wrapping_add(1);
848                                    Ok(link.inner.get_mut().handle_transfer(transfer, &link.inner))
849                                }
850                                ReceiverLinkState::Closing(_) => Ok(Action::None),
851                            },
852                        }
853                    } else {
854                        Err(AmqpProtocolError::UnknownLink(Frame::Transfer(transfer)))
855                    }
856                }
857                Frame::Detach(detach) => Ok(self.handle_detach(detach)),
858                frame => {
859                    log::debug!("{}: Unexpected frame: {frame:?}", self.tag());
860                    Ok(Action::None)
861                }
862            }
863        } else {
864            Ok(Action::None)
865        }
866    }
867
868    /// Handle `Attach` frame. return false if attach frame is remote and can not be handled
869    pub(crate) fn handle_attach(&mut self, attach: &Attach, cell: Cell<SessionInner>) -> bool {
870        let name = attach.name();
871
872        if let Some(index) = self.links_by_name.get(name) {
873            match self.links.get_mut(*index) {
874                Some(Either::Left(item)) => {
875                    if item.is_opening() {
876                        log::trace!(
877                            "{}: Local sender link attached: {name:?} {index} -> {}, {:?}",
878                            self.sink.tag(),
879                            attach.handle(),
880                            self.remote_handles.contains_key(&attach.handle())
881                        );
882
883                        self.remote_handles.insert(attach.handle(), *index);
884                        let delivery_count = attach.initial_delivery_count().unwrap_or(0);
885                        let link = Cell::new(SenderLinkInner::new(
886                            *index,
887                            name.clone(),
888                            attach.handle(),
889                            delivery_count,
890                            cell,
891                            attach
892                                .max_message_size()
893                                .map(|v| u32::try_from(v).unwrap_or(u32::MAX)),
894                        ));
895                        let local_sender = mem::replace(
896                            item,
897                            SenderLinkState::Established(EstablishedSenderLink::new(link.clone())),
898                        );
899
900                        if let SenderLinkState::Opening(Some(tx)) = local_sender {
901                            let _ = tx.send(Ok(link));
902                        }
903                    }
904                }
905                Some(Either::Right(item)) => {
906                    if item.is_opening() {
907                        log::trace!(
908                            "{}: Local receiver link attached: {name:?} {index} -> {}",
909                            self.sink.tag(),
910                            attach.handle()
911                        );
912                        if let ReceiverLinkState::OpeningLocal(opt_item) = item {
913                            if let Some((link, tx)) = opt_item.take() {
914                                self.remote_handles.insert(attach.handle(), *index);
915
916                                *item = ReceiverLinkState::Established(
917                                    EstablishedReceiverLink::new(link.clone()),
918                                );
919                                let _ = tx.send(Ok(ReceiverLink::new(link)));
920                            } else {
921                                // TODO: close session
922                                log::error!("{}: Inconsistent session state, bug", self.tag());
923                            }
924                        }
925                    }
926                }
927                _ => {
928                    // TODO: error in proto, have to close connection
929                    log::trace!("Protocol error");
930                }
931            }
932            true
933        } else {
934            // cannot handle remote attach
935            false
936        }
937    }
938
939    #[allow(clippy::too_many_lines)]
940    /// Handle `Detach` frame.
941    pub(crate) fn handle_detach(&mut self, mut frame: Detach) -> Action {
942        // get local link instance
943        let idx = if let Some(idx) = self.remote_handles.get(&frame.handle()) {
944            *idx
945        } else if self.links.contains(frame.handle() as usize) {
946            frame.handle() as usize
947        } else {
948            // should not happen, error
949            log::info!("{}: Detaching unknown link: {frame:?}", self.tag());
950            return Action::None;
951        };
952
953        let handle = frame.handle();
954        let mut action = Action::None;
955
956        let remove = if let Some(link) = self.links.get_mut(idx) {
957            match link {
958                Either::Left(link) => match link {
959                    SenderLinkState::Opening(tx) => {
960                        if let Some(tx) = tx.take() {
961                            let err = AmqpProtocolError::LinkDetached(frame.0.error.clone());
962                            let _ = tx.send(Err(err));
963                        }
964                        true
965                    }
966                    SenderLinkState::Established(link) => {
967                        // detach from remote endpoint
968                        let detach = Detach(Box::new(codec::DetachInner {
969                            handle: link.inner.get_ref().id(),
970                            closed: true,
971                            error: frame.error().cloned(),
972                        }));
973                        let err = AmqpProtocolError::LinkDetached(detach.0.error.clone());
974
975                        // remove name
976                        self.links_by_name.remove(link.inner.name());
977
978                        // drop pending transfers
979                        let mut idx = 0;
980                        let handle = link.inner.get_ref().remote_handle();
981                        while idx < self.pending_transfers.len() {
982                            if self.pending_transfers[idx].link_handle == handle {
983                                let tr = self.pending_transfers.remove(idx).unwrap();
984                                let _ = tr.tx.send(Err(err.clone()));
985                            } else {
986                                idx += 1;
987                            }
988                        }
989
990                        // drop unsettled transfers
991                        let handle = link.inner.get_ref().id() as Handle;
992                        for delivery in self.unsettled_snd_deliveries.values_mut() {
993                            if delivery.handle() == handle {
994                                delivery.set_error(err.clone());
995                            }
996                        }
997
998                        // detach snd link
999                        link.inner.get_mut().remote_detached(err);
1000                        self.sink
1001                            .post_frame(AmqpFrame::new(self.id as u16, detach.into()));
1002                        action = Action::DetachSender(link.clone(), frame);
1003                        true
1004                    }
1005                    SenderLinkState::OpeningRemote => {
1006                        log::warn!(
1007                            "{}: Detach frame received for unconfirmed sender link: {frame:?}",
1008                            self.tag()
1009                        );
1010                        true
1011                    }
1012                    SenderLinkState::Closing(tx) => {
1013                        if let Some(tx) = tx.take() {
1014                            if let Some(err) = frame.0.error {
1015                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
1016                            } else {
1017                                let _ = tx.send(Ok(()));
1018                            }
1019                        }
1020                        true
1021                    }
1022                },
1023                Either::Right(link) => match link {
1024                    ReceiverLinkState::Opening(_) => false,
1025                    ReceiverLinkState::OpeningLocal(item) => {
1026                        if let Some((inner, tx)) = item.take() {
1027                            inner.get_mut().detached();
1028                            if let Some(err) = frame.0.error.clone() {
1029                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
1030                            } else {
1031                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(None)));
1032                            }
1033                        } else {
1034                            log::error!("{}: Inconsistent session state, bug", self.tag());
1035                        }
1036
1037                        true
1038                    }
1039                    ReceiverLinkState::Established(link) => {
1040                        let error = frame.0.error.take();
1041
1042                        // drop unsettled transfers
1043                        let err = AmqpProtocolError::LinkDetached(error.clone());
1044                        let handle = link.inner.get_ref().id();
1045                        for delivery in self.unsettled_rcv_deliveries.values_mut() {
1046                            if delivery.handle() == handle {
1047                                delivery.set_error(err.clone());
1048                            }
1049                        }
1050
1051                        // detach from remote endpoint
1052                        let detach = Detach(Box::new(codec::DetachInner {
1053                            handle: link.handle(),
1054                            closed: true,
1055                            error: None,
1056                        }));
1057                        self.sink
1058                            .post_frame(AmqpFrame::new(self.id as u16, detach.into()));
1059
1060                        // detach rcv link
1061                        link.remote_detached(error);
1062                        action = Action::DetachReceiver(link.clone(), frame);
1063                        true
1064                    }
1065                    ReceiverLinkState::Closing(tx) => {
1066                        // detach confirmation
1067                        if let Some(tx) = tx.take() {
1068                            if let Some(err) = frame.0.error {
1069                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
1070                            } else {
1071                                let _ = tx.send(Ok(()));
1072                            }
1073                        }
1074                        true
1075                    }
1076                },
1077            }
1078        } else {
1079            false
1080        };
1081
1082        if remove {
1083            if self.links.contains(idx) {
1084                self.links.remove(idx);
1085            }
1086            self.remote_handles.remove(&handle);
1087        }
1088        action
1089    }
1090
1091    fn settle_deliveries(&mut self, disp: &Disposition) {
1092        let from = disp.first();
1093        let to = disp.last();
1094
1095        if cfg!(feature = "frame-trace") {
1096            log::trace!("{}: Settle delivery: {disp:#?}", self.tag());
1097        } else {
1098            log::trace!(
1099                "{}: Settle delivery from {from} - {to:?}, state {:?} settled: {:?}",
1100                self.tag(),
1101                disp.state(),
1102                disp.settled()
1103            );
1104        }
1105
1106        let deliveries = if disp.role() == Role::Receiver {
1107            &mut self.unsettled_snd_deliveries
1108        } else {
1109            &mut self.unsettled_rcv_deliveries
1110        };
1111
1112        if let Some(to) = to {
1113            for no in from..=to {
1114                if let Some(delivery) = deliveries.get_mut(&no) {
1115                    delivery.handle_disposition(disp);
1116                } else {
1117                    log::trace!(
1118                        "{}: Unknown deliveryid: {no:?} disp: {disp:?}",
1119                        self.sink.tag()
1120                    );
1121                }
1122            }
1123        } else if let Some(delivery) = deliveries.get_mut(&from) {
1124            delivery.handle_disposition(disp);
1125        }
1126    }
1127
1128    pub(crate) fn handle_flow(&mut self, flow: &Flow, link: Option<&SenderLink>) {
1129        // # AMQP1.0 2.5.6
1130        self.next_incoming_id = flow.next_outgoing_id();
1131        self.remote_outgoing_window = flow.outgoing_window();
1132
1133        self.remote_incoming_window = flow
1134            .next_incoming_id()
1135            .unwrap_or(0)
1136            .wrapping_add(flow.incoming_window())
1137            .wrapping_sub(self.next_outgoing_id);
1138
1139        log::trace!(
1140            "{}: Session received credit {:?}. window: {}, pending: {}",
1141            self.tag(),
1142            flow.link_credit(),
1143            self.remote_incoming_window,
1144            self.pending_transfers.len(),
1145        );
1146
1147        while let Some(tr) = self.pending_transfers.pop_front() {
1148            let _ = tr.tx.send(Ok(()));
1149        }
1150
1151        if flow.echo() {
1152            let flow = Flow(Box::new(codec::FlowInner {
1153                next_incoming_id: if self.flags.contains(Flags::LOCAL) {
1154                    Some(self.next_incoming_id)
1155                } else {
1156                    None
1157                },
1158                incoming_window: u32::MAX,
1159                next_outgoing_id: self.next_outgoing_id,
1160                outgoing_window: self.remote_incoming_window,
1161                handle: None,
1162                delivery_count: None,
1163                link_credit: None,
1164                available: None,
1165                drain: false,
1166                echo: false,
1167                properties: None,
1168            }));
1169            self.post_frame(flow.into());
1170        }
1171
1172        // apply link flow
1173        if let Some(link) = link {
1174            link.inner.get_mut().apply_flow(flow);
1175        }
1176    }
1177
1178    pub(crate) fn rcv_link_flow(&mut self, handle: u32, delivery_count: u32, credit: u32) {
1179        let flow = Flow(Box::new(codec::FlowInner {
1180            next_incoming_id: Some(self.next_incoming_id),
1181            incoming_window: u32::MAX,
1182            next_outgoing_id: self.next_outgoing_id,
1183            outgoing_window: self.remote_incoming_window,
1184            handle: Some(handle),
1185            delivery_count: Some(delivery_count),
1186            link_credit: Some(credit),
1187            available: None,
1188            drain: false,
1189            echo: false,
1190            properties: None,
1191        }));
1192        self.post_frame(flow.into());
1193    }
1194
1195    #[allow(clippy::too_many_lines)]
1196    pub(crate) async fn send_transfer(
1197        &mut self,
1198        link_handle: Handle,
1199        tag: Bytes,
1200        body: TransferBody,
1201        settled: bool,
1202        format: Option<MessageFormat>,
1203    ) -> Result<DeliveryNumber, AmqpProtocolError> {
1204        loop {
1205            if self.remote_incoming_window == 0 {
1206                log::trace!(
1207                    "{}: Remote window is 0, push to pending queue, hnd:{link_handle:?}",
1208                    self.sink.tag()
1209                );
1210                let (tx, rx) = self.pool_credit.channel();
1211                self.pending_transfers
1212                    .push_back(PendingTransfer { tx, link_handle });
1213
1214                rx.await
1215                    .map_err(|_| AmqpProtocolError::ConnectionDropped)
1216                    .and_then(|v| v)?;
1217                continue;
1218            }
1219            break;
1220        }
1221
1222        self.remote_incoming_window -= 1;
1223
1224        let delivery_id = self.next_outgoing_id;
1225        self.next_outgoing_id = self.next_outgoing_id.wrapping_add(1);
1226
1227        let tr_settled = if settled {
1228            Some(DeliveryState::Accepted(Accepted {}))
1229        } else {
1230            None
1231        };
1232        let message_format = if format.is_none() {
1233            body.message_format()
1234        } else {
1235            format
1236        };
1237
1238        let max_frame_size = self.max_frame_size();
1239        let max_frame_size = if max_frame_size > 2048 {
1240            max_frame_size - 2048
1241        } else if max_frame_size == 0 {
1242            u32::MAX
1243        } else {
1244            max_frame_size
1245        } as usize;
1246
1247        // body is larger than allowed frame size, send body as a set of transfers
1248        if body.len() > max_frame_size {
1249            let mut body = match body {
1250                TransferBody::Data(data) => data,
1251                TransferBody::Message(msg) => {
1252                    let mut buf = self
1253                        .sink
1254                        .config()
1255                        .write_buf()
1256                        .buf_with_capacity(msg.encoded_size());
1257                    msg.encode(&mut buf);
1258                    buf.freeze()
1259                }
1260            };
1261
1262            let chunk = body.split_to(cmp::min(max_frame_size, body.len()));
1263
1264            let mut transfer = Transfer(Box::default());
1265            transfer.0.handle = link_handle;
1266            transfer.0.body = Some(TransferBody::Data(chunk));
1267            transfer.0.more = true;
1268            transfer.0.state = tr_settled;
1269            transfer.0.batchable = true;
1270            transfer.0.delivery_id = Some(delivery_id);
1271            transfer.0.delivery_tag = Some(tag.clone());
1272            transfer.0.message_format = message_format;
1273
1274            if settled {
1275                transfer.0.settled = Some(true);
1276            } else {
1277                self.unsettled_snd_deliveries
1278                    .insert(delivery_id, DeliveryInner::new(link_handle));
1279            }
1280
1281            log::trace!(
1282                "{}: Sending transfer over handle {link_handle}. window: {} delivery_id: {:?} delivery_tag: {:?}, more: {:?}, batchable: {:?}, settled: {:?}",
1283                self.sink.tag(),
1284                self.remote_incoming_window,
1285                transfer.delivery_id(),
1286                transfer.delivery_tag(),
1287                transfer.more(),
1288                transfer.batchable(),
1289                transfer.settled(),
1290            );
1291            self.post_frame(Frame::Transfer(transfer));
1292
1293            loop {
1294                // last chunk
1295                if body.is_empty() {
1296                    log::trace!("{}: Last tranfer for {tag:?} is sent", self.tag());
1297                    break;
1298                }
1299
1300                let chunk = body.split_to(cmp::min(max_frame_size, body.len()));
1301
1302                log::trace!("{}: Sending chunk tranfer for {tag:?}", self.tag());
1303                let mut transfer = Transfer(Box::default());
1304                transfer.0.handle = link_handle;
1305                transfer.0.body = Some(TransferBody::Data(chunk));
1306                transfer.0.more = !body.is_empty();
1307                transfer.0.batchable = true;
1308                transfer.0.message_format = message_format;
1309                self.post_frame(Frame::Transfer(transfer));
1310            }
1311        } else {
1312            let mut transfer = Transfer(Box::default());
1313            transfer.0.handle = link_handle;
1314            transfer.0.body = Some(body);
1315            transfer.0.state = tr_settled;
1316            transfer.0.delivery_id = Some(delivery_id);
1317            transfer.0.delivery_tag = Some(tag);
1318            transfer.0.message_format = message_format;
1319
1320            if settled {
1321                transfer.0.settled = Some(true);
1322            } else {
1323                self.unsettled_snd_deliveries
1324                    .insert(delivery_id, DeliveryInner::new(link_handle));
1325            }
1326            self.post_frame(Frame::Transfer(transfer));
1327        }
1328
1329        Ok(delivery_id)
1330    }
1331
1332    pub(crate) fn post_frame(&mut self, frame: Frame) {
1333        self.sink.post_frame(AmqpFrame::new(self.id(), frame));
1334    }
1335}