ntex_amqp/
sndlink.rs

1use std::{collections::VecDeque, future::Future};
2
3use ntex::channel::{condition, oneshot, pool};
4use ntex::util::{BufMut, ByteString, Bytes, Either, PoolRef, Ready};
5use ntex_amqp_codec::protocol::{
6    self as codec, Attach, DeliveryNumber, Error, Flow, MessageFormat, ReceiverSettleMode, Role,
7    SenderSettleMode, SequenceNo, Target, TerminusDurability, TerminusExpiryPolicy, TransferBody,
8};
9
10use crate::delivery::TransferBuilder;
11use crate::session::{Session, SessionInner};
12use crate::{cell::Cell, error::AmqpProtocolError, Handle};
13
/// Public handle to a sender link.
///
/// Wraps the crate-internal link state; cloning the handle clones the
/// inner cell.
#[derive(Clone)]
pub struct SenderLink {
    // Link state, also handed to `TransferBuilder` when a transfer is started.
    pub(crate) inner: Cell<SenderLinkInner>,
}
18
/// Crate-internal state of a sender link.
pub(crate) struct SenderLinkInner {
    // Local link id; passed to the session when sending transfers and detaching.
    pub(crate) id: usize,
    // Link name (taken from the source address when built from an attach frame).
    name: ByteString,
    // Session this link belongs to.
    pub(crate) session: Session,
    // Handle taken from the attach frame / constructor argument.
    remote_handle: Handle,
    // Sender-side delivery count; advanced (wrapping) for every transfer sent.
    delivery_count: SequenceNo,
    // Counter used to generate delivery tags when the caller supplies none.
    delivery_tag: u32,
    // Currently available credit; updated by flow frames, decremented per transfer.
    link_credit: u32,
    // Senders waiting for credit; completed on flow, failed on remote detach.
    pending_transfers: VecDeque<pool::Sender<Result<(), AmqpProtocolError>>>,
    // Error recorded when the remote side detaches the link.
    pub(crate) error: Option<AmqpProtocolError>,
    // Set once the link is closed locally, detached remotely or dropped.
    pub(crate) closed: bool,
    // Optional max message size for this link.
    pub(crate) max_message_size: Option<u32>,
    // Notified when the link gets closed.
    on_close: condition::Condition,
    // Notified when credit becomes available (and when the link gets closed).
    on_credit: condition::Condition,
    // Memory pool of the session; used to allocate generated delivery tags.
    pool: PoolRef,
}
35
36impl std::fmt::Debug for SenderLink {
37    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38        fmt.debug_tuple("SenderLink")
39            .field(&std::ops::Deref::deref(&self.inner.get_ref().name))
40            .finish()
41    }
42}
43
44impl std::fmt::Debug for SenderLinkInner {
45    fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46        fmt.debug_tuple("SenderLinkInner")
47            .field(&std::ops::Deref::deref(&self.name))
48            .finish()
49    }
50}
51
52impl SenderLink {
53    pub(crate) fn new(inner: Cell<SenderLinkInner>) -> SenderLink {
54        SenderLink { inner }
55    }
56
57    #[inline]
58    /// Id of the sender link
59    pub fn id(&self) -> u32 {
60        self.inner.id as u32
61    }
62
63    #[inline]
64    /// Name of the sender link
65    pub fn name(&self) -> &ByteString {
66        &self.inner.name
67    }
68
69    #[inline]
70    /// Remote handle
71    pub fn remote_handle(&self) -> Handle {
72        self.inner.remote_handle
73    }
74
75    #[inline]
76    /// Reference to session
77    pub fn session(&self) -> &Session {
78        &self.inner.get_ref().session
79    }
80
81    #[inline]
82    /// Returns available send credit
83    pub fn credit(&self) -> u32 {
84        self.inner.get_ref().link_credit
85    }
86
87    /// Get notification when packet could be send to the peer.
88    ///
89    /// Result indicates if connection is alive
90    pub async fn ready(&self) -> bool {
91        loop {
92            let waiter = {
93                let inner = self.inner.get_ref();
94                if inner.closed {
95                    return false;
96                }
97                if inner.link_credit > 0 {
98                    return true;
99                }
100                inner.on_credit.wait()
101            };
102            waiter.await
103        }
104    }
105
106    #[inline]
107    /// Check is link is closed
108    pub fn is_closed(&self) -> bool {
109        self.inner.closed
110    }
111
112    #[inline]
113    /// Check link state
114    pub fn is_opened(&self) -> bool {
115        !self.inner.closed
116    }
117
118    /// Check link error
119    pub fn error(&self) -> Option<&AmqpProtocolError> {
120        self.inner.get_ref().error.as_ref()
121    }
122
123    #[doc(hidden)]
124    #[deprecated]
125    /// Start delivery process
126    pub fn delivery<T>(&self, body: T) -> TransferBuilder
127    where
128        T: Into<TransferBody>,
129    {
130        self.transfer(body)
131    }
132
133    /// Start delivery process
134    pub fn transfer<T>(&self, body: T) -> TransferBuilder
135    where
136        T: Into<TransferBody>,
137    {
138        TransferBuilder::new(body.into(), self.inner.clone())
139    }
140
141    /// Close sender link
142    pub fn close(&self) -> impl Future<Output = Result<(), AmqpProtocolError>> {
143        self.inner.get_mut().close(None)
144    }
145
146    /// Close sender link with error
147    pub fn close_with_error<E>(
148        &self,
149        error: E,
150    ) -> impl Future<Output = Result<(), AmqpProtocolError>>
151    where
152        Error: From<E>,
153    {
154        self.inner.get_mut().close(Some(error.into()))
155    }
156
157    pub fn on_close(&self) -> condition::Waiter {
158        self.inner.get_ref().on_close.wait()
159    }
160
161    /// Notify when credit get updated
162    ///
163    /// After notification credit must be checked again,
164    /// other waiters could consume it.
165    pub fn on_credit_update(&self) -> condition::Waiter {
166        self.inner.get_ref().on_credit.wait()
167    }
168
169    pub fn max_message_size(&self) -> Option<u32> {
170        self.inner.get_ref().max_message_size
171    }
172
173    pub fn set_max_message_size(&self, value: u32) {
174        self.inner.get_mut().max_message_size = Some(value)
175    }
176}
177
178impl SenderLinkInner {
179    pub(crate) fn new(
180        id: usize,
181        name: ByteString,
182        handle: Handle,
183        delivery_count: SequenceNo,
184        session: Cell<SessionInner>,
185        max_message_size: Option<u32>,
186    ) -> SenderLinkInner {
187        let pool = session.get_ref().memory_pool();
188        SenderLinkInner {
189            id,
190            name,
191            pool,
192            delivery_count,
193            max_message_size,
194            session: Session::new(session),
195            remote_handle: handle,
196            link_credit: 0,
197            pending_transfers: VecDeque::new(),
198            error: None,
199            closed: false,
200            delivery_tag: 0,
201            on_close: condition::Condition::new(),
202            on_credit: condition::Condition::new(),
203        }
204    }
205
206    pub(crate) fn with(id: usize, frame: &Attach, session: Cell<SessionInner>) -> SenderLinkInner {
207        let mut name = None;
208        if let Some(source) = frame.source() {
209            if let Some(ref addr) = source.address {
210                name = Some(addr.clone());
211            }
212        }
213        let mut name = name.unwrap_or_default();
214        name.trimdown();
215
216        let delivery_count = frame.initial_delivery_count().unwrap_or(0);
217        let max_message_size = frame
218            .max_message_size()
219            .map(|size| u32::try_from(size).unwrap_or(u32::MAX));
220
221        SenderLinkInner::new(
222            id,
223            name,
224            frame.handle(),
225            delivery_count,
226            session,
227            max_message_size,
228        )
229    }
230
231    pub(crate) fn id(&self) -> u32 {
232        self.id as u32
233    }
234
235    pub(crate) fn remote_handle(&self) -> Handle {
236        self.remote_handle
237    }
238
239    pub(crate) fn name(&self) -> &ByteString {
240        &self.name
241    }
242
243    pub(crate) fn max_message_size(&self) -> Option<u32> {
244        self.max_message_size
245    }
246
247    pub(crate) fn remote_detached(&mut self, err: AmqpProtocolError) {
248        log::trace!(
249            "{}: Detaching sender link {:?} with error {:?}",
250            self.session.tag(),
251            self.name,
252            err
253        );
254
255        // drop pending transfers
256        for tx in self.pending_transfers.drain(..) {
257            let _ = tx.send(Err(err.clone()));
258        }
259
260        self.closed = true;
261        self.error = Some(err);
262        self.on_close.notify_and_lock_readiness();
263        self.on_credit.notify_and_lock_readiness();
264    }
265
266    pub(crate) fn close(
267        &mut self,
268        error: Option<Error>,
269    ) -> impl Future<Output = Result<(), AmqpProtocolError>> {
270        if self.closed {
271            Either::Left(Ready::Ok(()))
272        } else {
273            self.closed = true;
274            self.on_close.notify_and_lock_readiness();
275            self.on_credit.notify_and_lock_readiness();
276
277            let (tx, rx) = oneshot::channel();
278
279            self.session
280                .inner
281                .get_mut()
282                .detach_sender_link(self.id as Handle, true, error, tx);
283
284            Either::Right(async move {
285                match rx.await {
286                    Ok(Ok(_)) => Ok(()),
287                    Ok(Err(e)) => Err(e),
288                    Err(_) => Err(AmqpProtocolError::Disconnected),
289                }
290            })
291        }
292    }
293
294    pub(crate) fn apply_flow(&mut self, flow: &Flow) {
295        // #2.7.6
296        if let Some(credit) = flow.link_credit() {
297            let delta = flow
298                .delivery_count()
299                .unwrap_or(0)
300                .wrapping_add(credit)
301                .wrapping_sub(self.delivery_count);
302
303            log::trace!(
304                "{}: Apply sender link {:?} flow, credit: {:?}({:?}) flow count: {:?}, delivery count: {:?} pending: {:?} new credit {:?}",
305                self.session.tag(),
306                self.name,
307                credit,
308                delta,
309                flow.delivery_count().unwrap_or(0),
310                self.delivery_count,
311                self.pending_transfers.len(),
312                self.link_credit
313            );
314
315            self.link_credit += delta;
316
317            // credit became available => drain pending_transfers
318            while let Some(tx) = self.pending_transfers.pop_front() {
319                let _ = tx.send(Ok(()));
320            }
321
322            // notify available credit waiters
323            if self.link_credit > 0 {
324                self.on_credit.notify();
325            }
326        }
327    }
328
329    pub(crate) async fn send<T: Into<TransferBody>>(
330        &mut self,
331        body: T,
332        tag: Option<Bytes>,
333        settled: bool,
334        format: Option<MessageFormat>,
335    ) -> Result<(DeliveryNumber, Bytes), AmqpProtocolError> {
336        if let Some(ref err) = self.error {
337            Err(err.clone())
338        } else {
339            let body = body.into();
340            let tag = self.get_tag(tag);
341
342            loop {
343                if self.link_credit == 0 || !self.pending_transfers.is_empty() {
344                    log::trace!(
345                        "{}: Sender link credit is 0({:?}), push to pending queue hnd:{}({} -> {}), queue size: {}", self.session.tag(),
346                        self.link_credit,
347                        self.name,
348                        self.id,
349                        self.remote_handle,
350                        self.pending_transfers.len()
351                    );
352                    let (tx, rx) = self.session.inner.get_ref().pool_credit.channel();
353                    self.pending_transfers.push_back(tx);
354                    rx.await
355                        .map_err(|_| AmqpProtocolError::ConnectionDropped)
356                        .and_then(|v| v)?;
357                    continue;
358                }
359                break;
360            }
361
362            // reduce link credit
363            self.link_credit -= 1;
364            self.delivery_count = self.delivery_count.wrapping_add(1);
365            let id = self
366                .session
367                .inner
368                .get_mut()
369                .send_transfer(self.id as u32, tag.clone(), body, settled, format)
370                .await?;
371
372            Ok((id, tag))
373        }
374    }
375
376    fn get_tag(&mut self, tag: Option<Bytes>) -> Bytes {
377        tag.unwrap_or_else(|| {
378            let delivery_tag = self.delivery_tag;
379            self.delivery_tag = delivery_tag.wrapping_add(1);
380
381            let mut buf = self.pool.buf_with_capacity(16);
382            buf.put_u32(delivery_tag);
383            buf.freeze()
384        })
385    }
386}
387
/// Crate-internal owner of a [`SenderLink`].
///
/// Dropping it marks the link as closed and notifies close/credit waiters
/// (see the `Drop` impl).
#[derive(Debug)]
pub(crate) struct EstablishedSenderLink(SenderLink);
390
391impl EstablishedSenderLink {
392    pub(crate) fn new(inner: Cell<SenderLinkInner>) -> EstablishedSenderLink {
393        EstablishedSenderLink(SenderLink::new(inner))
394    }
395}
396
397impl std::ops::Deref for EstablishedSenderLink {
398    type Target = SenderLink;
399
400    fn deref(&self) -> &Self::Target {
401        &self.0
402    }
403}
404
405impl Drop for EstablishedSenderLink {
406    fn drop(&mut self) {
407        let inner = self.0.inner.get_mut();
408        if !inner.closed {
409            inner.closed = true;
410            inner.on_close.notify_and_lock_readiness();
411            inner.on_credit.notify_and_lock_readiness();
412        }
413    }
414}
415
/// Builder for attaching a new sender link to a session.
pub struct SenderLinkBuilder {
    // Attach frame handed to the session by `attach()`.
    frame: Attach,
    // Session state the link gets attached to.
    session: Cell<SessionInner>,
}
420
421impl SenderLinkBuilder {
422    pub(crate) fn new(name: ByteString, address: ByteString, session: Cell<SessionInner>) -> Self {
423        let target = Target {
424            address: Some(address),
425            durable: TerminusDurability::None,
426            expiry_policy: TerminusExpiryPolicy::SessionEnd,
427            timeout: 0,
428            dynamic: false,
429            dynamic_node_properties: None,
430            capabilities: None,
431        };
432        let frame = Attach(Box::new(codec::AttachInner {
433            name,
434            handle: 0_u32,
435            role: Role::Sender,
436            snd_settle_mode: SenderSettleMode::Mixed,
437            rcv_settle_mode: ReceiverSettleMode::First,
438            source: None,
439            target: Some(target),
440            unsettled: None,
441            incomplete_unsettled: false,
442            initial_delivery_count: None,
443            max_message_size: Some(65536 * 4),
444            offered_capabilities: None,
445            desired_capabilities: None,
446            properties: None,
447        }));
448
449        SenderLinkBuilder { frame, session }
450    }
451
452    /// Set max message size
453    pub fn max_message_size(mut self, size: u64) -> Self {
454        self.frame.0.max_message_size = Some(size);
455        self
456    }
457
458    /// Modify attach frame
459    pub fn with_frame<F>(mut self, f: F) -> Self
460    where
461        F: FnOnce(&mut Attach),
462    {
463        f(&mut self.frame);
464        self
465    }
466
467    /// Initiate attach sender process
468    pub async fn attach(self) -> Result<SenderLink, AmqpProtocolError> {
469        let result = self
470            .session
471            .get_mut()
472            .attach_local_sender_link(self.frame)
473            .await;
474
475        match result {
476            Ok(Ok(inner)) => Ok(SenderLink { inner }),
477            Ok(Err(e)) => Err(e),
478            Err(_) => Err(AmqpProtocolError::Disconnected),
479        }
480    }
481}