1use std::{cmp, collections::VecDeque, fmt, future::Future, mem};
2
3use ntex::channel::{condition, oneshot, pool};
4use ntex::util::{ByteString, Bytes, Either, HashMap, PoolRef, Ready};
5use slab::Slab;
6
7use ntex_amqp_codec::protocol::{
8 self as codec, Accepted, Attach, DeliveryNumber, DeliveryState, Detach, Disposition, End,
9 Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode, Source,
10 Transfer, TransferBody, TransferNumber,
11};
12use ntex_amqp_codec::{AmqpFrame, Encode};
13
14use crate::delivery::DeliveryInner;
15use crate::error::AmqpProtocolError;
16use crate::rcvlink::{
17 EstablishedReceiverLink, ReceiverLink, ReceiverLinkBuilder, ReceiverLinkInner,
18};
19use crate::sndlink::{EstablishedSenderLink, SenderLink, SenderLinkBuilder, SenderLinkInner};
20use crate::{cell::Cell, types::Action, ConnectionRef, ControlFrame};
21
22pub(crate) const INITIAL_NEXT_OUTGOING_ID: TransferNumber = 1;
23
24#[derive(Clone)]
25pub struct Session {
26 pub(crate) inner: Cell<SessionInner>,
27}
28
29#[derive(Debug)]
30pub(crate) struct SessionInner {
31 id: usize,
32 sink: ConnectionRef,
33 next_outgoing_id: TransferNumber,
34 flags: Flags,
35
36 remote_channel_id: u16,
37 next_incoming_id: TransferNumber,
38 remote_outgoing_window: u32,
39 remote_incoming_window: u32,
40
41 links: Slab<Either<SenderLinkState, ReceiverLinkState>>,
42 links_by_name: HashMap<ByteString, usize>,
43 remote_handles: HashMap<Handle, usize>,
44 error: Option<AmqpProtocolError>,
45 closed: condition::Condition,
46
47 pending_transfers: VecDeque<PendingTransfer>,
48 pub(crate) unsettled_snd_deliveries: HashMap<DeliveryNumber, DeliveryInner>,
49 pub(crate) unsettled_rcv_deliveries: HashMap<DeliveryNumber, DeliveryInner>,
50
51 pub(crate) pool_notify: pool::Pool<()>,
52 pub(crate) pool_credit: pool::Pool<Result<(), AmqpProtocolError>>,
53}
54
55impl fmt::Debug for Session {
56 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
57 fmt.debug_struct("Session").finish()
58 }
59}
60
61impl Session {
62 pub(crate) fn new(inner: Cell<SessionInner>) -> Session {
63 Session { inner }
64 }
65
66 #[inline]
67 pub fn tag(&self) -> &'static str {
69 self.inner.get_ref().sink.tag()
70 }
71
72 #[inline]
73 pub fn connection(&self) -> &ConnectionRef {
74 &self.inner.get_ref().sink
75 }
76
77 #[inline]
78 pub fn local_channel_id(&self) -> u16 {
79 self.inner.get_ref().id()
80 }
81
82 #[inline]
83 pub fn remote_channel_id(&self) -> u16 {
84 self.inner.get_ref().remote_channel_id
85 }
86
87 #[inline]
88 pub fn remote_window_size(&self) -> u32 {
90 self.inner.get_ref().remote_incoming_window
91 }
92
93 pub fn end(&self) -> impl Future<Output = Result<(), AmqpProtocolError>> {
94 let inner = self.inner.get_mut();
95
96 if inner.flags.contains(Flags::ENDED) {
97 Either::Left(Ready::Ok(()))
98 } else {
99 if !inner.flags.contains(Flags::ENDING) {
100 inner.sink.close_session(inner.remote_channel_id as usize);
101 inner.post_frame(Frame::End(End { error: None }));
102 inner.flags.insert(Flags::ENDING);
103 inner
104 .sink
105 .get_control_queue()
106 .enqueue_frame(ControlFrame::new_kind(
107 crate::ControlFrameKind::LocalSessionEnded(inner.get_all_links()),
108 ));
109 }
110 let inner = self.inner.clone();
111 Either::Right(async move {
112 inner.closed.wait().await;
113 if let Some(err @ AmqpProtocolError::SessionEnded(Some(_))) = inner.error.clone() {
114 Err(err)
115 } else {
116 Ok(())
117 }
118 })
119 }
120 }
121
122 pub fn get_sender_link(&self, name: &str) -> Option<&SenderLink> {
123 let inner = self.inner.get_ref();
124
125 if let Some(id) = inner.links_by_name.get(name) {
126 if let Some(Either::Left(SenderLinkState::Established(ref link))) = inner.links.get(*id)
127 {
128 return Some(link);
129 }
130 }
131 None
132 }
133
134 #[inline]
135 pub fn get_sender_link_by_local_handle(&self, hnd: Handle) -> Option<&SenderLink> {
136 self.inner.get_ref().get_sender_link_by_local_handle(hnd)
137 }
138
139 #[inline]
140 pub fn get_sender_link_by_remote_handle(&self, hnd: Handle) -> Option<&SenderLink> {
141 self.inner.get_ref().get_sender_link_by_remote_handle(hnd)
142 }
143
144 #[inline]
145 pub fn get_receiver_link_by_local_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
146 self.inner.get_ref().get_receiver_link_by_local_handle(hnd)
147 }
148
149 #[inline]
150 pub fn get_receiver_link_by_remote_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
151 self.inner.get_ref().get_receiver_link_by_remote_handle(hnd)
152 }
153
154 pub fn build_sender_link<T: Into<ByteString>, U: Into<ByteString>>(
156 &self,
157 name: U,
158 address: T,
159 ) -> SenderLinkBuilder {
160 let name = name.into();
161 let address = address.into();
162 SenderLinkBuilder::new(name, address, self.inner.clone())
163 }
164
165 pub fn build_receiver_link<T: Into<ByteString>, U: Into<ByteString>>(
167 &self,
168 name: U,
169 address: T,
170 ) -> ReceiverLinkBuilder {
171 let name = name.into();
172 let address = address.into();
173 ReceiverLinkBuilder::new(name, address, self.inner.clone())
174 }
175
176 pub fn detach_receiver_link(
178 &self,
179 handle: Handle,
180 error: Option<Error>,
181 ) -> impl Future<Output = Result<(), AmqpProtocolError>> {
182 let (tx, rx) = oneshot::channel();
183
184 self.inner
185 .get_mut()
186 .detach_receiver_link(handle, false, error, tx);
187
188 async move {
189 match rx.await {
190 Ok(Ok(_)) => Ok(()),
191 Ok(Err(e)) => {
192 log::trace!("Cannot complete detach receiver link {e:?}");
193 Err(e)
194 }
195 Err(_) => {
196 log::trace!("Cannot complete detach receiver link, connection is gone");
197 Err(AmqpProtocolError::Disconnected)
198 }
199 }
200 }
201 }
202
203 pub fn detach_sender_link(
205 &self,
206 handle: Handle,
207 error: Option<Error>,
208 ) -> impl Future<Output = Result<(), AmqpProtocolError>> {
209 let (tx, rx) = oneshot::channel();
210
211 self.inner
212 .get_mut()
213 .detach_sender_link(handle, false, error, tx);
214
215 async move {
216 match rx.await {
217 Ok(Ok(_)) => Ok(()),
218 Ok(Err(e)) => {
219 log::trace!("Cannot complete detach sender link {e:?}");
220 Err(e)
221 }
222 Err(_) => {
223 log::trace!("Cannot complete detach sender link, connection is gone");
224 Err(AmqpProtocolError::Disconnected)
225 }
226 }
227 }
228 }
229}
230
231#[derive(Debug)]
232enum SenderLinkState {
233 Established(EstablishedSenderLink),
234 OpeningRemote,
235 Opening(Option<oneshot::Sender<Result<Cell<SenderLinkInner>, AmqpProtocolError>>>),
236 Closing(Option<oneshot::Sender<Result<(), AmqpProtocolError>>>),
237}
238
239#[derive(Debug)]
240enum ReceiverLinkState {
241 Opening(Box<Option<(Cell<ReceiverLinkInner>, Option<Source>)>>),
242 OpeningLocal(
243 Option<(
244 Cell<ReceiverLinkInner>,
245 oneshot::Sender<Result<ReceiverLink, AmqpProtocolError>>,
246 )>,
247 ),
248 Established(EstablishedReceiverLink),
249 Closing(Option<oneshot::Sender<Result<(), AmqpProtocolError>>>),
250}
251
252impl SenderLinkState {
253 fn is_opening(&self) -> bool {
254 matches!(self, SenderLinkState::Opening(_))
255 }
256}
257
258impl ReceiverLinkState {
259 fn is_opening(&self) -> bool {
260 matches!(self, ReceiverLinkState::OpeningLocal(_))
261 }
262}
263
264bitflags::bitflags! {
265 #[derive(Copy, Clone, Debug)]
266 struct Flags: u8 {
267 const LOCAL = 0b0000_0001;
268 const ENDED = 0b0000_0010;
269 const ENDING = 0b0000_0100;
270 }
271}
272
273#[derive(Debug)]
274struct PendingTransfer {
275 tx: pool::Sender<Result<(), AmqpProtocolError>>,
276 link_handle: Handle,
277}
278
279impl SessionInner {
280 pub(crate) fn new(
281 id: usize,
282 local: bool,
283 sink: ConnectionRef,
284 remote_channel_id: u16,
285 next_incoming_id: DeliveryNumber,
286 remote_incoming_window: u32,
287 remote_outgoing_window: u32,
288 ) -> SessionInner {
289 SessionInner {
290 id,
291 sink,
292 next_incoming_id,
293 remote_channel_id,
294 remote_incoming_window,
295 remote_outgoing_window,
296 flags: if local { Flags::LOCAL } else { Flags::empty() },
297 next_outgoing_id: INITIAL_NEXT_OUTGOING_ID,
298 unsettled_snd_deliveries: HashMap::default(),
299 unsettled_rcv_deliveries: HashMap::default(),
300 links: Slab::new(),
301 links_by_name: HashMap::default(),
302 remote_handles: HashMap::default(),
303 pending_transfers: VecDeque::new(),
304 error: None,
305 pool_notify: pool::new(),
306 pool_credit: pool::new(),
307 closed: condition::Condition::new(),
308 }
309 }
310
311 pub(crate) fn id(&self) -> u16 {
313 self.id as u16
314 }
315
316 pub(crate) fn tag(&self) -> &'static str {
317 self.sink.tag()
318 }
319
320 pub(crate) fn memory_pool(&self) -> PoolRef {
321 self.sink.0.memory_pool()
322 }
323
324 pub(crate) fn unsettled_deliveries(
325 &mut self,
326 sender: bool,
327 ) -> &mut HashMap<DeliveryNumber, DeliveryInner> {
328 if sender {
329 &mut self.unsettled_snd_deliveries
330 } else {
331 &mut self.unsettled_rcv_deliveries
332 }
333 }
334
335 pub(crate) fn set_error(&mut self, err: AmqpProtocolError) {
337 log::trace!(
338 "{}: Connection is failed, dropping state: {err:?}",
339 self.tag()
340 );
341
342 for tr in self.pending_transfers.drain(..) {
344 let _ = tr.tx.send(Err(err.clone()));
345 }
346
347 for (_, mut promise) in self.unsettled_snd_deliveries.drain() {
349 promise.set_error(err.clone());
350 }
351 for (_, mut promise) in self.unsettled_rcv_deliveries.drain() {
352 promise.set_error(err.clone());
353 }
354
355 self.links_by_name.clear();
357 for (_, st) in self.links.iter_mut() {
358 match st {
359 Either::Left(SenderLinkState::Opening(_)) => (),
360 Either::Left(SenderLinkState::Established(ref mut link)) => {
361 link.inner.get_mut().remote_detached(err.clone());
362 }
363 Either::Left(SenderLinkState::Closing(ref mut link)) => {
364 if let Some(tx) = link.take() {
365 let _ = tx.send(Err(err.clone()));
366 }
367 }
368 Either::Right(ReceiverLinkState::Established(ref mut link)) => {
369 link.remote_detached(None);
370 }
371 _ => (),
372 }
373 }
374 self.links.clear();
375
376 self.error = Some(err);
377 self.flags.insert(Flags::ENDED);
378 self.closed.notify_and_lock_readiness();
379 }
380
381 pub(crate) fn end(&mut self, err: AmqpProtocolError) -> Action {
383 log::trace!("{}: Session is ended: {err:?}", self.tag());
384
385 let links = self.get_all_links();
386 self.set_error(err);
387
388 Action::SessionEnded(links)
389 }
390
391 fn get_all_links(&self) -> Vec<Either<SenderLink, ReceiverLink>> {
392 self.links
393 .iter()
394 .filter_map(|(_, st)| match st {
395 Either::Left(SenderLinkState::Established(ref link)) => {
396 Some(Either::Left((*link).clone()))
397 }
398 Either::Right(ReceiverLinkState::Established(ref link)) => {
399 Some(Either::Right((*link).clone()))
400 }
401 _ => None,
402 })
403 .collect()
404 }
405
406 pub(crate) fn max_frame_size(&self) -> u32 {
407 self.sink.0.max_frame_size
408 }
409
410 pub(crate) fn new_remote_sender(&mut self, attach: &Attach) -> (usize, Attach) {
412 let id = self
413 .links
414 .insert(Either::Left(SenderLinkState::OpeningRemote));
415
416 let attach = Attach(Box::new(codec::AttachInner {
417 name: attach.0.name.clone(),
418 handle: id as Handle,
419 role: Role::Sender,
420 snd_settle_mode: attach.snd_settle_mode(),
421 rcv_settle_mode: attach.rcv_settle_mode(),
422 source: attach.0.source.clone(),
423 target: attach.0.target.clone(),
424 unsettled: None,
425 incomplete_unsettled: false,
426 initial_delivery_count: Some(attach.initial_delivery_count().unwrap_or(0)),
427 max_message_size: None,
428 offered_capabilities: None,
429 desired_capabilities: None,
430 properties: None,
431 }));
432
433 (id, attach)
434 }
435
436 pub(crate) fn attach_local_sender_link(
438 &mut self,
439 mut frame: Attach,
440 ) -> oneshot::Receiver<Result<Cell<SenderLinkInner>, AmqpProtocolError>> {
441 let (tx, rx) = oneshot::channel();
442
443 let entry = self.links.vacant_entry();
444 let token = entry.key();
445 entry.insert(Either::Left(SenderLinkState::Opening(Some(tx))));
446 log::trace!(
447 "{}: Local sender link opening: {:?} hnd:{token:?}",
448 self.tag(),
449 frame.name(),
450 );
451
452 frame.0.handle = token as Handle;
453
454 self.links_by_name.insert(frame.0.name.clone(), token);
455 self.post_frame(Frame::Attach(frame));
456 rx
457 }
458
459 pub(crate) fn attach_remote_sender_link(
461 &mut self,
462 attach: &Attach,
463 mut response: Attach,
464 link: Cell<SenderLinkInner>,
465 ) -> SenderLink {
466 log::trace!(
467 "{}: Remote sender link attached: {:?}",
468 self.tag(),
469 attach.name()
470 );
471 let token = link.id;
472
473 if let Some(source) = attach.source() {
474 if let Some(ref addr) = source.address {
475 self.links_by_name.insert(addr.clone(), token);
476 }
477 }
478
479 self.remote_handles.insert(attach.handle(), token);
480 *self
481 .links
482 .get_mut(token)
483 .expect("new remote sender entry must exist") = Either::Left(
484 SenderLinkState::Established(EstablishedSenderLink::new(link.clone())),
485 );
486
487 *response.handle_mut() = token as Handle;
488 *response.max_message_size_mut() = link.max_message_size().map(|v| v as u64);
489
490 self.post_frame(response.into());
491 SenderLink::new(link)
492 }
493
494 pub(crate) fn detach_sender_link(
496 &mut self,
497 id: Handle,
498 closed: bool,
499 error: Option<Error>,
500 tx: oneshot::Sender<Result<(), AmqpProtocolError>>,
501 ) {
502 if let Some(Either::Left(link)) = self.links.get_mut(id as usize) {
503 match link {
504 SenderLinkState::Opening(_) => {
505 let detach = Detach(Box::new(codec::DetachInner {
506 handle: id,
507 closed,
508 error,
509 }));
510 *link = SenderLinkState::Closing(Some(tx));
511 self.post_frame(detach.into());
512 }
513 SenderLinkState::Established(sender_link) => {
514 let sender_link = sender_link.clone();
515 let detach = Detach(Box::new(codec::DetachInner {
516 handle: id,
517 closed,
518 error,
519 }));
520
521 *link = SenderLinkState::Closing(Some(tx));
522 self.post_frame(detach.clone().into());
523 self.sink
524 .get_control_queue()
525 .enqueue_frame(ControlFrame::new_kind(
526 crate::ControlFrameKind::LocalDetachSender(detach, sender_link),
527 ))
528 }
529 SenderLinkState::OpeningRemote => {
530 let _ = tx.send(Ok(()));
531 log::error!(
532 "{}: Unexpected sender link state: opening remote - {id}",
533 self.tag()
534 );
535 }
536 SenderLinkState::Closing(_) => {
537 let _ = tx.send(Ok(()));
538 log::error!(
539 "{}: Unexpected sender link state: closing - {id}",
540 self.tag()
541 );
542 }
543 }
544 } else {
545 let _ = tx.send(Ok(()));
546 log::debug!(
547 "{}: Sender link does not exist while detaching: {id}",
548 self.tag()
549 );
550 }
551 }
552
553 pub(crate) fn detach_unconfirmed_sender_link(
555 &mut self,
556 attach: &Attach,
557 link: Cell<SenderLinkInner>,
558 error: Option<Error>,
559 ) {
560 let token = link.id;
561
562 let attach = Attach(Box::new(codec::AttachInner {
563 name: attach.0.name.clone(),
564 handle: token as Handle,
565 role: attach.0.role,
566 snd_settle_mode: SenderSettleMode::Unsettled,
567 rcv_settle_mode: ReceiverSettleMode::First,
568 source: None,
569 target: None,
570 unsettled: None,
571 incomplete_unsettled: false,
572 initial_delivery_count: None,
573 max_message_size: None,
574 offered_capabilities: None,
575 desired_capabilities: None,
576 properties: None,
577 }));
578 self.post_frame(attach.into());
579
580 let detach = Detach(Box::new(codec::DetachInner {
581 handle: token as Handle,
582 closed: true,
583 error,
584 }));
585 self.post_frame(detach.into());
586
587 if self.links.contains(token) {
588 self.links.remove(token);
589 }
590 }
591
592 pub(crate) fn get_sender_link_by_local_handle(&self, hnd: Handle) -> Option<&SenderLink> {
593 if let Some(Either::Left(SenderLinkState::Established(ref link))) =
594 self.links.get(hnd as usize)
595 {
596 Some(link)
597 } else {
598 None
599 }
600 }
601
602 pub(crate) fn get_sender_link_by_remote_handle(&self, hnd: Handle) -> Option<&SenderLink> {
603 if let Some(id) = self.remote_handles.get(&hnd) {
604 if let Some(Either::Left(SenderLinkState::Established(ref link))) = self.links.get(*id)
605 {
606 return Some(link);
607 }
608 }
609 None
610 }
611
612 pub(crate) fn attach_remote_receiver_link(
614 &mut self,
615 cell: Cell<SessionInner>,
616 attach: &Attach,
617 ) -> (Attach, ReceiverLink) {
618 let handle = attach.handle();
619 let entry = self.links.vacant_entry();
620 let token = entry.key();
621
622 let inner = Cell::new(ReceiverLinkInner::new(cell, token as u32, handle, attach));
623 entry.insert(Either::Right(ReceiverLinkState::Opening(Box::new(Some((
624 inner.clone(),
625 attach.source().cloned(),
626 ))))));
627 self.remote_handles.insert(handle, token);
628
629 let response = Attach(Box::new(codec::AttachInner {
630 name: attach.0.name.clone(),
631 handle: token as Handle,
632 role: Role::Receiver,
633 snd_settle_mode: attach.snd_settle_mode(),
634 rcv_settle_mode: ReceiverSettleMode::First,
635 source: attach.0.source.clone(),
636 target: attach.0.target.clone(),
637 unsettled: None,
638 incomplete_unsettled: false,
639 initial_delivery_count: Some(0),
640 offered_capabilities: None,
641 desired_capabilities: None,
642 max_message_size: None,
643 properties: None,
644 }));
645
646 (response, ReceiverLink::new(inner))
647 }
648
649 pub(crate) fn attach_local_receiver_link(
650 &mut self,
651 cell: Cell<SessionInner>,
652 mut frame: Attach,
653 ) -> oneshot::Receiver<Result<ReceiverLink, AmqpProtocolError>> {
654 let (tx, rx) = oneshot::channel();
655
656 let entry = self.links.vacant_entry();
657 let token = entry.key();
658
659 let inner = Cell::new(ReceiverLinkInner::new(
660 cell,
661 token as u32,
662 token as u32,
663 &frame,
664 ));
665 entry.insert(Either::Right(ReceiverLinkState::OpeningLocal(Some((
666 inner, tx,
667 )))));
668
669 frame.0.handle = token as Handle;
670
671 self.links_by_name.insert(frame.0.name.clone(), token);
672 self.post_frame(Frame::Attach(frame));
673 rx
674 }
675
676 pub(crate) fn confirm_receiver_link(
677 &mut self,
678 token: Handle,
679 mut response: Attach,
680 max_message_size: Option<u64>,
681 ) {
682 if let Some(Either::Right(link)) = self.links.get_mut(token as usize) {
683 if let ReceiverLinkState::Opening(l) = link {
684 if let Some((l, _)) = l.take() {
685 *response.max_message_size_mut() = max_message_size;
686 *link = ReceiverLinkState::Established(EstablishedReceiverLink::new(l));
687 self.post_frame(response.into());
688 return;
689 }
690 }
691 }
692 log::error!("{}: Unexpected receiver link state", self.tag());
694 }
695
696 pub(crate) fn detach_receiver_link(
698 &mut self,
699 id: Handle,
700 closed: bool,
701 error: Option<Error>,
702 tx: oneshot::Sender<Result<(), AmqpProtocolError>>,
703 ) {
704 if let Some(Either::Right(link)) = self.links.get_mut(id as usize) {
705 match link {
706 ReceiverLinkState::Opening(inner) => {
707 if let Some((inner, source)) = inner.take() {
708 let attach = Attach(Box::new(codec::AttachInner {
709 source,
710 max_message_size: None,
711 name: inner.name().clone(),
712 handle: id,
713 role: Role::Receiver,
714 snd_settle_mode: SenderSettleMode::Mixed,
715 rcv_settle_mode: ReceiverSettleMode::First,
716 target: None,
717 unsettled: None,
718 incomplete_unsettled: false,
719 initial_delivery_count: Some(0),
720 offered_capabilities: None,
721 desired_capabilities: None,
722 properties: None,
723 }));
724 self.post_frame(attach.into());
725 }
726 let detach = Detach(Box::new(codec::DetachInner {
727 closed,
728 error,
729 handle: id,
730 }));
731 self.post_frame(detach.into());
732 let _ = tx.send(Ok(()));
733 if self.links.contains(id as usize) {
734 let _ = self.links.remove(id as usize);
735 }
736 }
737 ReceiverLinkState::Established(receiver_link) => {
738 let receiver_link = receiver_link.clone();
739 let detach = Detach(Box::new(codec::DetachInner {
740 handle: id,
741 closed,
742 error,
743 }));
744 *link = ReceiverLinkState::Closing(Some(tx));
745 self.post_frame(detach.clone().into());
746 self.sink
747 .get_control_queue()
748 .enqueue_frame(ControlFrame::new_kind(
749 crate::ControlFrameKind::LocalDetachReceiver(detach, receiver_link),
750 ))
751 }
752 ReceiverLinkState::Closing(_) => {
753 let _ = tx.send(Ok(()));
754 if self.links.contains(id as usize) {
755 let _ = self.links.remove(id as usize);
756 }
757 log::error!(
758 "{}: Unexpected receiver link state: closing - {id}",
759 self.tag()
760 );
761 }
762 ReceiverLinkState::OpeningLocal(_inner) => unimplemented!(),
763 }
764 } else {
765 let _ = tx.send(Ok(()));
766 log::error!(
767 "{}: Receiver link does not exist while detaching: {id}",
768 self.tag()
769 );
770 }
771 }
772
773 pub(crate) fn get_receiver_link_by_local_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
774 if let Some(Either::Right(ReceiverLinkState::Established(ref link))) =
775 self.links.get(hnd as usize)
776 {
777 Some(link)
778 } else {
779 None
780 }
781 }
782
783 pub(crate) fn get_receiver_link_by_remote_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
784 if let Some(id) = self.remote_handles.get(&hnd) {
785 if let Some(Either::Right(ReceiverLinkState::Established(ref link))) =
786 self.links.get(*id)
787 {
788 return Some(link);
789 }
790 }
791 None
792 }
793
794 pub(crate) fn handle_frame(&mut self, frame: Frame) -> Result<Action, AmqpProtocolError> {
795 if self.error.is_none() {
796 match frame {
797 Frame::Flow(flow) => {
798 if let Some(Either::Left(link)) = flow
800 .handle()
801 .and_then(|h| self.remote_handles.get(&h).copied())
802 .and_then(|h| self.links.get_mut(h))
803 {
804 if let SenderLinkState::Established(ref link) = link {
805 return Ok(Action::Flow((*link).clone(), flow));
806 }
807 log::warn!("{}: Received flow frame", self.tag());
808 }
809 self.handle_flow(&flow, None);
810 Ok(Action::None)
811 }
812 Frame::Disposition(disp) => {
813 self.settle_deliveries(disp);
814 Ok(Action::None)
815 }
816 Frame::Transfer(transfer) => {
817 let idx = if let Some(idx) = self.remote_handles.get(&transfer.handle()) {
818 *idx
819 } else {
820 log::debug!(
821 "{}: Transfer's link {:?} is unknown",
822 self.tag(),
823 transfer.handle()
824 );
825 return Err(AmqpProtocolError::UnknownLink(Frame::Transfer(transfer)));
826 };
827
828 if let Some(link) = self.links.get_mut(idx) {
829 match link {
830 Either::Left(_) => {
831 log::debug!(
832 "{}: Got unexpected trasfer from sender link",
833 self.tag()
834 );
835 Err(AmqpProtocolError::Unexpected(Frame::Transfer(transfer)))
836 }
837 Either::Right(link) => match link {
838 ReceiverLinkState::Opening(_) => {
839 log::debug!(
840 "{}: Got transfer for opening link: {} -> {idx}",
841 self.tag(),
842 transfer.handle()
843 );
844 Err(AmqpProtocolError::UnexpectedOpeningState(Frame::Transfer(
845 transfer,
846 )))
847 }
848 ReceiverLinkState::OpeningLocal(_) => {
849 log::debug!(
850 "{}: Got transfer for opening link: {} -> {idx}",
851 self.tag(),
852 transfer.handle()
853 );
854 Err(AmqpProtocolError::UnexpectedOpeningState(Frame::Transfer(
855 transfer,
856 )))
857 }
858 ReceiverLinkState::Established(link) => {
859 let _ = self.next_incoming_id.wrapping_add(1);
861 link.inner.get_mut().handle_transfer(transfer, &link.inner)
862 }
863 ReceiverLinkState::Closing(_) => Ok(Action::None),
864 },
865 }
866 } else {
867 Err(AmqpProtocolError::UnknownLink(Frame::Transfer(transfer)))
868 }
869 }
870 Frame::Detach(detach) => Ok(self.handle_detach(detach)),
871 frame => {
872 log::debug!("{}: Unexpected frame: {frame:?}", self.tag());
873 Ok(Action::None)
874 }
875 }
876 } else {
877 Ok(Action::None)
878 }
879 }
880
881 pub(crate) fn handle_attach(&mut self, attach: &Attach, cell: Cell<SessionInner>) -> bool {
883 let name = attach.name();
884
885 if let Some(index) = self.links_by_name.get(name) {
886 match self.links.get_mut(*index) {
887 Some(Either::Left(item)) => {
888 if item.is_opening() {
889 log::trace!(
890 "{}: Local sender link attached: {name:?} {index} -> {}, {:?}",
891 self.sink.tag(),
892 attach.handle(),
893 self.remote_handles.contains_key(&attach.handle())
894 );
895
896 self.remote_handles.insert(attach.handle(), *index);
897 let delivery_count = attach.initial_delivery_count().unwrap_or(0);
898 let link = Cell::new(SenderLinkInner::new(
899 *index,
900 name.clone(),
901 attach.handle(),
902 delivery_count,
903 cell,
904 attach
905 .max_message_size()
906 .map(|v| u32::try_from(v).unwrap_or(u32::MAX)),
907 ));
908 let local_sender = mem::replace(
909 item,
910 SenderLinkState::Established(EstablishedSenderLink::new(link.clone())),
911 );
912
913 if let SenderLinkState::Opening(Some(tx)) = local_sender {
914 let _ = tx.send(Ok(link));
915 }
916 }
917 }
918 Some(Either::Right(item)) => {
919 if item.is_opening() {
920 log::trace!(
921 "{}: Local receiver link attached: {name:?} {index} -> {}",
922 self.sink.tag(),
923 attach.handle()
924 );
925 if let ReceiverLinkState::OpeningLocal(opt_item) = item {
926 if let Some((link, tx)) = opt_item.take() {
927 self.remote_handles.insert(attach.handle(), *index);
928
929 *item = ReceiverLinkState::Established(
930 EstablishedReceiverLink::new(link.clone()),
931 );
932 let _ = tx.send(Ok(ReceiverLink::new(link)));
933 } else {
934 log::error!("{}: Inconsistent session state, bug", self.tag());
936 }
937 }
938 }
939 }
940 _ => {
941 }
943 }
944 true
945 } else {
946 false
948 }
949 }
950
951 pub(crate) fn handle_detach(&mut self, mut frame: Detach) -> Action {
953 let idx = if let Some(idx) = self.remote_handles.get(&frame.handle()) {
955 *idx
956 } else if self.links.contains(frame.handle() as usize) {
957 frame.handle() as usize
958 } else {
959 log::info!("{}: Detaching unknown link: {frame:?}", self.tag());
961 return Action::None;
962 };
963
964 let handle = frame.handle();
965 let mut action = Action::None;
966
967 let remove = if let Some(link) = self.links.get_mut(idx) {
968 match link {
969 Either::Left(link) => match link {
970 SenderLinkState::Opening(ref mut tx) => {
971 if let Some(tx) = tx.take() {
972 let err = AmqpProtocolError::LinkDetached(frame.0.error.clone());
973 let _ = tx.send(Err(err));
974 }
975 true
976 }
977 SenderLinkState::Established(link) => {
978 let detach = Detach(Box::new(codec::DetachInner {
980 handle: link.inner.get_ref().id(),
981 closed: true,
982 error: frame.error().cloned(),
983 }));
984 let err = AmqpProtocolError::LinkDetached(detach.0.error.clone());
985
986 self.links_by_name.remove(link.inner.name());
988
989 let mut idx = 0;
991 let handle = link.inner.get_ref().remote_handle();
992 while idx < self.pending_transfers.len() {
993 if self.pending_transfers[idx].link_handle == handle {
994 let tr = self.pending_transfers.remove(idx).unwrap();
995 let _ = tr.tx.send(Err(err.clone()));
996 } else {
997 idx += 1;
998 }
999 }
1000
1001 let handle = link.inner.get_ref().id() as Handle;
1003 for delivery in self.unsettled_snd_deliveries.values_mut() {
1004 if delivery.handle() == handle {
1005 delivery.set_error(err.clone());
1006 }
1007 }
1008
1009 link.inner.get_mut().remote_detached(err);
1011 self.sink
1012 .post_frame(AmqpFrame::new(self.id as u16, detach.into()));
1013 action = Action::DetachSender(link.clone(), frame);
1014 true
1015 }
1016 SenderLinkState::OpeningRemote => {
1017 log::warn!(
1018 "{}: Detach frame received for unconfirmed sender link: {frame:?}",
1019 self.tag()
1020 );
1021 true
1022 }
1023 SenderLinkState::Closing(tx) => {
1024 if let Some(tx) = tx.take() {
1025 if let Some(err) = frame.0.error {
1026 let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
1027 } else {
1028 let _ = tx.send(Ok(()));
1029 }
1030 }
1031 true
1032 }
1033 },
1034 Either::Right(link) => match link {
1035 ReceiverLinkState::Opening(_) => false,
1036 ReceiverLinkState::OpeningLocal(ref mut item) => {
1037 if let Some((inner, tx)) = item.take() {
1038 inner.get_mut().detached();
1039 if let Some(err) = frame.0.error.clone() {
1040 let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
1041 } else {
1042 let _ = tx.send(Err(AmqpProtocolError::LinkDetached(None)));
1043 }
1044 } else {
1045 log::error!("{}: Inconsistent session state, bug", self.tag());
1046 }
1047
1048 true
1049 }
1050 ReceiverLinkState::Established(link) => {
1051 let error = frame.0.error.take();
1052
1053 let err = AmqpProtocolError::LinkDetached(error.clone());
1055 let handle = link.inner.get_ref().id();
1056 for delivery in self.unsettled_rcv_deliveries.values_mut() {
1057 if delivery.handle() == handle {
1058 delivery.set_error(err.clone());
1059 }
1060 }
1061
1062 let detach = Detach(Box::new(codec::DetachInner {
1064 handle: link.handle(),
1065 closed: true,
1066 error: None,
1067 }));
1068 self.sink
1069 .post_frame(AmqpFrame::new(self.id as u16, detach.into()));
1070
1071 link.remote_detached(error);
1073 action = Action::DetachReceiver(link.clone(), frame);
1074 true
1075 }
1076 ReceiverLinkState::Closing(tx) => {
1077 if let Some(tx) = tx.take() {
1079 if let Some(err) = frame.0.error {
1080 let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
1081 } else {
1082 let _ = tx.send(Ok(()));
1083 }
1084 }
1085 true
1086 }
1087 },
1088 }
1089 } else {
1090 false
1091 };
1092
1093 if remove {
1094 if self.links.contains(idx) {
1095 self.links.remove(idx);
1096 }
1097 self.remote_handles.remove(&handle);
1098 }
1099 action
1100 }
1101
1102 fn settle_deliveries(&mut self, disp: Disposition) {
1103 let from = disp.first();
1104 let to = disp.last();
1105
1106 if cfg!(feature = "frame-trace") {
1107 log::trace!("{}: Settle delivery: {disp:#?}", self.tag());
1108 } else {
1109 log::trace!(
1110 "{}: Settle delivery from {from} - {to:?}, state {:?} settled: {:?}",
1111 self.tag(),
1112 disp.state(),
1113 disp.settled()
1114 );
1115 }
1116
1117 let deliveries = if disp.role() == Role::Receiver {
1118 &mut self.unsettled_snd_deliveries
1119 } else {
1120 &mut self.unsettled_rcv_deliveries
1121 };
1122
1123 if let Some(to) = to {
1124 for no in from..=to {
1125 if let Some(delivery) = deliveries.get_mut(&no) {
1126 delivery.handle_disposition(disp.clone());
1127 } else {
1128 log::trace!(
1129 "{}: Unknown deliveryid: {no:?} disp: {disp:?}",
1130 self.sink.tag()
1131 );
1132 }
1133 }
1134 } else if let Some(delivery) = deliveries.get_mut(&from) {
1135 delivery.handle_disposition(disp);
1136 }
1137 }
1138
1139 pub(crate) fn handle_flow(&mut self, flow: &Flow, link: Option<&SenderLink>) {
1140 self.next_incoming_id = flow.next_outgoing_id();
1142 self.remote_outgoing_window = flow.outgoing_window();
1143
1144 self.remote_incoming_window = flow
1145 .next_incoming_id()
1146 .unwrap_or(0)
1147 .wrapping_add(flow.incoming_window())
1148 .wrapping_sub(self.next_outgoing_id);
1149
1150 log::trace!(
1151 "{}: Session received credit {:?}. window: {}, pending: {}",
1152 self.tag(),
1153 flow.link_credit(),
1154 self.remote_incoming_window,
1155 self.pending_transfers.len(),
1156 );
1157
1158 while let Some(tr) = self.pending_transfers.pop_front() {
1159 let _ = tr.tx.send(Ok(()));
1160 }
1161
1162 if flow.echo() {
1163 let flow = Flow(Box::new(codec::FlowInner {
1164 next_incoming_id: if self.flags.contains(Flags::LOCAL) {
1165 Some(self.next_incoming_id)
1166 } else {
1167 None
1168 },
1169 incoming_window: u32::MAX,
1170 next_outgoing_id: self.next_outgoing_id,
1171 outgoing_window: self.remote_incoming_window,
1172 handle: None,
1173 delivery_count: None,
1174 link_credit: None,
1175 available: None,
1176 drain: false,
1177 echo: false,
1178 properties: None,
1179 }));
1180 self.post_frame(flow.into());
1181 }
1182
1183 if let Some(link) = link {
1185 link.inner.get_mut().apply_flow(flow);
1186 }
1187 }
1188
1189 pub(crate) fn rcv_link_flow(&mut self, handle: u32, delivery_count: u32, credit: u32) {
1190 let flow = Flow(Box::new(codec::FlowInner {
1191 next_incoming_id: Some(self.next_incoming_id),
1192 incoming_window: u32::MAX,
1193 next_outgoing_id: self.next_outgoing_id,
1194 outgoing_window: self.remote_incoming_window,
1195 handle: Some(handle),
1196 delivery_count: Some(delivery_count),
1197 link_credit: Some(credit),
1198 available: None,
1199 drain: false,
1200 echo: false,
1201 properties: None,
1202 }));
1203 self.post_frame(flow.into());
1204 }
1205
1206 pub(crate) async fn send_transfer(
1207 &mut self,
1208 link_handle: Handle,
1209 tag: Bytes,
1210 body: TransferBody,
1211 settled: bool,
1212 format: Option<MessageFormat>,
1213 ) -> Result<DeliveryNumber, AmqpProtocolError> {
1214 loop {
1215 if self.remote_incoming_window == 0 {
1216 log::trace!(
1217 "{}: Remote window is 0, push to pending queue, hnd:{link_handle:?}",
1218 self.sink.tag()
1219 );
1220 let (tx, rx) = self.pool_credit.channel();
1221 self.pending_transfers
1222 .push_back(PendingTransfer { tx, link_handle });
1223
1224 rx.await
1225 .map_err(|_| AmqpProtocolError::ConnectionDropped)
1226 .and_then(|v| v)?;
1227 continue;
1228 }
1229 break;
1230 }
1231
1232 self.remote_incoming_window -= 1;
1233
1234 let delivery_id = self.next_outgoing_id;
1235 self.next_outgoing_id = self.next_outgoing_id.wrapping_add(1);
1236
1237 let tr_settled = if settled {
1238 Some(DeliveryState::Accepted(Accepted {}))
1239 } else {
1240 None
1241 };
1242 let message_format = if format.is_none() {
1243 body.message_format()
1244 } else {
1245 format
1246 };
1247
1248 let max_frame_size = self.max_frame_size();
1249 let max_frame_size = if max_frame_size > 2048 {
1250 max_frame_size - 2048
1251 } else if max_frame_size == 0 {
1252 u32::MAX
1253 } else {
1254 max_frame_size
1255 } as usize;
1256
1257 if body.len() > max_frame_size {
1259 let mut body = match body {
1260 TransferBody::Data(data) => data,
1261 TransferBody::Message(msg) => {
1262 let mut buf = self.memory_pool().buf_with_capacity(msg.encoded_size());
1263 msg.encode(&mut buf);
1264 buf.freeze()
1265 }
1266 };
1267
1268 let chunk = body.split_to(cmp::min(max_frame_size, body.len()));
1269
1270 let mut transfer = Transfer(Default::default());
1271 transfer.0.handle = link_handle;
1272 transfer.0.body = Some(TransferBody::Data(chunk));
1273 transfer.0.more = true;
1274 transfer.0.state = tr_settled;
1275 transfer.0.batchable = true;
1276 transfer.0.delivery_id = Some(delivery_id);
1277 transfer.0.delivery_tag = Some(tag.clone());
1278 transfer.0.message_format = message_format;
1279
1280 if settled {
1281 transfer.0.settled = Some(true);
1282 } else {
1283 self.unsettled_snd_deliveries
1284 .insert(delivery_id, DeliveryInner::new(link_handle));
1285 }
1286
1287 log::trace!(
1288 "{}: Sending transfer over handle {link_handle}. window: {} delivery_id: {:?} delivery_tag: {:?}, more: {:?}, batchable: {:?}, settled: {:?}", self.sink.tag(),
1289 self.remote_incoming_window,
1290 transfer.delivery_id(),
1291 transfer.delivery_tag(),
1292 transfer.more(),
1293 transfer.batchable(),
1294 transfer.settled(),
1295 );
1296 self.post_frame(Frame::Transfer(transfer));
1297
1298 loop {
1299 if body.is_empty() {
1301 log::trace!("{}: Last tranfer for {tag:?} is sent", self.tag());
1302 break;
1303 }
1304
1305 let chunk = body.split_to(cmp::min(max_frame_size, body.len()));
1306
1307 log::trace!("{}: Sending chunk tranfer for {tag:?}", self.tag());
1308 let mut transfer = Transfer(Default::default());
1309 transfer.0.handle = link_handle;
1310 transfer.0.body = Some(TransferBody::Data(chunk));
1311 transfer.0.more = !body.is_empty();
1312 transfer.0.batchable = true;
1313 transfer.0.message_format = message_format;
1314 self.post_frame(Frame::Transfer(transfer));
1315 }
1316 } else {
1317 let mut transfer = Transfer(Default::default());
1318 transfer.0.handle = link_handle;
1319 transfer.0.body = Some(body);
1320 transfer.0.state = tr_settled;
1321 transfer.0.delivery_id = Some(delivery_id);
1322 transfer.0.delivery_tag = Some(tag);
1323 transfer.0.message_format = message_format;
1324
1325 if settled {
1326 transfer.0.settled = Some(true);
1327 } else {
1328 self.unsettled_snd_deliveries
1329 .insert(delivery_id, DeliveryInner::new(link_handle));
1330 }
1331 self.post_frame(Frame::Transfer(transfer));
1332 }
1333
1334 Ok(delivery_id)
1335 }
1336
1337 pub(crate) fn post_frame(&mut self, frame: Frame) {
1338 self.sink.post_frame(AmqpFrame::new(self.id(), frame));
1339 }
1340}