1use std::{cmp, collections::VecDeque, fmt, future::Future, mem};
2
3use ntex_bytes::{ByteString, Bytes};
4use ntex_util::channel::{condition, oneshot, pool};
5use ntex_util::{HashMap, future::Either, future::Ready};
6use slab::Slab;
7
8use ntex_amqp_codec::protocol::{
9 self as codec, Accepted, Attach, Begin, DeliveryNumber, DeliveryState, Detach, Disposition,
10 End, Error, Flow, Frame, Handle, MessageFormat, ReceiverSettleMode, Role, SenderSettleMode,
11 Source, Transfer, TransferBody, TransferNumber,
12};
13use ntex_amqp_codec::{AmqpFrame, Encode};
14
15use crate::delivery::DeliveryInner;
16use crate::error::AmqpProtocolError;
17use crate::rcvlink::{
18 EstablishedReceiverLink, ReceiverLink, ReceiverLinkBuilder, ReceiverLinkInner,
19};
20use crate::sndlink::{EstablishedSenderLink, SenderLink, SenderLinkBuilder, SenderLinkInner};
21use crate::{ConnectionRef, ControlFrame, cell::Cell, types::Action};
22
/// Value a new session's `next_outgoing_id` transfer counter starts from
/// (see `SessionInner::new`).
pub(crate) const INITIAL_NEXT_OUTGOING_ID: TransferNumber = 1;
24
/// Public handle to an AMQP session.
///
/// The handle only wraps the `Cell<SessionInner>` that holds the session
/// state; cloning the handle clones that cell.
#[derive(Clone)]
pub struct Session {
    // Session state cell; every method on `Session` goes through it.
    pub(crate) inner: Cell<SessionInner>,
}
29
/// Session state: flow-control counters, the link table and the bookkeeping
/// for transfers and deliveries that are still in flight.
#[derive(Debug)]
pub(crate) struct SessionInner {
    // Local channel id; used as the channel of every frame posted by the session.
    id: usize,
    // Connection the session posts frames and control frames to.
    sink: ConnectionRef,
    // Delivery id assigned to the next outgoing transfer.
    next_outgoing_id: TransferNumber,
    // LOCAL / ENDING / ENDED state bits.
    flags: Flags,
    // `Begin` frame supplied at construction; exposed through `Session::frame`.
    begin: Begin,

    // Channel id supplied at construction as the remote side's channel.
    remote_channel_id: u16,
    // Initialised from `begin.next_outgoing_id()` and refreshed from flow frames.
    next_incoming_id: TransferNumber,
    // Peer's outgoing window as last reported in `Begin` / `Flow`.
    remote_outgoing_window: u32,
    // Peer's incoming window; `send_transfer` waits while this is zero.
    remote_incoming_window: u32,

    // Link table; the slab index is used as the local link handle.
    links: Slab<Either<SenderLinkState, ReceiverLinkState>>,
    // Lookup key (link name, or source address for remote sender links) -> slab index.
    links_by_name: HashMap<ByteString, usize>,
    // Remote link handle -> slab index.
    remote_handles: HashMap<Handle, usize>,
    // Error recorded by `set_error`; once set, incoming frames are ignored.
    error: Option<AmqpProtocolError>,
    // Condition notified by `set_error`; awaited by `Session::end`.
    closed: condition::Condition,

    // Transfers waiting for the remote incoming window to open.
    pending_transfers: VecDeque<PendingTransfer>,
    // Outgoing deliveries sent unsettled, keyed by delivery id.
    pub(crate) unsettled_snd_deliveries: HashMap<DeliveryNumber, DeliveryInner>,
    // Receiver-side unsettled deliveries, keyed by delivery id.
    pub(crate) unsettled_rcv_deliveries: HashMap<DeliveryNumber, DeliveryInner>,

    // Channel pool for `()` notifications (not used within this file).
    pub(crate) pool_notify: pool::Pool<()>,
    // Channel pool used by `send_transfer` to park until the window opens.
    pub(crate) pool_credit: pool::Pool<Result<(), AmqpProtocolError>>,
}
56
57impl fmt::Debug for Session {
58 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
59 fmt.debug_struct("Session").finish()
60 }
61}
62
63impl Session {
64 pub(crate) fn new(inner: Cell<SessionInner>) -> Session {
65 Session { inner }
66 }
67
68 #[inline]
69 pub fn frame(&self) -> &Begin {
71 &self.inner.get_ref().begin
72 }
73
74 #[inline]
75 pub fn tag(&self) -> &'static str {
77 self.inner.get_ref().sink.tag()
78 }
79
80 #[inline]
81 pub fn connection(&self) -> &ConnectionRef {
82 &self.inner.get_ref().sink
83 }
84
85 #[inline]
86 pub fn local_channel_id(&self) -> u16 {
87 self.inner.get_ref().id()
88 }
89
90 #[inline]
91 pub fn remote_channel_id(&self) -> u16 {
92 self.inner.get_ref().remote_channel_id
93 }
94
95 #[inline]
96 pub fn remote_window_size(&self) -> u32 {
98 self.inner.get_ref().remote_incoming_window
99 }
100
101 pub fn end(&self) -> impl Future<Output = Result<(), AmqpProtocolError>> {
102 let inner = self.inner.get_mut();
103
104 if inner.flags.contains(Flags::ENDED) {
105 Either::Left(Ready::Ok(()))
106 } else {
107 if !inner.flags.contains(Flags::ENDING) {
108 inner.sink.close_session(inner.remote_channel_id as usize);
109 inner.post_frame(Frame::End(End { error: None }));
110 inner.flags.insert(Flags::ENDING);
111 inner
112 .sink
113 .get_control_queue()
114 .enqueue_frame(ControlFrame::new_kind(
115 crate::ControlFrameKind::LocalSessionEnded(inner.get_all_links()),
116 ));
117 }
118 let inner = self.inner.clone();
119 Either::Right(async move {
120 inner.closed.wait().await;
121 if let Some(err @ AmqpProtocolError::SessionEnded(Some(_))) = inner.error.clone() {
122 Err(err)
123 } else {
124 Ok(())
125 }
126 })
127 }
128 }
129
130 pub fn get_sender_link(&self, name: &str) -> Option<&SenderLink> {
131 let inner = self.inner.get_ref();
132
133 if let Some(id) = inner.links_by_name.get(name)
134 && let Some(Either::Left(SenderLinkState::Established(link))) = inner.links.get(*id)
135 {
136 return Some(link);
137 }
138 None
139 }
140
141 #[inline]
142 pub fn get_sender_link_by_local_handle(&self, hnd: Handle) -> Option<&SenderLink> {
143 self.inner.get_ref().get_sender_link_by_local_handle(hnd)
144 }
145
146 #[inline]
147 pub fn get_sender_link_by_remote_handle(&self, hnd: Handle) -> Option<&SenderLink> {
148 self.inner.get_ref().get_sender_link_by_remote_handle(hnd)
149 }
150
151 #[inline]
152 pub fn get_receiver_link_by_local_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
153 self.inner.get_ref().get_receiver_link_by_local_handle(hnd)
154 }
155
156 #[inline]
157 pub fn get_receiver_link_by_remote_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
158 self.inner.get_ref().get_receiver_link_by_remote_handle(hnd)
159 }
160
161 pub fn build_sender_link<T: Into<ByteString>, U: Into<ByteString>>(
163 &self,
164 name: U,
165 address: T,
166 ) -> SenderLinkBuilder {
167 let name = name.into();
168 let address = address.into();
169 SenderLinkBuilder::new(name, address, self.inner.clone())
170 }
171
172 pub fn build_receiver_link<T: Into<ByteString>, U: Into<ByteString>>(
174 &self,
175 name: U,
176 address: T,
177 ) -> ReceiverLinkBuilder {
178 let name = name.into();
179 let address = address.into();
180 ReceiverLinkBuilder::new(name, address, self.inner.clone())
181 }
182
183 pub fn detach_receiver_link(
185 &self,
186 handle: Handle,
187 error: Option<Error>,
188 ) -> impl Future<Output = Result<(), AmqpProtocolError>> {
189 let (tx, rx) = oneshot::channel();
190
191 self.inner
192 .get_mut()
193 .detach_receiver_link(handle, false, error, tx);
194
195 async move {
196 match rx.await {
197 Ok(Ok(())) => Ok(()),
198 Ok(Err(e)) => {
199 log::trace!("Cannot complete detach receiver link {e:?}");
200 Err(e)
201 }
202 Err(_) => {
203 log::trace!("Cannot complete detach receiver link, connection is gone");
204 Err(AmqpProtocolError::Disconnected)
205 }
206 }
207 }
208 }
209
210 pub fn detach_sender_link(
212 &self,
213 handle: Handle,
214 error: Option<Error>,
215 ) -> impl Future<Output = Result<(), AmqpProtocolError>> {
216 let (tx, rx) = oneshot::channel();
217
218 self.inner
219 .get_mut()
220 .detach_sender_link(handle, false, error, tx);
221
222 async move {
223 match rx.await {
224 Ok(Ok(())) => Ok(()),
225 Ok(Err(e)) => {
226 log::trace!("Cannot complete detach sender link {e:?}");
227 Err(e)
228 }
229 Err(_) => {
230 log::trace!("Cannot complete detach sender link, connection is gone");
231 Err(AmqpProtocolError::Disconnected)
232 }
233 }
234 }
235 }
236}
237
/// State of a sender link slot in the session's link table.
#[derive(Debug)]
enum SenderLinkState {
    /// The link is attached and usable.
    Established(EstablishedSenderLink),
    /// Placeholder inserted by `new_remote_sender` for a link the peer asked for.
    OpeningRemote,
    /// Locally initiated attach waiting for the peer's attach; the channel
    /// reports the resulting link or an error.
    Opening(Option<oneshot::Sender<Result<Cell<SenderLinkInner>, AmqpProtocolError>>>),
    /// A local detach has been posted; the channel is resolved when the peer's
    /// detach arrives or the session fails.
    Closing(Option<oneshot::Sender<Result<(), AmqpProtocolError>>>),
}
245
/// State of a receiver link slot in the session's link table.
#[derive(Debug)]
enum ReceiverLinkState {
    /// Link attached by the peer and not yet confirmed locally; keeps the link
    /// state and the source from the peer's attach frame.
    Opening(Box<Option<(Cell<ReceiverLinkInner>, Option<Source>)>>),
    /// Locally initiated attach waiting for the peer's attach; keeps the link
    /// state and the channel that reports the result.
    OpeningLocal(
        Option<(
            Cell<ReceiverLinkInner>,
            oneshot::Sender<Result<ReceiverLink, AmqpProtocolError>>,
        )>,
    ),
    /// The link is attached and usable.
    Established(EstablishedReceiverLink),
    /// A local detach has been posted; the channel is resolved when the peer's
    /// detach arrives.
    Closing(Option<oneshot::Sender<Result<(), AmqpProtocolError>>>),
}
258
259impl SenderLinkState {
260 fn is_opening(&self) -> bool {
261 matches!(self, SenderLinkState::Opening(_))
262 }
263}
264
265impl ReceiverLinkState {
266 fn is_opening(&self) -> bool {
267 matches!(self, ReceiverLinkState::OpeningLocal(_))
268 }
269}
270
// Session state bits kept in `SessionInner::flags`.
bitflags::bitflags! {
    #[derive(Copy, Clone, Debug)]
    struct Flags: u8 {
        // Set when `SessionInner::new` is called with `local == true`.
        const LOCAL = 0b0000_0001;
        // Set by `set_error`; the session is finished.
        const ENDED = 0b0000_0010;
        // Set by `Session::end` once the `End` frame has been posted.
        const ENDING = 0b0000_0100;
    }
}
279
/// A `send_transfer` call parked until the remote incoming window opens.
#[derive(Debug)]
struct PendingTransfer {
    // Wakes the parked sender: `Ok(())` to retry, `Err` to abort the transfer.
    tx: pool::Sender<Result<(), AmqpProtocolError>>,
    // Handle of the link the transfer was issued on.
    link_handle: Handle,
}
285
286impl SessionInner {
287 pub(crate) fn new(
288 id: usize,
289 local: bool,
290 sink: ConnectionRef,
291 remote_channel_id: u16,
292 begin: Begin,
293 ) -> SessionInner {
294 SessionInner {
295 next_incoming_id: begin.next_outgoing_id(),
296 remote_incoming_window: begin.incoming_window(),
297 remote_outgoing_window: begin.outgoing_window(),
298 flags: if local { Flags::LOCAL } else { Flags::empty() },
299 next_outgoing_id: INITIAL_NEXT_OUTGOING_ID,
300 unsettled_snd_deliveries: HashMap::default(),
301 unsettled_rcv_deliveries: HashMap::default(),
302 links: Slab::new(),
303 links_by_name: HashMap::default(),
304 remote_handles: HashMap::default(),
305 pending_transfers: VecDeque::new(),
306 error: None,
307 pool_notify: pool::new(),
308 pool_credit: pool::new(),
309 closed: condition::Condition::new(),
310 id,
311 sink,
312 begin,
313 remote_channel_id,
314 }
315 }
316
317 pub(crate) fn id(&self) -> u16 {
319 self.id as u16
320 }
321
322 pub(crate) fn tag(&self) -> &'static str {
323 self.sink.tag()
324 }
325
326 pub(crate) fn unsettled_deliveries(
327 &mut self,
328 sender: bool,
329 ) -> &mut HashMap<DeliveryNumber, DeliveryInner> {
330 if sender {
331 &mut self.unsettled_snd_deliveries
332 } else {
333 &mut self.unsettled_rcv_deliveries
334 }
335 }
336
    /// Fails the session with `err` and drops all of its state.
    ///
    /// Every parked transfer and every unsettled delivery is completed with a
    /// clone of `err`, the link tables are emptied, the error is recorded, the
    /// `ENDED` flag is set and waiters on `closed` are notified.
    pub(crate) fn set_error(&mut self, err: AmqpProtocolError) {
        log::trace!(
            "{}: Connection is failed, dropping state: {err:?}",
            self.tag()
        );

        // Wake senders parked on the remote window with the error.
        for tr in self.pending_transfers.drain(..) {
            let _ = tr.tx.send(Err(err.clone()));
        }

        // Fail unsettled deliveries in both directions.
        for (_, mut promise) in self.unsettled_snd_deliveries.drain() {
            promise.set_error(err.clone());
        }
        for (_, mut promise) in self.unsettled_rcv_deliveries.drain() {
            promise.set_error(err.clone());
        }

        self.links_by_name.clear();
        // Notify links before the table is cleared. Only established links and
        // closing sender links are notified explicitly; every other state is
        // simply dropped together with the table.
        for (_, st) in &mut self.links {
            match st {
                Either::Left(SenderLinkState::Established(link)) => {
                    link.inner.get_mut().remote_detached(err.clone());
                }
                Either::Left(SenderLinkState::Closing(link)) => {
                    if let Some(tx) = link.take() {
                        let _ = tx.send(Err(err.clone()));
                    }
                }
                Either::Right(ReceiverLinkState::Established(link)) => {
                    // Receiver links are told about the detach without an error value.
                    link.remote_detached(None);
                }
                _ => (),
            }
        }
        self.links.clear();

        self.error = Some(err);
        self.flags.insert(Flags::ENDED);
        self.closed.notify_and_lock_readiness();
    }
381
382 pub(crate) fn end(&mut self, err: AmqpProtocolError) -> Action {
384 log::trace!("{}: Session is ended: {err:?}", self.tag());
385
386 let links = self.get_all_links();
387 self.set_error(err);
388
389 Action::SessionEnded(links)
390 }
391
392 fn get_all_links(&self) -> Vec<Either<SenderLink, ReceiverLink>> {
393 self.links
394 .iter()
395 .filter_map(|(_, st)| match st {
396 Either::Left(SenderLinkState::Established(link)) => {
397 Some(Either::Left((*link).clone()))
398 }
399 Either::Right(ReceiverLinkState::Established(link)) => {
400 Some(Either::Right((*link).clone()))
401 }
402 _ => None,
403 })
404 .collect()
405 }
406
407 pub(crate) fn max_frame_size(&self) -> u32 {
408 self.sink.0.max_frame_size
409 }
410
411 pub(crate) fn new_remote_sender(&mut self, attach: &Attach) -> (usize, Attach) {
413 let id = self
414 .links
415 .insert(Either::Left(SenderLinkState::OpeningRemote));
416
417 let attach = Attach(Box::new(codec::AttachInner {
418 name: attach.0.name.clone(),
419 handle: id as Handle,
420 role: Role::Sender,
421 snd_settle_mode: attach.snd_settle_mode(),
422 rcv_settle_mode: attach.rcv_settle_mode(),
423 source: attach.0.source.clone(),
424 target: attach.0.target.clone(),
425 unsettled: None,
426 incomplete_unsettled: false,
427 initial_delivery_count: Some(attach.initial_delivery_count().unwrap_or(0)),
428 max_message_size: None,
429 offered_capabilities: None,
430 desired_capabilities: None,
431 properties: None,
432 }));
433
434 (id, attach)
435 }
436
437 pub(crate) fn attach_local_sender_link(
439 &mut self,
440 mut frame: Attach,
441 ) -> oneshot::Receiver<Result<Cell<SenderLinkInner>, AmqpProtocolError>> {
442 let (tx, rx) = oneshot::channel();
443
444 let entry = self.links.vacant_entry();
445 let token = entry.key();
446 entry.insert(Either::Left(SenderLinkState::Opening(Some(tx))));
447 log::trace!(
448 "{}: Local sender link opening: {:?} hnd:{token:?}",
449 self.tag(),
450 frame.name(),
451 );
452
453 frame.0.handle = token as Handle;
454
455 self.links_by_name.insert(frame.0.name.clone(), token);
456 self.post_frame(Frame::Attach(frame));
457 rx
458 }
459
460 pub(crate) fn attach_remote_sender_link(
462 &mut self,
463 attach: &Attach,
464 mut response: Attach,
465 link: Cell<SenderLinkInner>,
466 ) -> SenderLink {
467 log::trace!(
468 "{}: Remote sender link attached: {:?}",
469 self.tag(),
470 attach.name()
471 );
472 let token = link.id;
473
474 if let Some(source) = attach.source()
475 && let Some(ref addr) = source.address
476 {
477 self.links_by_name.insert(addr.clone(), token);
478 }
479
480 self.remote_handles.insert(attach.handle(), token);
481 *self
482 .links
483 .get_mut(token)
484 .expect("new remote sender entry must exist") = Either::Left(
485 SenderLinkState::Established(EstablishedSenderLink::new(link.clone())),
486 );
487
488 *response.handle_mut() = token as Handle;
489 *response.max_message_size_mut() = link.max_message_size().map(u64::from);
490
491 self.post_frame(response.into());
492 SenderLink::new(link)
493 }
494
    /// Detaches the sender link stored under local handle `id`.
    ///
    /// * `Opening` / `Established`: a `Detach` frame is posted and the slot
    ///   moves to `Closing`, keeping `tx` until the detach is answered. For an
    ///   established link a `LocalDetachSender` control frame is enqueued too.
    /// * `OpeningRemote` / `Closing` / missing slot: nothing is sent and `tx`
    ///   is completed with `Ok(())` right away.
    pub(crate) fn detach_sender_link(
        &mut self,
        id: Handle,
        closed: bool,
        error: Option<Error>,
        tx: oneshot::Sender<Result<(), AmqpProtocolError>>,
    ) {
        if let Some(Either::Left(link)) = self.links.get_mut(id as usize) {
            match link {
                SenderLinkState::Opening(_) => {
                    let detach = Detach(Box::new(codec::DetachInner {
                        handle: id,
                        closed,
                        error,
                    }));
                    // Replacing the state drops the pending attach channel.
                    *link = SenderLinkState::Closing(Some(tx));
                    self.post_frame(detach.into());
                }
                SenderLinkState::Established(sender_link) => {
                    // Keep a link handle for the control frame before the slot is overwritten.
                    let sender_link = sender_link.clone();
                    let detach = Detach(Box::new(codec::DetachInner {
                        handle: id,
                        closed,
                        error,
                    }));

                    *link = SenderLinkState::Closing(Some(tx));
                    self.post_frame(detach.clone().into());
                    self.sink
                        .get_control_queue()
                        .enqueue_frame(ControlFrame::new_kind(
                            crate::ControlFrameKind::LocalDetachSender(detach, sender_link),
                        ));
                }
                SenderLinkState::OpeningRemote => {
                    let _ = tx.send(Ok(()));
                    log::error!(
                        "{}: Unexpected sender link state: opening remote - {id}",
                        self.tag()
                    );
                }
                SenderLinkState::Closing(_) => {
                    let _ = tx.send(Ok(()));
                    log::error!(
                        "{}: Unexpected sender link state: closing - {id}",
                        self.tag()
                    );
                }
            }
        } else {
            // No sender link under this handle: report success, nothing to do.
            let _ = tx.send(Ok(()));
            log::debug!(
                "{}: Sender link does not exist while detaching: {id}",
                self.tag()
            );
        }
    }
553
554 pub(crate) fn detach_unconfirmed_sender_link(
556 &mut self,
557 attach: &Attach,
558 link: &Cell<SenderLinkInner>,
559 error: Option<Error>,
560 ) {
561 let token = link.id;
562
563 let attach = Attach(Box::new(codec::AttachInner {
564 name: attach.0.name.clone(),
565 handle: token as Handle,
566 role: attach.0.role,
567 snd_settle_mode: SenderSettleMode::Unsettled,
568 rcv_settle_mode: ReceiverSettleMode::First,
569 source: None,
570 target: None,
571 unsettled: None,
572 incomplete_unsettled: false,
573 initial_delivery_count: None,
574 max_message_size: None,
575 offered_capabilities: None,
576 desired_capabilities: None,
577 properties: None,
578 }));
579 self.post_frame(attach.into());
580
581 let detach = Detach(Box::new(codec::DetachInner {
582 handle: token as Handle,
583 closed: true,
584 error,
585 }));
586 self.post_frame(detach.into());
587
588 if self.links.contains(token) {
589 self.links.remove(token);
590 }
591 }
592
593 pub(crate) fn get_sender_link_by_local_handle(&self, hnd: Handle) -> Option<&SenderLink> {
594 if let Some(Either::Left(SenderLinkState::Established(link))) = self.links.get(hnd as usize)
595 {
596 Some(link)
597 } else {
598 None
599 }
600 }
601
602 pub(crate) fn get_sender_link_by_remote_handle(&self, hnd: Handle) -> Option<&SenderLink> {
603 if let Some(id) = self.remote_handles.get(&hnd)
604 && let Some(Either::Left(SenderLinkState::Established(link))) = self.links.get(*id)
605 {
606 return Some(link);
607 }
608 None
609 }
610
611 pub(crate) fn attach_remote_receiver_link(
613 &mut self,
614 cell: Cell<SessionInner>,
615 attach: &Attach,
616 ) -> (Attach, ReceiverLink) {
617 let handle = attach.handle();
618 let entry = self.links.vacant_entry();
619 let token = entry.key();
620
621 let inner = Cell::new(ReceiverLinkInner::new(cell, token as u32, handle, attach));
622 entry.insert(Either::Right(ReceiverLinkState::Opening(Box::new(Some((
623 inner.clone(),
624 attach.source().cloned(),
625 ))))));
626 self.remote_handles.insert(handle, token);
627
628 let response = Attach(Box::new(codec::AttachInner {
629 name: attach.0.name.clone(),
630 handle: token as Handle,
631 role: Role::Receiver,
632 snd_settle_mode: attach.snd_settle_mode(),
633 rcv_settle_mode: ReceiverSettleMode::First,
634 source: attach.0.source.clone(),
635 target: attach.0.target.clone(),
636 unsettled: None,
637 incomplete_unsettled: false,
638 initial_delivery_count: Some(0),
639 offered_capabilities: None,
640 desired_capabilities: None,
641 max_message_size: None,
642 properties: None,
643 }));
644
645 (response, ReceiverLink::new(inner))
646 }
647
648 pub(crate) fn attach_local_receiver_link(
649 &mut self,
650 cell: Cell<SessionInner>,
651 mut frame: Attach,
652 ) -> oneshot::Receiver<Result<ReceiverLink, AmqpProtocolError>> {
653 let (tx, rx) = oneshot::channel();
654
655 let entry = self.links.vacant_entry();
656 let token = entry.key();
657
658 let inner = Cell::new(ReceiverLinkInner::new(
659 cell,
660 token as u32,
661 token as u32,
662 &frame,
663 ));
664 entry.insert(Either::Right(ReceiverLinkState::OpeningLocal(Some((
665 inner, tx,
666 )))));
667
668 frame.0.handle = token as Handle;
669
670 self.links_by_name.insert(frame.0.name.clone(), token);
671 self.post_frame(Frame::Attach(frame));
672 rx
673 }
674
675 pub(crate) fn confirm_receiver_link(
676 &mut self,
677 token: Handle,
678 mut response: Attach,
679 max_message_size: Option<u64>,
680 ) {
681 if let Some(Either::Right(link)) = self.links.get_mut(token as usize)
682 && let ReceiverLinkState::Opening(l) = link
683 && let Some((l, _)) = l.take()
684 {
685 *response.max_message_size_mut() = max_message_size;
686 *link = ReceiverLinkState::Established(EstablishedReceiverLink::new(l));
687 self.post_frame(response.into());
688 return;
689 }
690 log::error!("{}: Unexpected receiver link state", self.tag());
692 }
693
    /// Detaches the receiver link stored under local handle `id`.
    ///
    /// * `Opening`: the pending attach is answered first (if its data is still
    ///   present), then a `Detach` is posted, `tx` is completed with `Ok(())`
    ///   and the slot is removed.
    /// * `Established`: a `Detach` is posted, the slot moves to `Closing`
    ///   keeping `tx`, and a `LocalDetachReceiver` control frame is enqueued.
    /// * `Closing` / missing slot: `tx` is completed with `Ok(())`; a closing
    ///   slot is removed as well.
    ///
    /// # Panics
    ///
    /// Panics (`unimplemented!`) when the link is in `OpeningLocal` state.
    // NOTE(review): the `OpeningLocal` arm looks reachable through the public
    // `Session::detach_receiver_link` while a local attach is still pending —
    // confirm whether it should be handled instead of panicking.
    pub(crate) fn detach_receiver_link(
        &mut self,
        id: Handle,
        closed: bool,
        error: Option<Error>,
        tx: oneshot::Sender<Result<(), AmqpProtocolError>>,
    ) {
        if let Some(Either::Right(link)) = self.links.get_mut(id as usize) {
            match link {
                ReceiverLinkState::Opening(inner) => {
                    // Answer the peer's attach before detaching.
                    if let Some((inner, source)) = inner.take() {
                        let attach = Attach(Box::new(codec::AttachInner {
                            source,
                            max_message_size: None,
                            name: inner.name().clone(),
                            handle: id,
                            role: Role::Receiver,
                            snd_settle_mode: SenderSettleMode::Mixed,
                            rcv_settle_mode: ReceiverSettleMode::First,
                            target: None,
                            unsettled: None,
                            incomplete_unsettled: false,
                            initial_delivery_count: Some(0),
                            offered_capabilities: None,
                            desired_capabilities: None,
                            properties: None,
                        }));
                        self.post_frame(attach.into());
                    }
                    let detach = Detach(Box::new(codec::DetachInner {
                        closed,
                        error,
                        handle: id,
                    }));
                    self.post_frame(detach.into());
                    let _ = tx.send(Ok(()));
                    // Only the link slot is removed here; `remote_handles` is left untouched.
                    if self.links.contains(id as usize) {
                        let _ = self.links.remove(id as usize);
                    }
                }
                ReceiverLinkState::Established(receiver_link) => {
                    // Keep a link handle for the control frame before the slot is overwritten.
                    let receiver_link = receiver_link.clone();
                    let detach = Detach(Box::new(codec::DetachInner {
                        handle: id,
                        closed,
                        error,
                    }));
                    *link = ReceiverLinkState::Closing(Some(tx));
                    self.post_frame(detach.clone().into());
                    self.sink
                        .get_control_queue()
                        .enqueue_frame(ControlFrame::new_kind(
                            crate::ControlFrameKind::LocalDetachReceiver(detach, receiver_link),
                        ));
                }
                ReceiverLinkState::Closing(_) => {
                    let _ = tx.send(Ok(()));
                    // Removing the slot also drops the channel stored by the earlier detach.
                    if self.links.contains(id as usize) {
                        let _ = self.links.remove(id as usize);
                    }
                    log::error!(
                        "{}: Unexpected receiver link state: closing - {id}",
                        self.tag()
                    );
                }
                ReceiverLinkState::OpeningLocal(_inner) => unimplemented!(),
            }
        } else {
            // No receiver link under this handle: report success, nothing to do.
            let _ = tx.send(Ok(()));
            log::error!(
                "{}: Receiver link does not exist while detaching: {id}",
                self.tag()
            );
        }
    }
770
771 pub(crate) fn get_receiver_link_by_local_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
772 if let Some(Either::Right(ReceiverLinkState::Established(link))) =
773 self.links.get(hnd as usize)
774 {
775 Some(link)
776 } else {
777 None
778 }
779 }
780
781 pub(crate) fn get_receiver_link_by_remote_handle(&self, hnd: Handle) -> Option<&ReceiverLink> {
782 if let Some(id) = self.remote_handles.get(&hnd)
783 && let Some(Either::Right(ReceiverLinkState::Established(link))) = self.links.get(*id)
784 {
785 return Some(link);
786 }
787 None
788 }
789
790 pub(crate) fn handle_frame(&mut self, frame: Frame) -> Result<Action, AmqpProtocolError> {
791 if self.error.is_none() {
792 match frame {
793 Frame::Flow(flow) => {
794 if let Some(Either::Left(link)) = flow
796 .handle()
797 .and_then(|h| self.remote_handles.get(&h).copied())
798 .and_then(|h| self.links.get_mut(h))
799 {
800 if let SenderLinkState::Established(link) = link {
801 return Ok(Action::Flow((*link).clone(), flow));
802 }
803 log::warn!("{}: Received flow frame", self.tag());
804 }
805 self.handle_flow(&flow, None);
806 Ok(Action::None)
807 }
808 Frame::Disposition(disp) => {
809 self.settle_deliveries(&disp);
810 Ok(Action::None)
811 }
812 Frame::Transfer(transfer) => {
813 let idx = if let Some(idx) = self.remote_handles.get(&transfer.handle()) {
814 *idx
815 } else {
816 log::debug!(
817 "{}: Transfer's link {:?} is unknown",
818 self.tag(),
819 transfer.handle()
820 );
821 return Err(AmqpProtocolError::UnknownLink(Frame::Transfer(transfer)));
822 };
823
824 if let Some(link) = self.links.get_mut(idx) {
825 match link {
826 Either::Left(_) => {
827 log::debug!(
828 "{}: Got unexpected trasfer from sender link",
829 self.tag()
830 );
831 Err(AmqpProtocolError::Unexpected(Frame::Transfer(transfer)))
832 }
833 Either::Right(link) => match link {
834 ReceiverLinkState::Opening(_)
835 | ReceiverLinkState::OpeningLocal(_) => {
836 log::debug!(
837 "{}: Got transfer for opening link: {} -> {idx}",
838 self.tag(),
839 transfer.handle()
840 );
841 Err(AmqpProtocolError::UnexpectedOpeningState(Frame::Transfer(
842 transfer,
843 )))
844 }
845 ReceiverLinkState::Established(link) => {
846 let _ = self.next_incoming_id.wrapping_add(1);
848 Ok(link.inner.get_mut().handle_transfer(transfer, &link.inner))
849 }
850 ReceiverLinkState::Closing(_) => Ok(Action::None),
851 },
852 }
853 } else {
854 Err(AmqpProtocolError::UnknownLink(Frame::Transfer(transfer)))
855 }
856 }
857 Frame::Detach(detach) => Ok(self.handle_detach(detach)),
858 frame => {
859 log::debug!("{}: Unexpected frame: {frame:?}", self.tag());
860 Ok(Action::None)
861 }
862 }
863 } else {
864 Ok(Action::None)
865 }
866 }
867
    /// Handles the peer's `Attach` for a link whose name is already registered.
    ///
    /// Returns `false` when the name is not present in `links_by_name`, and
    /// `true` otherwise — including when the matching slot is not in a locally
    /// opening state, in which case nothing changes.
    ///
    /// For a locally opening sender or receiver link the remote handle is
    /// registered, the slot becomes `Established` and the waiting opener is
    /// sent the new link.
    pub(crate) fn handle_attach(&mut self, attach: &Attach, cell: Cell<SessionInner>) -> bool {
        let name = attach.name();

        if let Some(index) = self.links_by_name.get(name) {
            match self.links.get_mut(*index) {
                Some(Either::Left(item)) => {
                    if item.is_opening() {
                        log::trace!(
                            "{}: Local sender link attached: {name:?} {index} -> {}, {:?}",
                            self.sink.tag(),
                            attach.handle(),
                            self.remote_handles.contains_key(&attach.handle())
                        );

                        self.remote_handles.insert(attach.handle(), *index);
                        let delivery_count = attach.initial_delivery_count().unwrap_or(0);
                        let link = Cell::new(SenderLinkInner::new(
                            *index,
                            name.clone(),
                            attach.handle(),
                            delivery_count,
                            cell,
                            // Sizes that do not fit in u32 are clamped to u32::MAX.
                            attach
                                .max_message_size()
                                .map(|v| u32::try_from(v).unwrap_or(u32::MAX)),
                        ));
                        // Swap in the established state and recover the opener's channel.
                        let local_sender = mem::replace(
                            item,
                            SenderLinkState::Established(EstablishedSenderLink::new(link.clone())),
                        );

                        if let SenderLinkState::Opening(Some(tx)) = local_sender {
                            let _ = tx.send(Ok(link));
                        }
                    }
                }
                Some(Either::Right(item)) => {
                    if item.is_opening() {
                        log::trace!(
                            "{}: Local receiver link attached: {name:?} {index} -> {}",
                            self.sink.tag(),
                            attach.handle()
                        );
                        if let ReceiverLinkState::OpeningLocal(opt_item) = item {
                            if let Some((link, tx)) = opt_item.take() {
                                self.remote_handles.insert(attach.handle(), *index);

                                *item = ReceiverLinkState::Established(
                                    EstablishedReceiverLink::new(link.clone()),
                                );
                                let _ = tx.send(Ok(ReceiverLink::new(link)));
                            } else {
                                log::error!("{}: Inconsistent session state, bug", self.tag());
                            }
                        }
                    }
                }
                _ => {
                    // The name is registered but its slot is gone.
                    log::trace!("Protocol error");
                }
            }
            true
        } else {
            false
        }
    }
938
    /// Handles a `Detach` frame received from the peer.
    ///
    /// The link is looked up by remote handle first and, failing that, by
    /// treating the frame's handle as a local slot index. Depending on the
    /// link state, pending openers/closers are notified, in-flight work for an
    /// established link is failed, and a closing `Detach` reply is posted.
    ///
    /// Returns `Action::DetachSender` / `Action::DetachReceiver` for
    /// established links and `Action::None` otherwise. Except for receiver
    /// links in `Opening` state, the slot and its remote handle mapping are
    /// removed.
    #[allow(clippy::too_many_lines)]
    pub(crate) fn handle_detach(&mut self, mut frame: Detach) -> Action {
        // Resolve the slot index: remote handle map first, then local index.
        let idx = if let Some(idx) = self.remote_handles.get(&frame.handle()) {
            *idx
        } else if self.links.contains(frame.handle() as usize) {
            frame.handle() as usize
        } else {
            log::info!("{}: Detaching unknown link: {frame:?}", self.tag());
            return Action::None;
        };

        let handle = frame.handle();
        let mut action = Action::None;

        let remove = if let Some(link) = self.links.get_mut(idx) {
            match link {
                Either::Left(link) => match link {
                    SenderLinkState::Opening(tx) => {
                        // The peer refused the attach: fail the waiting opener.
                        if let Some(tx) = tx.take() {
                            let err = AmqpProtocolError::LinkDetached(frame.0.error.clone());
                            let _ = tx.send(Err(err));
                        }
                        true
                    }
                    SenderLinkState::Established(link) => {
                        // Reply detach; it carries the peer's error back.
                        let detach = Detach(Box::new(codec::DetachInner {
                            handle: link.inner.get_ref().id(),
                            closed: true,
                            error: frame.error().cloned(),
                        }));
                        let err = AmqpProtocolError::LinkDetached(detach.0.error.clone());

                        self.links_by_name.remove(link.inner.name());

                        // Fail transfers parked for this link; entries are
                        // matched against the link's remote handle.
                        let mut idx = 0;
                        let handle = link.inner.get_ref().remote_handle();
                        while idx < self.pending_transfers.len() {
                            if self.pending_transfers[idx].link_handle == handle {
                                let tr = self.pending_transfers.remove(idx).unwrap();
                                let _ = tr.tx.send(Err(err.clone()));
                            } else {
                                idx += 1;
                            }
                        }

                        // Fail unsettled deliveries of this link; these are
                        // matched against the link's local id.
                        let handle = link.inner.get_ref().id() as Handle;
                        for delivery in self.unsettled_snd_deliveries.values_mut() {
                            if delivery.handle() == handle {
                                delivery.set_error(err.clone());
                            }
                        }

                        link.inner.get_mut().remote_detached(err);
                        self.sink
                            .post_frame(AmqpFrame::new(self.id as u16, detach.into()));
                        action = Action::DetachSender(link.clone(), frame);
                        true
                    }
                    SenderLinkState::OpeningRemote => {
                        log::warn!(
                            "{}: Detach frame received for unconfirmed sender link: {frame:?}",
                            self.tag()
                        );
                        true
                    }
                    SenderLinkState::Closing(tx) => {
                        // Answer to a local detach: complete the waiting caller.
                        if let Some(tx) = tx.take() {
                            if let Some(err) = frame.0.error {
                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
                            } else {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        true
                    }
                },
                Either::Right(link) => match link {
                    // Peer-initiated link not yet confirmed locally: the slot is kept.
                    ReceiverLinkState::Opening(_) => false,
                    ReceiverLinkState::OpeningLocal(item) => {
                        if let Some((inner, tx)) = item.take() {
                            inner.get_mut().detached();
                            // Both branches report `LinkDetached`, with or without the peer's error.
                            if let Some(err) = frame.0.error.clone() {
                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
                            } else {
                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(None)));
                            }
                        } else {
                            log::error!("{}: Inconsistent session state, bug", self.tag());
                        }

                        true
                    }
                    ReceiverLinkState::Established(link) => {
                        // The error is moved out of the frame handed back in the action.
                        let error = frame.0.error.take();

                        let err = AmqpProtocolError::LinkDetached(error.clone());
                        // Fail unsettled deliveries that belong to this link.
                        let handle = link.inner.get_ref().id();
                        for delivery in self.unsettled_rcv_deliveries.values_mut() {
                            if delivery.handle() == handle {
                                delivery.set_error(err.clone());
                            }
                        }

                        // Reply detach; no error is sent back.
                        let detach = Detach(Box::new(codec::DetachInner {
                            handle: link.handle(),
                            closed: true,
                            error: None,
                        }));
                        self.sink
                            .post_frame(AmqpFrame::new(self.id as u16, detach.into()));

                        link.remote_detached(error);
                        action = Action::DetachReceiver(link.clone(), frame);
                        true
                    }
                    ReceiverLinkState::Closing(tx) => {
                        // Answer to a local detach: complete the waiting caller.
                        if let Some(tx) = tx.take() {
                            if let Some(err) = frame.0.error {
                                let _ = tx.send(Err(AmqpProtocolError::LinkDetached(Some(err))));
                            } else {
                                let _ = tx.send(Ok(()));
                            }
                        }
                        true
                    }
                },
            }
        } else {
            false
        };

        if remove {
            if self.links.contains(idx) {
                self.links.remove(idx);
            }
            // `handle` is the handle carried by the detach frame.
            self.remote_handles.remove(&handle);
        }
        action
    }
1090
1091 fn settle_deliveries(&mut self, disp: &Disposition) {
1092 let from = disp.first();
1093 let to = disp.last();
1094
1095 if cfg!(feature = "frame-trace") {
1096 log::trace!("{}: Settle delivery: {disp:#?}", self.tag());
1097 } else {
1098 log::trace!(
1099 "{}: Settle delivery from {from} - {to:?}, state {:?} settled: {:?}",
1100 self.tag(),
1101 disp.state(),
1102 disp.settled()
1103 );
1104 }
1105
1106 let deliveries = if disp.role() == Role::Receiver {
1107 &mut self.unsettled_snd_deliveries
1108 } else {
1109 &mut self.unsettled_rcv_deliveries
1110 };
1111
1112 if let Some(to) = to {
1113 for no in from..=to {
1114 if let Some(delivery) = deliveries.get_mut(&no) {
1115 delivery.handle_disposition(disp);
1116 } else {
1117 log::trace!(
1118 "{}: Unknown deliveryid: {no:?} disp: {disp:?}",
1119 self.sink.tag()
1120 );
1121 }
1122 }
1123 } else if let Some(delivery) = deliveries.get_mut(&from) {
1124 delivery.handle_disposition(disp);
1125 }
1126 }
1127
1128 pub(crate) fn handle_flow(&mut self, flow: &Flow, link: Option<&SenderLink>) {
1129 self.next_incoming_id = flow.next_outgoing_id();
1131 self.remote_outgoing_window = flow.outgoing_window();
1132
1133 self.remote_incoming_window = flow
1134 .next_incoming_id()
1135 .unwrap_or(0)
1136 .wrapping_add(flow.incoming_window())
1137 .wrapping_sub(self.next_outgoing_id);
1138
1139 log::trace!(
1140 "{}: Session received credit {:?}. window: {}, pending: {}",
1141 self.tag(),
1142 flow.link_credit(),
1143 self.remote_incoming_window,
1144 self.pending_transfers.len(),
1145 );
1146
1147 while let Some(tr) = self.pending_transfers.pop_front() {
1148 let _ = tr.tx.send(Ok(()));
1149 }
1150
1151 if flow.echo() {
1152 let flow = Flow(Box::new(codec::FlowInner {
1153 next_incoming_id: if self.flags.contains(Flags::LOCAL) {
1154 Some(self.next_incoming_id)
1155 } else {
1156 None
1157 },
1158 incoming_window: u32::MAX,
1159 next_outgoing_id: self.next_outgoing_id,
1160 outgoing_window: self.remote_incoming_window,
1161 handle: None,
1162 delivery_count: None,
1163 link_credit: None,
1164 available: None,
1165 drain: false,
1166 echo: false,
1167 properties: None,
1168 }));
1169 self.post_frame(flow.into());
1170 }
1171
1172 if let Some(link) = link {
1174 link.inner.get_mut().apply_flow(flow);
1175 }
1176 }
1177
1178 pub(crate) fn rcv_link_flow(&mut self, handle: u32, delivery_count: u32, credit: u32) {
1179 let flow = Flow(Box::new(codec::FlowInner {
1180 next_incoming_id: Some(self.next_incoming_id),
1181 incoming_window: u32::MAX,
1182 next_outgoing_id: self.next_outgoing_id,
1183 outgoing_window: self.remote_incoming_window,
1184 handle: Some(handle),
1185 delivery_count: Some(delivery_count),
1186 link_credit: Some(credit),
1187 available: None,
1188 drain: false,
1189 echo: false,
1190 properties: None,
1191 }));
1192 self.post_frame(flow.into());
1193 }
1194
    /// Sends a message on the link `link_handle`, splitting it into several
    /// `Transfer` frames when the body exceeds the usable frame size.
    ///
    /// Waits while the remote incoming window is zero. The delivery id is
    /// taken from `next_outgoing_id`, which is then advanced. Unless `settled`
    /// is set, the delivery is recorded in `unsettled_snd_deliveries`.
    ///
    /// Returns the delivery id assigned to the message.
    ///
    /// # Errors
    ///
    /// Fails with `ConnectionDropped` when the wake-up channel is dropped
    /// while waiting for the window, or with the error delivered over that
    /// channel.
    #[allow(clippy::too_many_lines)]
    pub(crate) async fn send_transfer(
        &mut self,
        link_handle: Handle,
        tag: Bytes,
        body: TransferBody,
        settled: bool,
        format: Option<MessageFormat>,
    ) -> Result<DeliveryNumber, AmqpProtocolError> {
        // Park until the remote window is open; re-check after every wake-up.
        loop {
            if self.remote_incoming_window == 0 {
                log::trace!(
                    "{}: Remote window is 0, push to pending queue, hnd:{link_handle:?}",
                    self.sink.tag()
                );
                let (tx, rx) = self.pool_credit.channel();
                self.pending_transfers
                    .push_back(PendingTransfer { tx, link_handle });

                rx.await
                    .map_err(|_| AmqpProtocolError::ConnectionDropped)
                    .and_then(|v| v)?;
                continue;
            }
            break;
        }

        // NOTE(review): the window is reduced by one per message, even when the
        // message is split into several transfer frames below — confirm intent.
        self.remote_incoming_window -= 1;

        let delivery_id = self.next_outgoing_id;
        self.next_outgoing_id = self.next_outgoing_id.wrapping_add(1);

        // Pre-settled transfers carry an `Accepted` state.
        let tr_settled = if settled {
            Some(DeliveryState::Accepted(Accepted {}))
        } else {
            None
        };
        // An explicit format wins; otherwise the body's own format is used.
        let message_format = if format.is_none() {
            body.message_format()
        } else {
            format
        };

        // Usable payload size per frame: 2048 bytes are held back when the
        // limit is larger than that; a limit of 0 is treated as unlimited.
        let max_frame_size = self.max_frame_size();
        let max_frame_size = if max_frame_size > 2048 {
            max_frame_size - 2048
        } else if max_frame_size == 0 {
            u32::MAX
        } else {
            max_frame_size
        } as usize;

        if body.len() > max_frame_size {
            // Multi-frame path: get the payload as bytes, encoding a message first.
            let mut body = match body {
                TransferBody::Data(data) => data,
                TransferBody::Message(msg) => {
                    let mut buf = self
                        .sink
                        .config()
                        .write_buf()
                        .buf_with_capacity(msg.encoded_size());
                    msg.encode(&mut buf);
                    buf.freeze()
                }
            };

            let chunk = body.split_to(cmp::min(max_frame_size, body.len()));

            // The first frame carries the delivery id, tag and settlement info.
            let mut transfer = Transfer(Box::default());
            transfer.0.handle = link_handle;
            transfer.0.body = Some(TransferBody::Data(chunk));
            transfer.0.more = true;
            transfer.0.state = tr_settled;
            transfer.0.batchable = true;
            transfer.0.delivery_id = Some(delivery_id);
            transfer.0.delivery_tag = Some(tag.clone());
            transfer.0.message_format = message_format;

            if settled {
                transfer.0.settled = Some(true);
            } else {
                self.unsettled_snd_deliveries
                    .insert(delivery_id, DeliveryInner::new(link_handle));
            }

            log::trace!(
                "{}: Sending transfer over handle {link_handle}. window: {} delivery_id: {:?} delivery_tag: {:?}, more: {:?}, batchable: {:?}, settled: {:?}",
                self.sink.tag(),
                self.remote_incoming_window,
                transfer.delivery_id(),
                transfer.delivery_tag(),
                transfer.more(),
                transfer.batchable(),
                transfer.settled(),
            );
            self.post_frame(Frame::Transfer(transfer));

            // Continuation frames: payload only, `more` cleared on the last one.
            loop {
                if body.is_empty() {
                    log::trace!("{}: Last tranfer for {tag:?} is sent", self.tag());
                    break;
                }

                let chunk = body.split_to(cmp::min(max_frame_size, body.len()));

                log::trace!("{}: Sending chunk tranfer for {tag:?}", self.tag());
                let mut transfer = Transfer(Box::default());
                transfer.0.handle = link_handle;
                transfer.0.body = Some(TransferBody::Data(chunk));
                transfer.0.more = !body.is_empty();
                transfer.0.batchable = true;
                transfer.0.message_format = message_format;
                self.post_frame(Frame::Transfer(transfer));
            }
        } else {
            // Single-frame path.
            let mut transfer = Transfer(Box::default());
            transfer.0.handle = link_handle;
            transfer.0.body = Some(body);
            transfer.0.state = tr_settled;
            transfer.0.delivery_id = Some(delivery_id);
            transfer.0.delivery_tag = Some(tag);
            transfer.0.message_format = message_format;

            if settled {
                transfer.0.settled = Some(true);
            } else {
                self.unsettled_snd_deliveries
                    .insert(delivery_id, DeliveryInner::new(link_handle));
            }
            self.post_frame(Frame::Transfer(transfer));
        }

        Ok(delivery_id)
    }
1331
1332 pub(crate) fn post_frame(&mut self, frame: Frame) {
1333 self.sink.post_frame(AmqpFrame::new(self.id(), frame));
1334 }
1335}