ntex_amqp/
session.rs

1use std::{cmp, collections::VecDeque, fmt, future::Future, mem};
2
3use ntex::channel::{condition, oneshot, pool};
4use ntex::util::{ByteString, Bytes, Either, HashMap, PoolRef, Ready};
5use slab::Slab;
6
7use ntex_amqp_codec::protocol::{
8    self as codec, Accepted, Attach, DeliveryNumber, DeliveryState, Detach, Disposition, End,
9    Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode, Source,
10    Transfer, TransferBody, TransferNumber,
11};
12use ntex_amqp_codec::{AmqpFrame, Encode};
13
14use crate::delivery::DeliveryInner;
15use crate::error::AmqpProtocolError;
16use crate::rcvlink::{
17    EstablishedReceiverLink, ReceiverLink, ReceiverLinkBuilder, ReceiverLinkInner,
18};
19use crate::sndlink::{EstablishedSenderLink, SenderLink, SenderLinkBuilder, SenderLinkInner};
20use crate::{cell::Cell, types::Action, ConnectionRef, ControlFrame};
21
/// Value a freshly created session assigns to its `next_outgoing_id` counter.
pub(crate) const INITIAL_NEXT_OUTGOING_ID: TransferNumber = 1;
23
/// Public handle to a session.
///
/// Thin wrapper around the session state kept in `inner`; all methods delegate to it.
#[derive(Clone)]
pub struct Session {
    pub(crate) inner: Cell<SessionInner>,
}
28
/// Internal session state: identifiers, flow-control counters, the link table and
/// the bookkeeping for pending transfers and unsettled deliveries.
#[derive(Debug)]
pub(crate) struct SessionInner {
    // Local identifier; `id()` exposes it as the local channel id.
    id: usize,
    // Connection this session belongs to.
    sink: ConnectionRef,
    // Starts at `INITIAL_NEXT_OUTGOING_ID`.
    next_outgoing_id: TransferNumber,
    // LOCAL / ENDING / ENDED state bits.
    flags: Flags,

    // Values describing the remote endpoint, supplied to `SessionInner::new`.
    remote_channel_id: u16,
    next_incoming_id: TransferNumber,
    remote_outgoing_window: u32,
    remote_incoming_window: u32,

    // Link table; the slab key of an entry is used as the link's local handle.
    links: Slab<Either<SenderLinkState, ReceiverLinkState>>,
    // Link name -> slab key. Remote sender links are registered under their
    // source address instead (see `attach_remote_sender_link`).
    links_by_name: HashMap<ByteString, usize>,
    // Handle chosen by the remote peer -> slab key.
    remote_handles: HashMap<Handle, usize>,
    // Set by `set_error`; while set, `handle_frame` ignores incoming frames.
    error: Option<AmqpProtocolError>,
    // Notified by `set_error`; `Session::end` waits on it.
    closed: condition::Condition,

    // Transfers waiting to be sent; failed with the session error in `set_error`.
    pending_transfers: VecDeque<PendingTransfer>,
    // Deliveries that are not settled yet, keyed by delivery number, per direction.
    pub(crate) unsettled_snd_deliveries: HashMap<DeliveryNumber, DeliveryInner>,
    pub(crate) unsettled_rcv_deliveries: HashMap<DeliveryNumber, DeliveryInner>,

    // Channel pools shared with the links of this session.
    // NOTE(review): exact consumers are outside this part of the file — confirm.
    pub(crate) pool_notify: pool::Pool<()>,
    pub(crate) pool_credit: pool::Pool<Result<(), AmqpProtocolError>>,
}
54
55impl fmt::Debug for Session {
56    fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
57        fmt.debug_struct("Session").finish()
58    }
59}
60
61impl Session {
62    pub(crate) fn new(inner: Cell<SessionInner>) -> Session {
63        Session { inner }
64    }
65
66    #[inline]
67    /// Get io tag for current connection
68    pub fn tag(&self) -> &'static str {
69        self.inner.get_ref().sink.tag()
70    }
71
72    #[inline]
73    pub fn connection(&self) -> &ConnectionRef {
74        &self.inner.get_ref().sink
75    }
76
77    #[inline]
78    pub fn local_channel_id(&self) -> u16 {
79        self.inner.get_ref().id()
80    }
81
82    #[inline]
83    pub fn remote_channel_id(&self) -> u16 {
84        self.inner.get_ref().remote_channel_id
85    }
86
87    #[inline]
88    /// Get remote window size
89    pub fn remote_window_size(&self) -> u32 {
90        self.inner.get_ref().remote_incoming_window
91    }
92
93    pub fn end(&self) -> impl Future<Output = Result<(), AmqpProtocolError>> {
94        let inner = self.inner.get_mut();
95
96        if inner.flags.contains(Flags::ENDED) {
97            Either::Left(Ready::Ok(()))
98        } else {
99            if !inner.flags.contains(Flags::ENDING) {
100                inner.sink.close_session(inner.remote_channel_id as usize);
101                inner.post_frame(Frame::End(End { error: None }));
102                inner.flags.insert(Flags::ENDING);
103                inner
104                    .sink
105                    .get_control_queue()
106                    .enqueue_frame(ControlFrame::new_kind(
107                        crate::ControlFrameKind::LocalSessionEnded(inner.get_all_links()),
108                    ));
109            }
110            let inner = self.inner.clone();
111            Either::Right(async move {
112                inner.closed.wait().await;
113                if let Some(err @ AmqpProtocolError::SessionEnded(Some(_))) = inner.error.clone() {
114                    Err(err)
115                } else {
116                    Ok(())
117                }
118            })
119        }
120    }
121
122    pub fn get_sender_link(&self, name: &str) -> Option<&SenderLink> {
123        let inner = self.inner.get_ref();
124
125        if let Some(id) = inner.links_by_name.get(name) {
126            if let Some(Either::Left(SenderLinkState::Established(ref link))) = inner.links.get(*id)
127            {
128                return Some(link);
129            }
130        }
131        None
132    }
133
134    #[inline]
135    pub fn get_sender_link_by_local_handle(&self, hnd: Handle) -> Option<&SenderLink> {
136        self.inner.get_ref().get_sender_link_by_local_handle(hnd)
137    }
138
139    #[inline]
140    pub fn get_sender_link_by_remote_handle(&self, hnd: Handle) -> Option<&SenderLink> {
141        self.inner.get_ref().get_sender_link_by_remote_handle(hnd)
142    }
143
144    #[inline]
145    pub fn get_receiver_link_by_local_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
146        self.inner.get_ref().get_receiver_link_by_local_handle(hnd)
147    }
148
149    #[inline]
150    pub fn get_receiver_link_by_remote_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
151        self.inner.get_ref().get_receiver_link_by_remote_handle(hnd)
152    }
153
154    /// Open sender link
155    pub fn build_sender_link<T: Into<ByteString>, U: Into<ByteString>>(
156        &self,
157        name: U,
158        address: T,
159    ) -> SenderLinkBuilder {
160        let name = name.into();
161        let address = address.into();
162        SenderLinkBuilder::new(name, address, self.inner.clone())
163    }
164
165    /// Open receiver link
166    pub fn build_receiver_link<T: Into<ByteString>, U: Into<ByteString>>(
167        &self,
168        name: U,
169        address: T,
170    ) -> ReceiverLinkBuilder {
171        let name = name.into();
172        let address = address.into();
173        ReceiverLinkBuilder::new(name, address, self.inner.clone())
174    }
175
176    /// Detach receiver link
177    pub fn detach_receiver_link(
178        &self,
179        handle: Handle,
180        error: Option<Error>,
181    ) -> impl Future<Output = Result<(), AmqpProtocolError>> {
182        let (tx, rx) = oneshot::channel();
183
184        self.inner
185            .get_mut()
186            .detach_receiver_link(handle, false, error, tx);
187
188        async move {
189            match rx.await {
190                Ok(Ok(_)) => Ok(()),
191                Ok(Err(e)) => {
192                    log::trace!("Cannot complete detach receiver link {e:?}");
193                    Err(e)
194                }
195                Err(_) => {
196                    log::trace!("Cannot complete detach receiver link, connection is gone");
197                    Err(AmqpProtocolError::Disconnected)
198                }
199            }
200        }
201    }
202
203    /// Detach sender link
204    pub fn detach_sender_link(
205        &self,
206        handle: Handle,
207        error: Option<Error>,
208    ) -> impl Future<Output = Result<(), AmqpProtocolError>> {
209        let (tx, rx) = oneshot::channel();
210
211        self.inner
212            .get_mut()
213            .detach_sender_link(handle, false, error, tx);
214
215        async move {
216            match rx.await {
217                Ok(Ok(_)) => Ok(()),
218                Ok(Err(e)) => {
219                    log::trace!("Cannot complete detach sender link {e:?}");
220                    Err(e)
221                }
222                Err(_) => {
223                    log::trace!("Cannot complete detach sender link, connection is gone");
224                    Err(AmqpProtocolError::Disconnected)
225                }
226            }
227        }
228    }
229}
230
/// Lifecycle state of a sender link stored in the session's link table.
#[derive(Debug)]
enum SenderLinkState {
    /// Link is attached and usable.
    Established(EstablishedSenderLink),
    /// Placeholder inserted by `new_remote_sender` until the remote attach is confirmed.
    OpeningRemote,
    /// Locally initiated attach has been posted; holds the channel the opener waits on.
    Opening(Option<oneshot::Sender<Result<Cell<SenderLinkInner>, AmqpProtocolError>>>),
    /// Detach has been posted; holds the channel the detaching caller waits on.
    Closing(Option<oneshot::Sender<Result<(), AmqpProtocolError>>>),
}
238
/// Lifecycle state of a receiver link stored in the session's link table.
#[derive(Debug)]
enum ReceiverLinkState {
    /// Remote attach registered by `attach_remote_receiver_link`; keeps the link state
    /// and the attach's source until `confirm_receiver_link` or a detach consumes them.
    Opening(Box<Option<(Cell<ReceiverLinkInner>, Option<Source>)>>),
    /// Locally initiated attach has been posted; keeps the link state together with
    /// the channel the opener waits on.
    OpeningLocal(
        Option<(
            Cell<ReceiverLinkInner>,
            oneshot::Sender<Result<ReceiverLink, AmqpProtocolError>>,
        )>,
    ),
    /// Link is attached and usable.
    Established(EstablishedReceiverLink),
    /// Detach has been posted; holds the channel the detaching caller waits on.
    Closing(Option<oneshot::Sender<Result<(), AmqpProtocolError>>>),
}
251
252impl SenderLinkState {
253    fn is_opening(&self) -> bool {
254        matches!(self, SenderLinkState::Opening(_))
255    }
256}
257
258impl ReceiverLinkState {
259    fn is_opening(&self) -> bool {
260        matches!(self, ReceiverLinkState::OpeningLocal(_))
261    }
262}
263
bitflags::bitflags! {
    /// Session state bits.
    #[derive(Copy, Clone, Debug)]
    struct Flags: u8 {
        /// Set when the session is created with `local = true`.
        const LOCAL =  0b0000_0001;
        /// Set by `set_error` once the session state has been dropped.
        const ENDED =  0b0000_0010;
        /// Set by `Session::end` after the `End` frame has been posted.
        const ENDING = 0b0000_0100;
    }
}
272
/// A transfer that is queued in `SessionInner::pending_transfers`.
#[derive(Debug)]
struct PendingTransfer {
    // Completion channel; receives the session error if the session fails first.
    tx: pool::Sender<Result<(), AmqpProtocolError>>,
    // Handle of the link the transfer belongs to.
    link_handle: Handle,
}
278
279impl SessionInner {
280    pub(crate) fn new(
281        id: usize,
282        local: bool,
283        sink: ConnectionRef,
284        remote_channel_id: u16,
285        next_incoming_id: DeliveryNumber,
286        remote_incoming_window: u32,
287        remote_outgoing_window: u32,
288    ) -> SessionInner {
289        SessionInner {
290            id,
291            sink,
292            next_incoming_id,
293            remote_channel_id,
294            remote_incoming_window,
295            remote_outgoing_window,
296            flags: if local { Flags::LOCAL } else { Flags::empty() },
297            next_outgoing_id: INITIAL_NEXT_OUTGOING_ID,
298            unsettled_snd_deliveries: HashMap::default(),
299            unsettled_rcv_deliveries: HashMap::default(),
300            links: Slab::new(),
301            links_by_name: HashMap::default(),
302            remote_handles: HashMap::default(),
303            pending_transfers: VecDeque::new(),
304            error: None,
305            pool_notify: pool::new(),
306            pool_credit: pool::new(),
307            closed: condition::Condition::new(),
308        }
309    }
310
311    /// Local channel id
312    pub(crate) fn id(&self) -> u16 {
313        self.id as u16
314    }
315
316    pub(crate) fn tag(&self) -> &'static str {
317        self.sink.tag()
318    }
319
320    pub(crate) fn memory_pool(&self) -> PoolRef {
321        self.sink.0.memory_pool()
322    }
323
324    pub(crate) fn unsettled_deliveries(
325        &mut self,
326        sender: bool,
327    ) -> &mut HashMap<DeliveryNumber, DeliveryInner> {
328        if sender {
329            &mut self.unsettled_snd_deliveries
330        } else {
331            &mut self.unsettled_rcv_deliveries
332        }
333    }
334
335    /// Set error. New operations will return error.
336    pub(crate) fn set_error(&mut self, err: AmqpProtocolError) {
337        log::trace!(
338            "{}: Connection is failed, dropping state: {err:?}",
339            self.tag()
340        );
341
342        // drop pending transfers
343        for tr in self.pending_transfers.drain(..) {
344            let _ = tr.tx.send(Err(err.clone()));
345        }
346
347        // drop unsettled deliveries
348        for (_, mut promise) in self.unsettled_snd_deliveries.drain() {
349            promise.set_error(err.clone());
350        }
351        for (_, mut promise) in self.unsettled_rcv_deliveries.drain() {
352            promise.set_error(err.clone());
353        }
354
355        // drop links
356        self.links_by_name.clear();
357        for (_, st) in self.links.iter_mut() {
358            match st {
359                Either::Left(SenderLinkState::Opening(_)) => (),
360                Either::Left(SenderLinkState::Established(ref mut link)) => {
361                    link.inner.get_mut().remote_detached(err.clone());
362                }
363                Either::Left(SenderLinkState::Closing(ref mut link)) => {
364                    if let Some(tx) = link.take() {
365                        let _ = tx.send(Err(err.clone()));
366                    }
367                }
368                Either::Right(ReceiverLinkState::Established(ref mut link)) => {
369                    link.remote_detached(None);
370                }
371                _ => (),
372            }
373        }
374        self.links.clear();
375
376        self.error = Some(err);
377        self.flags.insert(Flags::ENDED);
378        self.closed.notify_and_lock_readiness();
379    }
380
381    /// End session.
382    pub(crate) fn end(&mut self, err: AmqpProtocolError) -> Action {
383        log::trace!("{}: Session is ended: {err:?}", self.tag());
384
385        let links = self.get_all_links();
386        self.set_error(err);
387
388        Action::SessionEnded(links)
389    }
390
391    fn get_all_links(&self) -> Vec<Either<SenderLink, ReceiverLink>> {
392        self.links
393            .iter()
394            .filter_map(|(_, st)| match st {
395                Either::Left(SenderLinkState::Established(ref link)) => {
396                    Some(Either::Left((*link).clone()))
397                }
398                Either::Right(ReceiverLinkState::Established(ref link)) => {
399                    Some(Either::Right((*link).clone()))
400                }
401                _ => None,
402            })
403            .collect()
404    }
405
406    pub(crate) fn max_frame_size(&self) -> u32 {
407        self.sink.0.max_frame_size
408    }
409
410    /// Initialize creation of remote sender link
411    pub(crate) fn new_remote_sender(&mut self, attach: &Attach) -> (usize, Attach) {
412        let id = self
413            .links
414            .insert(Either::Left(SenderLinkState::OpeningRemote));
415
416        let attach = Attach(Box::new(codec::AttachInner {
417            name: attach.0.name.clone(),
418            handle: id as Handle,
419            role: Role::Sender,
420            snd_settle_mode: attach.snd_settle_mode(),
421            rcv_settle_mode: attach.rcv_settle_mode(),
422            source: attach.0.source.clone(),
423            target: attach.0.target.clone(),
424            unsettled: None,
425            incomplete_unsettled: false,
426            initial_delivery_count: Some(attach.initial_delivery_count().unwrap_or(0)),
427            max_message_size: None,
428            offered_capabilities: None,
429            desired_capabilities: None,
430            properties: None,
431        }));
432
433        (id, attach)
434    }
435
436    /// Open sender link
437    pub(crate) fn attach_local_sender_link(
438        &mut self,
439        mut frame: Attach,
440    ) -> oneshot::Receiver<Result<Cell<SenderLinkInner>, AmqpProtocolError>> {
441        let (tx, rx) = oneshot::channel();
442
443        let entry = self.links.vacant_entry();
444        let token = entry.key();
445        entry.insert(Either::Left(SenderLinkState::Opening(Some(tx))));
446        log::trace!(
447            "{}: Local sender link opening: {:?} hnd:{token:?}",
448            self.tag(),
449            frame.name(),
450        );
451
452        frame.0.handle = token as Handle;
453
454        self.links_by_name.insert(frame.0.name.clone(), token);
455        self.post_frame(Frame::Attach(frame));
456        rx
457    }
458
459    /// Register remote sender link
460    pub(crate) fn attach_remote_sender_link(
461        &mut self,
462        attach: &Attach,
463        mut response: Attach,
464        link: Cell<SenderLinkInner>,
465    ) -> SenderLink {
466        log::trace!(
467            "{}: Remote sender link attached: {:?}",
468            self.tag(),
469            attach.name()
470        );
471        let token = link.id;
472
473        if let Some(source) = attach.source() {
474            if let Some(ref addr) = source.address {
475                self.links_by_name.insert(addr.clone(), token);
476            }
477        }
478
479        self.remote_handles.insert(attach.handle(), token);
480        *self
481            .links
482            .get_mut(token)
483            .expect("new remote sender entry must exist") = Either::Left(
484            SenderLinkState::Established(EstablishedSenderLink::new(link.clone())),
485        );
486
487        *response.handle_mut() = token as Handle;
488        *response.max_message_size_mut() = link.max_message_size().map(|v| v as u64);
489
490        self.post_frame(response.into());
491        SenderLink::new(link)
492    }
493
494    /// Detach sender link
495    pub(crate) fn detach_sender_link(
496        &mut self,
497        id: Handle,
498        closed: bool,
499        error: Option<Error>,
500        tx: oneshot::Sender<Result<(), AmqpProtocolError>>,
501    ) {
502        if let Some(Either::Left(link)) = self.links.get_mut(id as usize) {
503            match link {
504                SenderLinkState::Opening(_) => {
505                    let detach = Detach(Box::new(codec::DetachInner {
506                        handle: id,
507                        closed,
508                        error,
509                    }));
510                    *link = SenderLinkState::Closing(Some(tx));
511                    self.post_frame(detach.into());
512                }
513                SenderLinkState::Established(sender_link) => {
514                    let sender_link = sender_link.clone();
515                    let detach = Detach(Box::new(codec::DetachInner {
516                        handle: id,
517                        closed,
518                        error,
519                    }));
520
521                    *link = SenderLinkState::Closing(Some(tx));
522                    self.post_frame(detach.clone().into());
523                    self.sink
524                        .get_control_queue()
525                        .enqueue_frame(ControlFrame::new_kind(
526                            crate::ControlFrameKind::LocalDetachSender(detach, sender_link),
527                        ))
528                }
529                SenderLinkState::OpeningRemote => {
530                    let _ = tx.send(Ok(()));
531                    log::error!(
532                        "{}: Unexpected sender link state: opening remote - {id}",
533                        self.tag()
534                    );
535                }
536                SenderLinkState::Closing(_) => {
537                    let _ = tx.send(Ok(()));
538                    log::error!(
539                        "{}: Unexpected sender link state: closing - {id}",
540                        self.tag()
541                    );
542                }
543            }
544        } else {
545            let _ = tx.send(Ok(()));
546            log::debug!(
547                "{}: Sender link does not exist while detaching: {id}",
548                self.tag()
549            );
550        }
551    }
552
553    /// Detach unconfirmed sender link
554    pub(crate) fn detach_unconfirmed_sender_link(
555        &mut self,
556        attach: &Attach,
557        link: Cell<SenderLinkInner>,
558        error: Option<Error>,
559    ) {
560        let token = link.id;
561
562        let attach = Attach(Box::new(codec::AttachInner {
563            name: attach.0.name.clone(),
564            handle: token as Handle,
565            role: attach.0.role,
566            snd_settle_mode: SenderSettleMode::Unsettled,
567            rcv_settle_mode: ReceiverSettleMode::First,
568            source: None,
569            target: None,
570            unsettled: None,
571            incomplete_unsettled: false,
572            initial_delivery_count: None,
573            max_message_size: None,
574            offered_capabilities: None,
575            desired_capabilities: None,
576            properties: None,
577        }));
578        self.post_frame(attach.into());
579
580        let detach = Detach(Box::new(codec::DetachInner {
581            handle: token as Handle,
582            closed: true,
583            error,
584        }));
585        self.post_frame(detach.into());
586
587        if self.links.contains(token) {
588            self.links.remove(token);
589        }
590    }
591
592    pub(crate) fn get_sender_link_by_local_handle(&self, hnd: Handle) -> Option<&SenderLink> {
593        if let Some(Either::Left(SenderLinkState::Established(ref link))) =
594            self.links.get(hnd as usize)
595        {
596            Some(link)
597        } else {
598            None
599        }
600    }
601
602    pub(crate) fn get_sender_link_by_remote_handle(&self, hnd: Handle) -> Option<&SenderLink> {
603        if let Some(id) = self.remote_handles.get(&hnd) {
604            if let Some(Either::Left(SenderLinkState::Established(ref link))) = self.links.get(*id)
605            {
606                return Some(link);
607            }
608        }
609        None
610    }
611
612    /// Register receiver link
613    pub(crate) fn attach_remote_receiver_link(
614        &mut self,
615        cell: Cell<SessionInner>,
616        attach: &Attach,
617    ) -> (Attach, ReceiverLink) {
618        let handle = attach.handle();
619        let entry = self.links.vacant_entry();
620        let token = entry.key();
621
622        let inner = Cell::new(ReceiverLinkInner::new(cell, token as u32, handle, attach));
623        entry.insert(Either::Right(ReceiverLinkState::Opening(Box::new(Some((
624            inner.clone(),
625            attach.source().cloned(),
626        ))))));
627        self.remote_handles.insert(handle, token);
628
629        let response = Attach(Box::new(codec::AttachInner {
630            name: attach.0.name.clone(),
631            handle: token as Handle,
632            role: Role::Receiver,
633            snd_settle_mode: attach.snd_settle_mode(),
634            rcv_settle_mode: ReceiverSettleMode::First,
635            source: attach.0.source.clone(),
636            target: attach.0.target.clone(),
637            unsettled: None,
638            incomplete_unsettled: false,
639            initial_delivery_count: Some(0),
640            offered_capabilities: None,
641            desired_capabilities: None,
642            max_message_size: None,
643            properties: None,
644        }));
645
646        (response, ReceiverLink::new(inner))
647    }
648
649    pub(crate) fn attach_local_receiver_link(
650        &mut self,
651        cell: Cell<SessionInner>,
652        mut frame: Attach,
653    ) -> oneshot::Receiver<Result<ReceiverLink, AmqpProtocolError>> {
654        let (tx, rx) = oneshot::channel();
655
656        let entry = self.links.vacant_entry();
657        let token = entry.key();
658
659        let inner = Cell::new(ReceiverLinkInner::new(
660            cell,
661            token as u32,
662            token as u32,
663            &frame,
664        ));
665        entry.insert(Either::Right(ReceiverLinkState::OpeningLocal(Some((
666            inner, tx,
667        )))));
668
669        frame.0.handle = token as Handle;
670
671        self.links_by_name.insert(frame.0.name.clone(), token);
672        self.post_frame(Frame::Attach(frame));
673        rx
674    }
675
676    pub(crate) fn confirm_receiver_link(
677        &mut self,
678        token: Handle,
679        mut response: Attach,
680        max_message_size: Option<u64>,
681    ) {
682        if let Some(Either::Right(link)) = self.links.get_mut(token as usize) {
683            if let ReceiverLinkState::Opening(l) = link {
684                if let Some((l, _)) = l.take() {
685                    *response.max_message_size_mut() = max_message_size;
686                    *link = ReceiverLinkState::Established(EstablishedReceiverLink::new(l));
687                    self.post_frame(response.into());
688                    return;
689                }
690            }
691        }
692        // TODO: close session
693        log::error!("{}: Unexpected receiver link state", self.tag());
694    }
695
696    /// Detach receiver link
697    pub(crate) fn detach_receiver_link(
698        &mut self,
699        id: Handle,
700        closed: bool,
701        error: Option<Error>,
702        tx: oneshot::Sender<Result<(), AmqpProtocolError>>,
703    ) {
704        if let Some(Either::Right(link)) = self.links.get_mut(id as usize) {
705            match link {
706                ReceiverLinkState::Opening(inner) => {
707                    if let Some((inner, source)) = inner.take() {
708                        let attach = Attach(Box::new(codec::AttachInner {
709                            source,
710                            max_message_size: None,
711                            name: inner.name().clone(),
712                            handle: id,
713                            role: Role::Receiver,
714                            snd_settle_mode: SenderSettleMode::Mixed,
715                            rcv_settle_mode: ReceiverSettleMode::First,
716                            target: None,
717                            unsettled: None,
718                            incomplete_unsettled: false,
719                            initial_delivery_count: Some(0),
720                            offered_capabilities: None,
721                            desired_capabilities: None,
722                            properties: None,
723                        }));
724                        self.post_frame(attach.into());
725                    }
726                    let detach = Detach(Box::new(codec::DetachInner {
727                        closed,
728                        error,
729                        handle: id,
730                    }));
731                    self.post_frame(detach.into());
732                    let _ = tx.send(Ok(()));
733                    if self.links.contains(id as usize) {
734                        let _ = self.links.remove(id as usize);
735                    }
736                }
737                ReceiverLinkState::Established(receiver_link) => {
738                    let receiver_link = receiver_link.clone();
739                    let detach = Detach(Box::new(codec::DetachInner {
740                        handle: id,
741                        closed,
742                        error,
743                    }));
744                    *link = ReceiverLinkState::Closing(Some(tx));
745                    self.post_frame(detach.clone().into());
746                    self.sink
747                        .get_control_queue()
748                        .enqueue_frame(ControlFrame::new_kind(
749                            crate::ControlFrameKind::LocalDetachReceiver(detach, receiver_link),
750                        ))
751                }
752                ReceiverLinkState::Closing(_) => {
753                    let _ = tx.send(Ok(()));
754                    if self.links.contains(id as usize) {
755                        let _ = self.links.remove(id as usize);
756                    }
757                    log::error!(
758                        "{}: Unexpected receiver link state: closing - {id}",
759                        self.tag()
760                    );
761                }
762                ReceiverLinkState::OpeningLocal(_inner) => unimplemented!(),
763            }
764        } else {
765            let _ = tx.send(Ok(()));
766            log::error!(
767                "{}: Receiver link does not exist while detaching: {id}",
768                self.tag()
769            );
770        }
771    }
772
773    pub(crate) fn get_receiver_link_by_local_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
774        if let Some(Either::Right(ReceiverLinkState::Established(ref link))) =
775            self.links.get(hnd as usize)
776        {
777            Some(link)
778        } else {
779            None
780        }
781    }
782
783    pub(crate) fn get_receiver_link_by_remote_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
784        if let Some(id) = self.remote_handles.get(&hnd) {
785            if let Some(Either::Right(ReceiverLinkState::Established(ref link))) =
786                self.links.get(*id)
787            {
788                return Some(link);
789            }
790        }
791        None
792    }
793
794    pub(crate) fn handle_frame(&mut self, frame: Frame) -> Result<Action, AmqpProtocolError> {
795        if self.error.is_none() {
796            match frame {
797                Frame::Flow(flow) => {
798                    // apply link flow
799                    if let Some(Either::Left(link)) = flow
800                        .handle()
801                        .and_then(|h| self.remote_handles.get(&h).copied())
802                        .and_then(|h| self.links.get_mut(h))
803                    {
804                        if let SenderLinkState::Established(ref link) = link {
805                            return Ok(Action::Flow((*link).clone(), flow));
806                        }
807                        log::warn!("{}: Received flow frame", self.tag());
808                    }
809                    self.handle_flow(&flow, None);
810                    Ok(Action::None)
811                }
812                Frame::Disposition(disp) => {
813                    self.settle_deliveries(disp);
814                    Ok(Action::None)
815                }
816                Frame::Transfer(transfer) => {
817                    let idx = if let Some(idx) = self.remote_handles.get(&transfer.handle()) {
818                        *idx
819                    } else {
820                        log::debug!(
821                            "{}: Transfer's link {:?} is unknown",
822                            self.tag(),
823                            transfer.handle()
824                        );
825                        return Err(AmqpProtocolError::UnknownLink(Frame::Transfer(transfer)));
826                    };
827
828                    if let Some(link) = self.links.get_mut(idx) {
829                        match link {
830                            Either::Left(_) => {
831                                log::debug!(
832                                    "{}: Got unexpected trasfer from sender link",
833                                    self.tag()
834                                );
835                                Err(AmqpProtocolError::Unexpected(Frame::Transfer(transfer)))
836                            }
837                            Either::Right(link) => match link {
838                                ReceiverLinkState::Opening(_) => {
839                                    log::debug!(
840                                        "{}: Got transfer for opening link: {} -> {idx}",
841                                        self.tag(),
842                                        transfer.handle()
843                                    );
844                                    Err(AmqpProtocolError::UnexpectedOpeningState(Frame::Transfer(
845                                        transfer,
846                                    )))
847                                }
848                                ReceiverLinkState::OpeningLocal(_) => {
849                                    log::debug!(
850                                        "{}: Got transfer for opening link: {} -> {idx}",
851                                        self.tag(),
852                                        transfer.handle()
853                                    );
854                                    Err(AmqpProtocolError::UnexpectedOpeningState(Frame::Transfer(
855                                        transfer,
856                                    )))
857                                }
858                                ReceiverLinkState::Established(link) => {
859                                    // self.outgoing_window -= 1;
860                                    let _ = self.next_incoming_id.wrapping_add(1);
861                                    link.inner.get_mut().handle_transfer(transfer, &link.inner)
862                                }
863                                ReceiverLinkState::Closing(_) => Ok(Action::None),
864                            },
865                        }
866                    } else {
867                        Err(AmqpProtocolError::UnknownLink(Frame::Transfer(transfer)))
868                    }
869                }
870                Frame::Detach(detach) => Ok(self.handle_detach(detach)),
871                frame => {
872                    log::debug!("{}: Unexpected frame: {frame:?}", self.tag());
873                    Ok(Action::None)
874                }
875            }
876        } else {
877            Ok(Action::None)
878        }
879    }
880
881    /// Handle `Attach` frame. return false if attach frame is remote and can not be handled
882    pub(crate) fn handle_attach(&mut self, attach: &Attach, cell: Cell<SessionInner>) -> bool {
883        let name = attach.name();
884
885        if let Some(index) = self.links_by_name.get(name) {
886            match self.links.get_mut(*index) {
887                Some(Either::Left(item)) => {
888                    if item.is_opening() {
889                        log::trace!(
890                            "{}: Local sender link attached: {name:?} {index} -> {}, {:?}",
891                            self.sink.tag(),
892                            attach.handle(),
893                            self.remote_handles.contains_key(&attach.handle())
894                        );
895
896                        self.remote_handles.insert(attach.handle(), *index);
897                        let delivery_count = attach.initial_delivery_count().unwrap_or(0);
898                        let link = Cell::new(SenderLinkInner::new(
899                            *index,
900                            name.clone(),
901                            attach.handle(),
902                            delivery_count,
903                            cell,
904                            attach
905                                .max_message_size()
906                                .map(|v| u32::try_from(v).unwrap_or(u32::MAX)),
907                        ));
908                        let local_sender = mem::replace(
909                            item,
910                            SenderLinkState::Established(EstablishedSenderLink::new(link.clone())),
911                        );
912
913                        if let SenderLinkState::Opening(Some(tx)) = local_sender {
914                            let _ = tx.send(Ok(link));
915                        }
916                    }
917                }
918                Some(Either::Right(item)) => {
919                    if item.is_opening() {
920                        log::trace!(
921                            "{}: Local receiver link attached: {name:?} {index} -> {}",
922                            self.sink.tag(),
923                            attach.handle()
924                        );
925                        if let ReceiverLinkState::OpeningLocal(opt_item) = item {
926                            if let Some((link, tx)) = opt_item.take() {
927                                self.remote_handles.insert(attach.handle(), *index);
928
929                                *item = ReceiverLinkState::Established(
930                                    EstablishedReceiverLink::new(link.clone()),
931                                );
932                                let _ = tx.send(Ok(ReceiverLink::new(link)));
933                            } else {
934                                // TODO: close session
935                                log::error!("{}: Inconsistent session state, bug", self.tag());
936                            }
937                        }
938                    }
939                }
940                _ => {
941                    // TODO: error in proto, have to close connection
942                }
943            }
944            true
945        } else {
946            // cannot handle remote attach
947            false
948        }
949    }
950
951    /// Handle `Detach` frame.
952    pub(crate) fn handle_detach(&mut self, mut frame: Detach) -> Action {
953        // get local link instance
954        let idx = if let Some(idx) = self.remote_handles.get(&frame.handle()) {
955            *idx
956        } else if self.links.contains(frame.handle() as usize) {
957            frame.handle() as usize
958        } else {
959            // should not happen, error
960            log::info!("{}: Detaching unknown link: {frame:?}", self.tag());
961            return Action::None;
962        };
963
964        let handle = frame.handle();
965        let mut action = Action::None;
966
967        let remove = if let Some(link) = self.links.get_mut(idx) {
968            match link {
969                Either::Left(link) => match link {
970                    SenderLinkState::Opening(ref mut tx) => {
971                        if let Some(tx) = tx.take() {
972                            let err = AmqpProtocolError::LinkDetached(frame.0.error.clone());
973                            let _ = tx.send(Err(err));
974                        }
975                        true
976                    }
977                    SenderLinkState::Established(link) => {
978                        // detach from remote endpoint
979                        let detach = Detach(Box::new(codec::DetachInner {
980                            handle: link.inner.get_ref().id(),
981                            closed: true,
982                            error: frame.error().cloned(),
983                        }));
984                        let err = AmqpProtocolError::LinkDetached(detach.0.error.clone());
985
986                        // remove name
987                        self.links_by_name.remove(link.inner.name());
988
989                        // drop pending transfers
990                        let mut idx = 0;
991                        let handle = link.inner.get_ref().remote_handle();
992                        while idx < self.pending_transfers.len() {
993                            if self.pending_transfers[idx].link_handle == handle {
994                                let tr = self.pending_transfers.remove(idx).unwrap();
995                                let _ = tr.tx.send(Err(err.clone()));
996                            } else {
997                                idx += 1;
998                            }
999                        }
1000
1001                        // drop unsettled transfers
1002                        let handle = link.inner.get_ref().id() as Handle;
1003                        for delivery in self.unsettled_snd_deliveries.values_mut() {
1004                            if delivery.handle() == handle {
1005                                delivery.set_error(err.clone());
1006                            }
1007                        }
1008
1009                        // detach snd link
1010                        link.inner.get_mut().remote_detached(err);
1011                        self.sink
1012                            .post_frame(AmqpFrame::new(self.id as u16, detach.into()));
1013                        action = Action::DetachSender(link.clone(), frame);
1014                        true
1015                    }
1016                    SenderLinkState::OpeningRemote => {
1017                        log::warn!(
1018                            "{}: Detach frame received for unconfirmed sender link: {frame:?}",
1019                            self.tag()
1020                        );
1021                        true
1022                    }
1023                    SenderLinkState::Closing(tx) => {
1024                        if let Some(tx) = tx.take() {
1025                            if let Some(err) = frame.0.error {
1026                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
1027                            } else {
1028                                let _ = tx.send(Ok(()));
1029                            }
1030                        }
1031                        true
1032                    }
1033                },
1034                Either::Right(link) => match link {
1035                    ReceiverLinkState::Opening(_) => false,
1036                    ReceiverLinkState::OpeningLocal(ref mut item) => {
1037                        if let Some((inner, tx)) = item.take() {
1038                            inner.get_mut().detached();
1039                            if let Some(err) = frame.0.error.clone() {
1040                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
1041                            } else {
1042                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(None)));
1043                            }
1044                        } else {
1045                            log::error!("{}: Inconsistent session state, bug", self.tag());
1046                        }
1047
1048                        true
1049                    }
1050                    ReceiverLinkState::Established(link) => {
1051                        let error = frame.0.error.take();
1052
1053                        // drop unsettled transfers
1054                        let err = AmqpProtocolError::LinkDetached(error.clone());
1055                        let handle = link.inner.get_ref().id();
1056                        for delivery in self.unsettled_rcv_deliveries.values_mut() {
1057                            if delivery.handle() == handle {
1058                                delivery.set_error(err.clone());
1059                            }
1060                        }
1061
1062                        // detach from remote endpoint
1063                        let detach = Detach(Box::new(codec::DetachInner {
1064                            handle: link.handle(),
1065                            closed: true,
1066                            error: None,
1067                        }));
1068                        self.sink
1069                            .post_frame(AmqpFrame::new(self.id as u16, detach.into()));
1070
1071                        // detach rcv link
1072                        link.remote_detached(error);
1073                        action = Action::DetachReceiver(link.clone(), frame);
1074                        true
1075                    }
1076                    ReceiverLinkState::Closing(tx) => {
1077                        // detach confirmation
1078                        if let Some(tx) = tx.take() {
1079                            if let Some(err) = frame.0.error {
1080                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
1081                            } else {
1082                                let _ = tx.send(Ok(()));
1083                            }
1084                        }
1085                        true
1086                    }
1087                },
1088            }
1089        } else {
1090            false
1091        };
1092
1093        if remove {
1094            if self.links.contains(idx) {
1095                self.links.remove(idx);
1096            }
1097            self.remote_handles.remove(&handle);
1098        }
1099        action
1100    }
1101
1102    fn settle_deliveries(&mut self, disp: Disposition) {
1103        let from = disp.first();
1104        let to = disp.last();
1105
1106        if cfg!(feature = "frame-trace") {
1107            log::trace!("{}: Settle delivery: {disp:#?}", self.tag());
1108        } else {
1109            log::trace!(
1110                "{}: Settle delivery from {from} - {to:?}, state {:?} settled: {:?}",
1111                self.tag(),
1112                disp.state(),
1113                disp.settled()
1114            );
1115        }
1116
1117        let deliveries = if disp.role() == Role::Receiver {
1118            &mut self.unsettled_snd_deliveries
1119        } else {
1120            &mut self.unsettled_rcv_deliveries
1121        };
1122
1123        if let Some(to) = to {
1124            for no in from..=to {
1125                if let Some(delivery) = deliveries.get_mut(&no) {
1126                    delivery.handle_disposition(disp.clone());
1127                } else {
1128                    log::trace!(
1129                        "{}: Unknown deliveryid: {no:?} disp: {disp:?}",
1130                        self.sink.tag()
1131                    );
1132                }
1133            }
1134        } else if let Some(delivery) = deliveries.get_mut(&from) {
1135            delivery.handle_disposition(disp);
1136        }
1137    }
1138
1139    pub(crate) fn handle_flow(&mut self, flow: &Flow, link: Option<&SenderLink>) {
1140        // # AMQP1.0 2.5.6
1141        self.next_incoming_id = flow.next_outgoing_id();
1142        self.remote_outgoing_window = flow.outgoing_window();
1143
1144        self.remote_incoming_window = flow
1145            .next_incoming_id()
1146            .unwrap_or(0)
1147            .wrapping_add(flow.incoming_window())
1148            .wrapping_sub(self.next_outgoing_id);
1149
1150        log::trace!(
1151            "{}: Session received credit {:?}. window: {}, pending: {}",
1152            self.tag(),
1153            flow.link_credit(),
1154            self.remote_incoming_window,
1155            self.pending_transfers.len(),
1156        );
1157
1158        while let Some(tr) = self.pending_transfers.pop_front() {
1159            let _ = tr.tx.send(Ok(()));
1160        }
1161
1162        if flow.echo() {
1163            let flow = Flow(Box::new(codec::FlowInner {
1164                next_incoming_id: if self.flags.contains(Flags::LOCAL) {
1165                    Some(self.next_incoming_id)
1166                } else {
1167                    None
1168                },
1169                incoming_window: u32::MAX,
1170                next_outgoing_id: self.next_outgoing_id,
1171                outgoing_window: self.remote_incoming_window,
1172                handle: None,
1173                delivery_count: None,
1174                link_credit: None,
1175                available: None,
1176                drain: false,
1177                echo: false,
1178                properties: None,
1179            }));
1180            self.post_frame(flow.into());
1181        }
1182
1183        // apply link flow
1184        if let Some(link) = link {
1185            link.inner.get_mut().apply_flow(flow);
1186        }
1187    }
1188
1189    pub(crate) fn rcv_link_flow(&mut self, handle: u32, delivery_count: u32, credit: u32) {
1190        let flow = Flow(Box::new(codec::FlowInner {
1191            next_incoming_id: Some(self.next_incoming_id),
1192            incoming_window: u32::MAX,
1193            next_outgoing_id: self.next_outgoing_id,
1194            outgoing_window: self.remote_incoming_window,
1195            handle: Some(handle),
1196            delivery_count: Some(delivery_count),
1197            link_credit: Some(credit),
1198            available: None,
1199            drain: false,
1200            echo: false,
1201            properties: None,
1202        }));
1203        self.post_frame(flow.into());
1204    }
1205
1206    pub(crate) async fn send_transfer(
1207        &mut self,
1208        link_handle: Handle,
1209        tag: Bytes,
1210        body: TransferBody,
1211        settled: bool,
1212        format: Option<MessageFormat>,
1213    ) -> Result<DeliveryNumber, AmqpProtocolError> {
1214        loop {
1215            if self.remote_incoming_window == 0 {
1216                log::trace!(
1217                    "{}: Remote window is 0, push to pending queue, hnd:{link_handle:?}",
1218                    self.sink.tag()
1219                );
1220                let (tx, rx) = self.pool_credit.channel();
1221                self.pending_transfers
1222                    .push_back(PendingTransfer { tx, link_handle });
1223
1224                rx.await
1225                    .map_err(|_| AmqpProtocolError::ConnectionDropped)
1226                    .and_then(|v| v)?;
1227                continue;
1228            }
1229            break;
1230        }
1231
1232        self.remote_incoming_window -= 1;
1233
1234        let delivery_id = self.next_outgoing_id;
1235        self.next_outgoing_id = self.next_outgoing_id.wrapping_add(1);
1236
1237        let tr_settled = if settled {
1238            Some(DeliveryState::Accepted(Accepted {}))
1239        } else {
1240            None
1241        };
1242        let message_format = if format.is_none() {
1243            body.message_format()
1244        } else {
1245            format
1246        };
1247
1248        let max_frame_size = self.max_frame_size();
1249        let max_frame_size = if max_frame_size > 2048 {
1250            max_frame_size - 2048
1251        } else if max_frame_size == 0 {
1252            u32::MAX
1253        } else {
1254            max_frame_size
1255        } as usize;
1256
1257        // body is larger than allowed frame size, send body as a set of transfers
1258        if body.len() > max_frame_size {
1259            let mut body = match body {
1260                TransferBody::Data(data) => data,
1261                TransferBody::Message(msg) => {
1262                    let mut buf = self.memory_pool().buf_with_capacity(msg.encoded_size());
1263                    msg.encode(&mut buf);
1264                    buf.freeze()
1265                }
1266            };
1267
1268            let chunk = body.split_to(cmp::min(max_frame_size, body.len()));
1269
1270            let mut transfer = Transfer(Default::default());
1271            transfer.0.handle = link_handle;
1272            transfer.0.body = Some(TransferBody::Data(chunk));
1273            transfer.0.more = true;
1274            transfer.0.state = tr_settled;
1275            transfer.0.batchable = true;
1276            transfer.0.delivery_id = Some(delivery_id);
1277            transfer.0.delivery_tag = Some(tag.clone());
1278            transfer.0.message_format = message_format;
1279
1280            if settled {
1281                transfer.0.settled = Some(true);
1282            } else {
1283                self.unsettled_snd_deliveries
1284                    .insert(delivery_id, DeliveryInner::new(link_handle));
1285            }
1286
1287            log::trace!(
1288                "{}: Sending transfer over handle {link_handle}. window: {} delivery_id: {:?} delivery_tag: {:?}, more: {:?}, batchable: {:?}, settled: {:?}", self.sink.tag(),
1289                self.remote_incoming_window,
1290                transfer.delivery_id(),
1291                transfer.delivery_tag(),
1292                transfer.more(),
1293                transfer.batchable(),
1294                transfer.settled(),
1295            );
1296            self.post_frame(Frame::Transfer(transfer));
1297
1298            loop {
1299                // last chunk
1300                if body.is_empty() {
1301                    log::trace!("{}: Last tranfer for {tag:?} is sent", self.tag());
1302                    break;
1303                }
1304
1305                let chunk = body.split_to(cmp::min(max_frame_size, body.len()));
1306
1307                log::trace!("{}: Sending chunk tranfer for {tag:?}", self.tag());
1308                let mut transfer = Transfer(Default::default());
1309                transfer.0.handle = link_handle;
1310                transfer.0.body = Some(TransferBody::Data(chunk));
1311                transfer.0.more = !body.is_empty();
1312                transfer.0.batchable = true;
1313                transfer.0.message_format = message_format;
1314                self.post_frame(Frame::Transfer(transfer));
1315            }
1316        } else {
1317            let mut transfer = Transfer(Default::default());
1318            transfer.0.handle = link_handle;
1319            transfer.0.body = Some(body);
1320            transfer.0.state = tr_settled;
1321            transfer.0.delivery_id = Some(delivery_id);
1322            transfer.0.delivery_tag = Some(tag);
1323            transfer.0.message_format = message_format;
1324
1325            if settled {
1326                transfer.0.settled = Some(true);
1327            } else {
1328                self.unsettled_snd_deliveries
1329                    .insert(delivery_id, DeliveryInner::new(link_handle));
1330            }
1331            self.post_frame(Frame::Transfer(transfer));
1332        }
1333
1334        Ok(delivery_id)
1335    }
1336
1337    pub(crate) fn post_frame(&mut self, frame: Frame) {
1338        self.sink.post_frame(AmqpFrame::new(self.id(), frame));
1339    }
1340}