1use std::{collections::VecDeque, future::Future};
2
3use ntex::channel::{condition, oneshot, pool};
4use ntex::util::{BufMut, ByteString, Bytes, Either, PoolRef, Ready};
5use ntex_amqp_codec::protocol::{
6 self as codec, Attach, DeliveryNumber, Error, Flow, MessageFormat, ReceiverSettleMode, Role,
7 SenderSettleMode, SequenceNo, Target, TerminusDurability, TerminusExpiryPolicy, TransferBody,
8};
9
10use crate::delivery::TransferBuilder;
11use crate::session::{Session, SessionInner};
12use crate::{cell::Cell, error::AmqpProtocolError, Handle};
13
/// Public handle to the sending side of a link.
///
/// The handle is a thin wrapper around a `Cell<SenderLinkInner>`; the derived
/// `Clone` clones that cell.
/// NOTE(review): presumably every clone refers to the same underlying link
/// state — confirm against `crate::cell::Cell`.
#[derive(Clone)]
pub struct SenderLink {
    /// Link state that all methods of this handle read and mutate.
    pub(crate) inner: Cell<SenderLinkInner>,
}
18
/// State of a sender link: identity, flow-control counters, queued senders
/// waiting for credit, and close/error status.
pub(crate) struct SenderLinkInner {
    /// Local identifier; passed to the session as the link handle when
    /// sending transfers and when detaching.
    pub(crate) id: usize,
    /// Link name. For links built from an `Attach` frame this is the source
    /// address of that frame (empty when the frame carries none).
    name: ByteString,
    /// Session this link belongs to.
    pub(crate) session: Session,
    /// Handle supplied by the caller of `new` (the `Attach` frame's handle in
    /// `with`).
    remote_handle: Handle,
    /// Number of deliveries sent on this link; advanced (wrapping) by one for
    /// every transfer.
    delivery_count: SequenceNo,
    /// Counter used to generate delivery tags when the caller supplies none.
    delivery_tag: u32,
    /// Remaining credit; one unit is consumed per transfer and the value is
    /// refreshed from incoming `Flow` frames.
    link_credit: u32,
    /// Senders parked until credit arrives; each is completed with `Ok(())`
    /// on a flow update or with the detach error on remote detach.
    pending_transfers: VecDeque<pool::Sender<Result<(), AmqpProtocolError>>>,
    /// Error recorded when the remote side detaches; `send` fails with it.
    pub(crate) error: Option<AmqpProtocolError>,
    /// Set once the link is closed locally, detached remotely, or the
    /// owning `EstablishedSenderLink` is dropped.
    pub(crate) closed: bool,
    /// Largest message size for this link, if a limit is known.
    pub(crate) max_message_size: Option<u32>,
    /// Condition notified when the link becomes closed.
    on_close: condition::Condition,
    /// Condition notified when credit becomes available or the link closes.
    on_credit: condition::Condition,
    /// Memory pool (taken from the session) used to allocate generated tags.
    pool: PoolRef,
}
35
36impl std::fmt::Debug for SenderLink {
37 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
38 fmt.debug_tuple("SenderLink")
39 .field(&std::ops::Deref::deref(&self.inner.get_ref().name))
40 .finish()
41 }
42}
43
44impl std::fmt::Debug for SenderLinkInner {
45 fn fmt(&self, fmt: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
46 fmt.debug_tuple("SenderLinkInner")
47 .field(&std::ops::Deref::deref(&self.name))
48 .finish()
49 }
50}
51
52impl SenderLink {
53 pub(crate) fn new(inner: Cell<SenderLinkInner>) -> SenderLink {
54 SenderLink { inner }
55 }
56
57 #[inline]
58 pub fn id(&self) -> u32 {
60 self.inner.id as u32
61 }
62
63 #[inline]
64 pub fn name(&self) -> &ByteString {
66 &self.inner.name
67 }
68
69 #[inline]
70 pub fn remote_handle(&self) -> Handle {
72 self.inner.remote_handle
73 }
74
75 #[inline]
76 pub fn session(&self) -> &Session {
78 &self.inner.get_ref().session
79 }
80
81 #[inline]
82 pub fn credit(&self) -> u32 {
84 self.inner.get_ref().link_credit
85 }
86
87 pub async fn ready(&self) -> bool {
91 loop {
92 let waiter = {
93 let inner = self.inner.get_ref();
94 if inner.closed {
95 return false;
96 }
97 if inner.link_credit > 0 {
98 return true;
99 }
100 inner.on_credit.wait()
101 };
102 waiter.await
103 }
104 }
105
106 #[inline]
107 pub fn is_closed(&self) -> bool {
109 self.inner.closed
110 }
111
112 #[inline]
113 pub fn is_opened(&self) -> bool {
115 !self.inner.closed
116 }
117
118 pub fn error(&self) -> Option<&AmqpProtocolError> {
120 self.inner.get_ref().error.as_ref()
121 }
122
123 #[doc(hidden)]
124 #[deprecated]
125 pub fn delivery<T>(&self, body: T) -> TransferBuilder
127 where
128 T: Into<TransferBody>,
129 {
130 self.transfer(body)
131 }
132
133 pub fn transfer<T>(&self, body: T) -> TransferBuilder
135 where
136 T: Into<TransferBody>,
137 {
138 TransferBuilder::new(body.into(), self.inner.clone())
139 }
140
141 pub fn close(&self) -> impl Future<Output = Result<(), AmqpProtocolError>> {
143 self.inner.get_mut().close(None)
144 }
145
146 pub fn close_with_error<E>(
148 &self,
149 error: E,
150 ) -> impl Future<Output = Result<(), AmqpProtocolError>>
151 where
152 Error: From<E>,
153 {
154 self.inner.get_mut().close(Some(error.into()))
155 }
156
157 pub fn on_close(&self) -> condition::Waiter {
158 self.inner.get_ref().on_close.wait()
159 }
160
161 pub fn on_credit_update(&self) -> condition::Waiter {
166 self.inner.get_ref().on_credit.wait()
167 }
168
169 pub fn max_message_size(&self) -> Option<u32> {
170 self.inner.get_ref().max_message_size
171 }
172
173 pub fn set_max_message_size(&self, value: u32) {
174 self.inner.get_mut().max_message_size = Some(value)
175 }
176}
177
178impl SenderLinkInner {
179 pub(crate) fn new(
180 id: usize,
181 name: ByteString,
182 handle: Handle,
183 delivery_count: SequenceNo,
184 session: Cell<SessionInner>,
185 max_message_size: Option<u32>,
186 ) -> SenderLinkInner {
187 let pool = session.get_ref().memory_pool();
188 SenderLinkInner {
189 id,
190 name,
191 pool,
192 delivery_count,
193 max_message_size,
194 session: Session::new(session),
195 remote_handle: handle,
196 link_credit: 0,
197 pending_transfers: VecDeque::new(),
198 error: None,
199 closed: false,
200 delivery_tag: 0,
201 on_close: condition::Condition::new(),
202 on_credit: condition::Condition::new(),
203 }
204 }
205
206 pub(crate) fn with(id: usize, frame: &Attach, session: Cell<SessionInner>) -> SenderLinkInner {
207 let mut name = None;
208 if let Some(source) = frame.source() {
209 if let Some(ref addr) = source.address {
210 name = Some(addr.clone());
211 }
212 }
213 let mut name = name.unwrap_or_default();
214 name.trimdown();
215
216 let delivery_count = frame.initial_delivery_count().unwrap_or(0);
217 let max_message_size = frame
218 .max_message_size()
219 .map(|size| u32::try_from(size).unwrap_or(u32::MAX));
220
221 SenderLinkInner::new(
222 id,
223 name,
224 frame.handle(),
225 delivery_count,
226 session,
227 max_message_size,
228 )
229 }
230
231 pub(crate) fn id(&self) -> u32 {
232 self.id as u32
233 }
234
235 pub(crate) fn remote_handle(&self) -> Handle {
236 self.remote_handle
237 }
238
239 pub(crate) fn name(&self) -> &ByteString {
240 &self.name
241 }
242
243 pub(crate) fn max_message_size(&self) -> Option<u32> {
244 self.max_message_size
245 }
246
247 pub(crate) fn remote_detached(&mut self, err: AmqpProtocolError) {
248 log::trace!(
249 "{}: Detaching sender link {:?} with error {:?}",
250 self.session.tag(),
251 self.name,
252 err
253 );
254
255 for tx in self.pending_transfers.drain(..) {
257 let _ = tx.send(Err(err.clone()));
258 }
259
260 self.closed = true;
261 self.error = Some(err);
262 self.on_close.notify_and_lock_readiness();
263 self.on_credit.notify_and_lock_readiness();
264 }
265
266 pub(crate) fn close(
267 &mut self,
268 error: Option<Error>,
269 ) -> impl Future<Output = Result<(), AmqpProtocolError>> {
270 if self.closed {
271 Either::Left(Ready::Ok(()))
272 } else {
273 self.closed = true;
274 self.on_close.notify_and_lock_readiness();
275 self.on_credit.notify_and_lock_readiness();
276
277 let (tx, rx) = oneshot::channel();
278
279 self.session
280 .inner
281 .get_mut()
282 .detach_sender_link(self.id as Handle, true, error, tx);
283
284 Either::Right(async move {
285 match rx.await {
286 Ok(Ok(_)) => Ok(()),
287 Ok(Err(e)) => Err(e),
288 Err(_) => Err(AmqpProtocolError::Disconnected),
289 }
290 })
291 }
292 }
293
294 pub(crate) fn apply_flow(&mut self, flow: &Flow) {
295 if let Some(credit) = flow.link_credit() {
297 let delta = flow
298 .delivery_count()
299 .unwrap_or(0)
300 .wrapping_add(credit)
301 .wrapping_sub(self.delivery_count);
302
303 log::trace!(
304 "{}: Apply sender link {:?} flow, credit: {:?}({:?}) flow count: {:?}, delivery count: {:?} pending: {:?} new credit {:?}",
305 self.session.tag(),
306 self.name,
307 credit,
308 delta,
309 flow.delivery_count().unwrap_or(0),
310 self.delivery_count,
311 self.pending_transfers.len(),
312 self.link_credit
313 );
314
315 self.link_credit += delta;
316
317 while let Some(tx) = self.pending_transfers.pop_front() {
319 let _ = tx.send(Ok(()));
320 }
321
322 if self.link_credit > 0 {
324 self.on_credit.notify();
325 }
326 }
327 }
328
329 pub(crate) async fn send<T: Into<TransferBody>>(
330 &mut self,
331 body: T,
332 tag: Option<Bytes>,
333 settled: bool,
334 format: Option<MessageFormat>,
335 ) -> Result<(DeliveryNumber, Bytes), AmqpProtocolError> {
336 if let Some(ref err) = self.error {
337 Err(err.clone())
338 } else {
339 let body = body.into();
340 let tag = self.get_tag(tag);
341
342 loop {
343 if self.link_credit == 0 || !self.pending_transfers.is_empty() {
344 log::trace!(
345 "{}: Sender link credit is 0({:?}), push to pending queue hnd:{}({} -> {}), queue size: {}", self.session.tag(),
346 self.link_credit,
347 self.name,
348 self.id,
349 self.remote_handle,
350 self.pending_transfers.len()
351 );
352 let (tx, rx) = self.session.inner.get_ref().pool_credit.channel();
353 self.pending_transfers.push_back(tx);
354 rx.await
355 .map_err(|_| AmqpProtocolError::ConnectionDropped)
356 .and_then(|v| v)?;
357 continue;
358 }
359 break;
360 }
361
362 self.link_credit -= 1;
364 self.delivery_count = self.delivery_count.wrapping_add(1);
365 let id = self
366 .session
367 .inner
368 .get_mut()
369 .send_transfer(self.id as u32, tag.clone(), body, settled, format)
370 .await?;
371
372 Ok((id, tag))
373 }
374 }
375
376 fn get_tag(&mut self, tag: Option<Bytes>) -> Bytes {
377 tag.unwrap_or_else(|| {
378 let delivery_tag = self.delivery_tag;
379 self.delivery_tag = delivery_tag.wrapping_add(1);
380
381 let mut buf = self.pool.buf_with_capacity(16);
382 buf.put_u32(delivery_tag);
383 buf.freeze()
384 })
385 }
386}
387
/// Owning wrapper around a [`SenderLink`].
///
/// It dereferences to the wrapped link, and its `Drop` implementation marks
/// the link closed and notifies the link's close and credit conditions.
#[derive(Debug)]
pub(crate) struct EstablishedSenderLink(SenderLink);
390
391impl EstablishedSenderLink {
392 pub(crate) fn new(inner: Cell<SenderLinkInner>) -> EstablishedSenderLink {
393 EstablishedSenderLink(SenderLink::new(inner))
394 }
395}
396
397impl std::ops::Deref for EstablishedSenderLink {
398 type Target = SenderLink;
399
400 fn deref(&self) -> &Self::Target {
401 &self.0
402 }
403}
404
405impl Drop for EstablishedSenderLink {
406 fn drop(&mut self) {
407 let inner = self.0.inner.get_mut();
408 if !inner.closed {
409 inner.closed = true;
410 inner.on_close.notify_and_lock_readiness();
411 inner.on_credit.notify_and_lock_readiness();
412 }
413 }
414}
415
/// Builder that prepares an `Attach` frame for a new sender link and attaches
/// it through a session.
pub struct SenderLinkBuilder {
    /// Attach frame sent to the session by `attach`.
    frame: Attach,
    /// Session the link is attached through.
    session: Cell<SessionInner>,
}
420
421impl SenderLinkBuilder {
422 pub(crate) fn new(name: ByteString, address: ByteString, session: Cell<SessionInner>) -> Self {
423 let target = Target {
424 address: Some(address),
425 durable: TerminusDurability::None,
426 expiry_policy: TerminusExpiryPolicy::SessionEnd,
427 timeout: 0,
428 dynamic: false,
429 dynamic_node_properties: None,
430 capabilities: None,
431 };
432 let frame = Attach(Box::new(codec::AttachInner {
433 name,
434 handle: 0_u32,
435 role: Role::Sender,
436 snd_settle_mode: SenderSettleMode::Mixed,
437 rcv_settle_mode: ReceiverSettleMode::First,
438 source: None,
439 target: Some(target),
440 unsettled: None,
441 incomplete_unsettled: false,
442 initial_delivery_count: None,
443 max_message_size: Some(65536 * 4),
444 offered_capabilities: None,
445 desired_capabilities: None,
446 properties: None,
447 }));
448
449 SenderLinkBuilder { frame, session }
450 }
451
452 pub fn max_message_size(mut self, size: u64) -> Self {
454 self.frame.0.max_message_size = Some(size);
455 self
456 }
457
458 pub fn with_frame<F>(mut self, f: F) -> Self
460 where
461 F: FnOnce(&mut Attach),
462 {
463 f(&mut self.frame);
464 self
465 }
466
467 pub async fn attach(self) -> Result<SenderLink, AmqpProtocolError> {
469 let result = self
470 .session
471 .get_mut()
472 .attach_local_sender_link(self.frame)
473 .await;
474
475 match result {
476 Ok(Ok(inner)) => Ok(SenderLink { inner }),
477 Ok(Err(e)) => Err(e),
478 Err(_) => Err(AmqpProtocolError::Disconnected),
479 }
480 }
481}