1use std::cell::Cell as StdCell;
2
3use ntex::{channel::pool, util::Bytes};
4use ntex_amqp_codec::protocol::{
5 DeliveryNumber, DeliveryState, Disposition, DispositionInner, Error, ErrorCondition, Handle,
6 MessageFormat, Rejected, Role, TransferBody,
7};
8use ntex_amqp_codec::types::{Str, Symbol};
9
10use crate::session::Session;
11use crate::{cell::Cell, error::AmqpProtocolError, sndlink::SenderLinkInner};
12
13bitflags::bitflags! {
14 #[derive(Copy, Clone, Debug)]
15 struct Flags: u8 {
16 const SENDER = 0b0000_0001;
17 const LOCAL_SETTLED = 0b0000_0100;
18 const REMOTE_SETTLED = 0b0000_1000;
19 }
20}
21
22#[derive(Debug)]
23pub struct Delivery {
24 id: DeliveryNumber,
25 tag: Bytes,
26 session: Session,
27 flags: StdCell<Flags>,
28}
29
30#[derive(Default, Debug)]
31pub(crate) struct DeliveryInner {
32 handle: Handle,
33 settled: bool,
34 state: Option<DeliveryState>,
35 error: Option<AmqpProtocolError>,
36 tx: Option<pool::Sender<()>>,
37}
38
39impl Delivery {
40 pub(crate) fn new_rcv(
41 id: DeliveryNumber,
42 link_handle: Handle,
43 tag: Bytes,
44 settled: bool,
45 session: Session,
46 ) -> Delivery {
47 if !settled {
48 session
49 .inner
50 .get_mut()
51 .unsettled_rcv_deliveries
52 .insert(id, DeliveryInner::new(link_handle));
53 }
54
55 Delivery {
56 id,
57 tag,
58 session,
59 flags: StdCell::new(if settled {
60 Flags::LOCAL_SETTLED
61 } else {
62 Flags::empty()
63 }),
64 }
65 }
66
67 pub fn id(&self) -> DeliveryNumber {
68 self.id
69 }
70
71 pub fn tag(&self) -> &Bytes {
72 &self.tag
73 }
74
75 pub fn remote_state(&self) -> Option<DeliveryState> {
76 if let Some(inner) = self
77 .session
78 .inner
79 .get_mut()
80 .unsettled_deliveries(self.is_set(Flags::SENDER))
81 .get_mut(&self.id)
82 {
83 inner.state.clone()
84 } else {
85 None
86 }
87 }
88
89 pub fn is_remote_settled(&self) -> bool {
90 self.is_set(Flags::REMOTE_SETTLED)
91 }
92
93 pub fn settle(&mut self, state: DeliveryState) {
94 if self.is_set(Flags::REMOTE_SETTLED) {
96 return;
97 }
98
99 if !self.is_set(Flags::LOCAL_SETTLED) {
100 self.set_flag(Flags::LOCAL_SETTLED);
101
102 let disp = Disposition(Box::new(DispositionInner {
103 role: if self.is_set(Flags::SENDER) {
104 Role::Sender
105 } else {
106 Role::Receiver
107 },
108 first: self.id,
109 last: None,
110 settled: true,
111 state: Some(state),
112 batchable: false,
113 }));
114 self.session.inner.get_mut().post_frame(disp.into());
115 }
116 }
117
118 pub fn update_state(&mut self, state: DeliveryState) {
119 if self.is_set(Flags::REMOTE_SETTLED) || self.is_set(Flags::LOCAL_SETTLED) {
121 return;
122 }
123
124 let disp = Disposition(Box::new(DispositionInner {
125 role: if self.is_set(Flags::SENDER) {
126 Role::Sender
127 } else {
128 Role::Receiver
129 },
130 first: self.id,
131 last: None,
132 settled: false,
133 state: Some(state),
134 batchable: false,
135 }));
136 self.session.inner.get_mut().post_frame(disp.into());
137 }
138
139 fn is_set(&self, flag: Flags) -> bool {
140 self.flags.get().contains(flag)
141 }
142
143 fn set_flag(&self, flag: Flags) {
144 let mut flags = self.flags.get();
145 flags.insert(flag);
146 self.flags.set(flags);
147 }
148
149 pub async fn wait(&self) -> Result<Option<DeliveryState>, AmqpProtocolError> {
150 if self.flags.get().contains(Flags::LOCAL_SETTLED) {
151 log::debug!("Delivery {:?} is settled locally", self.id);
152 return Ok(None);
153 }
154
155 let rx = if let Some(inner) = self
156 .session
157 .inner
158 .get_mut()
159 .unsettled_deliveries(self.is_set(Flags::SENDER))
160 .get_mut(&self.id)
161 {
162 if let Some(st) = self.check_inner(inner) {
163 return st;
164 }
165
166 let (tx, rx) = self.session.inner.get_ref().pool_notify.channel();
167 inner.tx = Some(tx);
168 rx
169 } else {
170 return Err(AmqpProtocolError::LinkDetached(None));
172 };
173 if rx.await.is_err() {
174 return Err(AmqpProtocolError::ConnectionDropped);
175 }
176
177 if let Some(inner) = self
178 .session
179 .inner
180 .get_mut()
181 .unsettled_deliveries(self.is_set(Flags::SENDER))
182 .get_mut(&self.id)
183 {
184 if inner.settled {
185 self.set_flag(Flags::REMOTE_SETTLED);
186 }
187 if let Some(st) = self.check_inner(inner) {
188 return st;
189 }
190 } else {
191 return Err(AmqpProtocolError::LinkDetached(None));
193 }
194 Ok(None)
195 }
196
197 fn check_inner(
198 &self,
199 inner: &mut DeliveryInner,
200 ) -> Option<Result<Option<DeliveryState>, AmqpProtocolError>> {
201 if let Some(ref st) = inner.state {
202 if matches!(st, DeliveryState::Modified(..)) {
203 Some(Ok(Some(inner.state.take().unwrap())))
205 } else {
206 Some(Ok(Some(st.clone())))
208 }
209 } else {
210 inner.error.as_ref().map(|err| Err(err.clone()))
211 }
212 }
213}
214
215impl Drop for Delivery {
216 fn drop(&mut self) {
217 let inner = self.session.inner.get_mut();
218 let deliveries = inner.unsettled_deliveries(self.is_set(Flags::SENDER));
219
220 if deliveries.contains_key(&self.id) {
221 deliveries.remove(&self.id);
222
223 if !self.is_set(Flags::REMOTE_SETTLED) && !self.is_set(Flags::LOCAL_SETTLED) {
224 let err = Error::build()
225 .condition(ErrorCondition::Custom(Symbol(Str::from_static(
226 "Internal error",
227 ))))
228 .finish();
229
230 let disp = Disposition(Box::new(DispositionInner {
231 role: if self.is_set(Flags::SENDER) {
232 Role::Sender
233 } else {
234 Role::Receiver
235 },
236 first: self.id,
237 last: None,
238 settled: true,
239 state: Some(DeliveryState::Rejected(Rejected { error: Some(err) })),
240 batchable: false,
241 }));
242 inner.post_frame(disp.into());
243 }
244 }
245 }
246}
247
248impl DeliveryInner {
249 pub(crate) fn new(handle: Handle) -> Self {
250 Self {
251 handle,
252 tx: None,
253 state: None,
254 error: None,
255 settled: false,
256 }
257 }
258
259 pub(crate) fn handle(&self) -> Handle {
260 self.handle
261 }
262
263 pub(crate) fn set_error(&mut self, error: AmqpProtocolError) {
264 self.error = Some(error);
265 if let Some(tx) = self.tx.take() {
266 let _ = tx.send(());
267 }
268 }
269
270 pub(crate) fn handle_disposition(&mut self, disp: Disposition) {
271 if disp.settled() {
272 self.settled = true;
273 }
274 if let Some(state) = disp.state() {
275 self.state = Some(state.clone());
276 }
277 if let Some(tx) = self.tx.take() {
278 let _ = tx.send(());
279 }
280 }
281}
282
283impl Drop for DeliveryInner {
284 fn drop(&mut self) {
285 if let Some(tx) = self.tx.take() {
286 let _ = tx.send(());
287 }
288 }
289}
290
291pub struct TransferBuilder {
292 tag: Option<Bytes>,
293 settled: bool,
294 data: TransferBody,
295 format: Option<MessageFormat>,
296 sender: Cell<SenderLinkInner>,
297}
298
299impl TransferBuilder {
300 pub(crate) fn new(data: TransferBody, sender: Cell<SenderLinkInner>) -> Self {
301 Self {
302 tag: None,
303 settled: false,
304 format: None,
305 data,
306 sender,
307 }
308 }
309
310 pub fn tag(mut self, tag: Bytes) -> Self {
311 self.tag = Some(tag);
312 self
313 }
314
315 pub fn settled(mut self) -> Self {
316 self.settled = true;
317 self
318 }
319
320 pub fn format(mut self, fmt: MessageFormat) -> Self {
321 self.format = Some(fmt);
322 self
323 }
324
325 pub async fn send(self) -> Result<Delivery, AmqpProtocolError> {
326 let inner = self.sender.get_ref();
327
328 if let Some(ref err) = inner.error {
329 Err(err.clone())
330 } else if inner.closed {
331 Err(AmqpProtocolError::Disconnected)
332 } else {
333 if let Some(limit) = inner.max_message_size {
334 if self.data.len() > limit as usize {
335 return Err(AmqpProtocolError::BodyTooLarge);
336 }
337 }
338
339 let (id, tag) = self
340 .sender
341 .get_mut()
342 .send(self.data, self.tag, self.settled, self.format)
343 .await?;
344
345 Ok(Delivery {
346 id,
347 tag,
348 session: self.sender.get_ref().session.clone(),
349 flags: StdCell::new(if self.settled {
350 Flags::SENDER | Flags::LOCAL_SETTLED
351 } else {
352 Flags::SENDER
353 }),
354 })
355 }
356 }
357}