actix_amqp/
rcvlink.rs

1use std::collections::VecDeque;
2use std::future::Future;
3use std::pin::Pin;
4use std::task::{Context, Poll};
5use std::u32;
6
7use actix_utils::oneshot;
8use actix_utils::task::LocalWaker;
9use amqp_codec::protocol::{
10    Attach, DeliveryNumber, Disposition, Error, Handle, LinkError, ReceiverSettleMode, Role,
11    SenderSettleMode, Source, TerminusDurability, TerminusExpiryPolicy, Transfer,
12};
13use bytes::Bytes;
14use bytestring::ByteString;
15use futures::Stream;
16
17use crate::cell::Cell;
18use crate::errors::AmqpTransportError;
19use crate::session::{Session, SessionInner};
20use crate::Configuration;
21
22#[derive(Clone, Debug)]
23pub struct ReceiverLink {
24    pub(crate) inner: Cell<ReceiverLinkInner>,
25}
26
27impl ReceiverLink {
28    pub(crate) fn new(inner: Cell<ReceiverLinkInner>) -> ReceiverLink {
29        ReceiverLink { inner }
30    }
31
32    pub fn handle(&self) -> Handle {
33        self.inner.get_ref().handle as Handle
34    }
35
36    pub fn credit(&self) -> u32 {
37        self.inner.get_ref().credit
38    }
39
40    pub fn session(&self) -> &Session {
41        &self.inner.get_ref().session
42    }
43
44    pub fn session_mut(&mut self) -> &mut Session {
45        &mut self.inner.get_mut().session
46    }
47
48    pub fn frame(&self) -> &Attach {
49        &self.inner.get_ref().attach
50    }
51
52    pub fn open(&mut self) {
53        let inner = self.inner.get_mut();
54        inner
55            .session
56            .inner
57            .get_mut()
58            .confirm_receiver_link(inner.handle, &inner.attach);
59    }
60
61    pub fn set_link_credit(&mut self, credit: u32) {
62        self.inner.get_mut().set_link_credit(credit);
63    }
64
65    /// Send disposition frame
66    pub fn send_disposition(&mut self, disp: Disposition) {
67        self.inner
68            .get_mut()
69            .session
70            .inner
71            .get_mut()
72            .post_frame(disp.into());
73    }
74
75    /// Wait for disposition with specified number
76    pub fn wait_disposition(
77        &mut self,
78        id: DeliveryNumber,
79    ) -> impl Future<Output = Result<Disposition, AmqpTransportError>> {
80        self.inner.get_mut().session.wait_disposition(id)
81    }
82
83    pub fn close(&self) -> impl Future<Output = Result<(), AmqpTransportError>> {
84        self.inner.get_mut().close(None)
85    }
86
87    pub fn close_with_error(
88        &self,
89        error: Error,
90    ) -> impl Future<Output = Result<(), AmqpTransportError>> {
91        self.inner.get_mut().close(Some(error))
92    }
93
94    #[inline]
95    /// Get remote connection configuration
96    pub fn remote_config(&self) -> &Configuration {
97        &self.inner.session.remote_config()
98    }
99}
100
101impl Stream for ReceiverLink {
102    type Item = Result<Transfer, AmqpTransportError>;
103
104    fn poll_next(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Option<Self::Item>> {
105        let inner = self.inner.get_mut();
106
107        if let Some(tr) = inner.queue.pop_front() {
108            Poll::Ready(Some(Ok(tr)))
109        } else {
110            if inner.closed {
111                Poll::Ready(None)
112            } else {
113                inner.reader_task.register(cx.waker());
114                Poll::Pending
115            }
116        }
117    }
118}
119
120#[derive(Debug)]
121pub(crate) struct ReceiverLinkInner {
122    handle: Handle,
123    attach: Attach,
124    session: Session,
125    closed: bool,
126    reader_task: LocalWaker,
127    queue: VecDeque<Transfer>,
128    credit: u32,
129    delivery_count: u32,
130}
131
132impl ReceiverLinkInner {
133    pub(crate) fn new(
134        session: Cell<SessionInner>,
135        handle: Handle,
136        attach: Attach,
137    ) -> ReceiverLinkInner {
138        ReceiverLinkInner {
139            handle,
140            session: Session::new(session),
141            closed: false,
142            reader_task: LocalWaker::new(),
143            queue: VecDeque::with_capacity(4),
144            credit: 0,
145            delivery_count: attach.initial_delivery_count().unwrap_or(0),
146            attach,
147        }
148    }
149
150    pub fn name(&self) -> &ByteString {
151        &self.attach.name
152    }
153
154    pub fn close(
155        &mut self,
156        error: Option<Error>,
157    ) -> impl Future<Output = Result<(), AmqpTransportError>> {
158        let (tx, rx) = oneshot::channel();
159        if self.closed {
160            let _ = tx.send(Ok(()));
161        } else {
162            self.session
163                .inner
164                .get_mut()
165                .detach_receiver_link(self.handle, true, error, tx);
166        }
167        async move {
168            match rx.await {
169                Ok(Ok(_)) => Ok(()),
170                Ok(Err(e)) => Err(e),
171                Err(_) => Err(AmqpTransportError::Disconnected),
172            }
173        }
174    }
175
176    pub fn set_link_credit(&mut self, credit: u32) {
177        self.credit += credit;
178        self.session
179            .inner
180            .get_mut()
181            .rcv_link_flow(self.handle as u32, self.delivery_count, credit);
182    }
183
184    pub fn handle_transfer(&mut self, transfer: Transfer) {
185        if self.credit == 0 {
186            // check link credit
187            let err = Error {
188                condition: LinkError::TransferLimitExceeded.into(),
189                description: None,
190                info: None,
191            };
192            let _ = self.close(Some(err));
193        } else {
194            self.credit -= 1;
195            self.delivery_count += 1;
196            self.queue.push_back(transfer);
197            if self.queue.len() == 1 {
198                self.reader_task.wake()
199            }
200        }
201    }
202}
203
204pub struct ReceiverLinkBuilder {
205    frame: Attach,
206    session: Cell<SessionInner>,
207}
208
209impl ReceiverLinkBuilder {
210    pub(crate) fn new(name: ByteString, address: ByteString, session: Cell<SessionInner>) -> Self {
211        let source = Source {
212            address: Some(address),
213            durable: TerminusDurability::None,
214            expiry_policy: TerminusExpiryPolicy::SessionEnd,
215            timeout: 0,
216            dynamic: false,
217            dynamic_node_properties: None,
218            distribution_mode: None,
219            filter: None,
220            default_outcome: None,
221            outcomes: None,
222            capabilities: None,
223        };
224        let frame = Attach {
225            name,
226            handle: 0 as Handle,
227            role: Role::Receiver,
228            snd_settle_mode: SenderSettleMode::Mixed,
229            rcv_settle_mode: ReceiverSettleMode::First,
230            source: Some(source),
231            target: None,
232            unsettled: None,
233            incomplete_unsettled: false,
234            initial_delivery_count: None,
235            max_message_size: Some(65536 * 4),
236            offered_capabilities: None,
237            desired_capabilities: None,
238            properties: None,
239        };
240
241        ReceiverLinkBuilder { frame, session }
242    }
243
244    pub fn max_message_size(mut self, size: u64) -> Self {
245        self.frame.max_message_size = Some(size);
246        self
247    }
248
249    pub async fn open(self) -> Result<ReceiverLink, AmqpTransportError> {
250        let cell = self.session.clone();
251        let res = self
252            .session
253            .get_mut()
254            .open_local_receiver_link(cell, self.frame)
255            .await;
256
257        match res {
258            Ok(Ok(res)) => Ok(res),
259            Ok(Err(err)) => Err(err),
260            Err(_) => Err(AmqpTransportError::Disconnected),
261        }
262    }
263}