1use std::collections::VecDeque;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::u32;
6
7use actix_utils::oneshot;
8use actix_utils::task::LocalWaker;
9use amqp_codec::protocol::{
10 Attach, DeliveryNumber, Disposition, Error, Handle, LinkError, ReceiverSettleMode, Role,
11 SenderSettleMode, Source, TerminusDurability, TerminusExpiryPolicy, Transfer,
12};
13use bytes::Bytes;
14use bytestring::ByteString;
15use futures::Stream;
16
17use crate::cell::Cell;
18use crate::errors::AmqpTransportError;
19use crate::session::{Session, SessionInner};
20use crate::Configuration;
21
22#[derive(Clone, Debug)]
23pub struct ReceiverLink {
24 pub(crate) inner: Cell<ReceiverLinkInner>,
25}
26
27impl ReceiverLink {
28 pub(crate) fn new(inner: Cell<ReceiverLinkInner>) -> ReceiverLink {
29 ReceiverLink { inner }
30 }
31
32 pub fn handle(&self) -> Handle {
33 self.inner.get_ref().handle as Handle
34 }
35
36 pub fn credit(&self) -> u32 {
37 self.inner.get_ref().credit
38 }
39
40 pub fn session(&self) -> &Session {
41 &self.inner.get_ref().session
42 }
43
44 pub fn session_mut(&mut self) -> &mut Session {
45 &mut self.inner.get_mut().session
46 }
47
48 pub fn frame(&self) -> &Attach {
49 &self.inner.get_ref().attach
50 }
51
52 pub fn open(&mut self) {
53 let inner = self.inner.get_mut();
54 inner
55 .session
56 .inner
57 .get_mut()
58 .confirm_receiver_link(inner.handle, &inner.attach);
59 }
60
61 pub fn set_link_credit(&mut self, credit: u32) {
62 self.inner.get_mut().set_link_credit(credit);
63 }
64
65 pub fn send_disposition(&mut self, disp: Disposition) {
67 self.inner
68 .get_mut()
69 .session
70 .inner
71 .get_mut()
72 .post_frame(disp.into());
73 }
74
75 pub fn wait_disposition(
77 &mut self,
78 id: DeliveryNumber,
79 ) -> impl Future<Output = Result<Disposition, AmqpTransportError>> {
80 self.inner.get_mut().session.wait_disposition(id)
81 }
82
83 pub fn close(&self) -> impl Future<Output = Result<(), AmqpTransportError>> {
84 self.inner.get_mut().close(None)
85 }
86
87 pub fn close_with_error(
88 &self,
89 error: Error,
90 ) -> impl Future<Output = Result<(), AmqpTransportError>> {
91 self.inner.get_mut().close(Some(error))
92 }
93
94 #[inline]
95 pub fn remote_config(&self) -> &Configuration {
97 &self.inner.session.remote_config()
98 }
99}
100
101impl Stream for ReceiverLink {
102 type Item = Result<Transfer, AmqpTransportError>;
103
104 fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
105 let inner = self.inner.get_mut();
106
107 if let Some(tr) = inner.queue.pop_front() {
108 Poll::Ready(Some(Ok(tr)))
109 } else {
110 if inner.closed {
111 Poll::Ready(None)
112 } else {
113 inner.reader_task.register(cx.waker());
114 Poll::Pending
115 }
116 }
117 }
118}
119
120#[derive(Debug)]
121pub(crate) struct ReceiverLinkInner {
122 handle: Handle,
123 attach: Attach,
124 session: Session,
125 closed: bool,
126 reader_task: LocalWaker,
127 queue: VecDeque<Transfer>,
128 credit: u32,
129 delivery_count: u32,
130}
131
132impl ReceiverLinkInner {
133 pub(crate) fn new(
134 session: Cell<SessionInner>,
135 handle: Handle,
136 attach: Attach,
137 ) -> ReceiverLinkInner {
138 ReceiverLinkInner {
139 handle,
140 session: Session::new(session),
141 closed: false,
142 reader_task: LocalWaker::new(),
143 queue: VecDeque::with_capacity(4),
144 credit: 0,
145 delivery_count: attach.initial_delivery_count().unwrap_or(0),
146 attach,
147 }
148 }
149
150 pub fn name(&self) -> &ByteString {
151 &self.attach.name
152 }
153
154 pub fn close(
155 &mut self,
156 error: Option<Error>,
157 ) -> impl Future<Output = Result<(), AmqpTransportError>> {
158 let (tx, rx) = oneshot::channel();
159 if self.closed {
160 let _ = tx.send(Ok(()));
161 } else {
162 self.session
163 .inner
164 .get_mut()
165 .detach_receiver_link(self.handle, true, error, tx);
166 }
167 async move {
168 match rx.await {
169 Ok(Ok(_)) => Ok(()),
170 Ok(Err(e)) => Err(e),
171 Err(_) => Err(AmqpTransportError::Disconnected),
172 }
173 }
174 }
175
176 pub fn set_link_credit(&mut self, credit: u32) {
177 self.credit += credit;
178 self.session
179 .inner
180 .get_mut()
181 .rcv_link_flow(self.handle as u32, self.delivery_count, credit);
182 }
183
184 pub fn handle_transfer(&mut self, transfer: Transfer) {
185 if self.credit == 0 {
186 let err = Error {
188 condition: LinkError::TransferLimitExceeded.into(),
189 description: None,
190 info: None,
191 };
192 let _ = self.close(Some(err));
193 } else {
194 self.credit -= 1;
195 self.delivery_count += 1;
196 self.queue.push_back(transfer);
197 if self.queue.len() == 1 {
198 self.reader_task.wake()
199 }
200 }
201 }
202}
203
204pub struct ReceiverLinkBuilder {
205 frame: Attach,
206 session: Cell<SessionInner>,
207}
208
209impl ReceiverLinkBuilder {
210 pub(crate) fn new(name: ByteString, address: ByteString, session: Cell<SessionInner>) -> Self {
211 let source = Source {
212 address: Some(address),
213 durable: TerminusDurability::None,
214 expiry_policy: TerminusExpiryPolicy::SessionEnd,
215 timeout: 0,
216 dynamic: false,
217 dynamic_node_properties: None,
218 distribution_mode: None,
219 filter: None,
220 default_outcome: None,
221 outcomes: None,
222 capabilities: None,
223 };
224 let frame = Attach {
225 name,
226 handle: 0 as Handle,
227 role: Role::Receiver,
228 snd_settle_mode: SenderSettleMode::Mixed,
229 rcv_settle_mode: ReceiverSettleMode::First,
230 source: Some(source),
231 target: None,
232 unsettled: None,
233 incomplete_unsettled: false,
234 initial_delivery_count: None,
235 max_message_size: Some(65536 * 4),
236 offered_capabilities: None,
237 desired_capabilities: None,
238 properties: None,
239 };
240
241 ReceiverLinkBuilder { frame, session }
242 }
243
244 pub fn max_message_size(mut self, size: u64) -> Self {
245 self.frame.max_message_size = Some(size);
246 self
247 }
248
249 pub async fn open(self) -> Result<ReceiverLink, AmqpTransportError> {
250 let cell = self.session.clone();
251 let res = self
252 .session
253 .get_mut()
254 .open_local_receiver_link(cell, self.frame)
255 .await;
256
257 match res {
258 Ok(Ok(res)) => Ok(res),
259 Ok(Err(err)) => Err(err),
260 Err(_) => Err(AmqpTransportError::Disconnected),
261 }
262 }
263}