1use std::cell::Cell as StdCell;
2
3use ntex::{channel::pool, util::Bytes};
4use ntex_amqp_codec::protocol::{
5    DeliveryNumber, DeliveryState, Disposition, DispositionInner, Error, ErrorCondition, Handle,
6    MessageFormat, Rejected, Role, TransferBody,
7};
8use ntex_amqp_codec::types::{Str, Symbol};
9
10use crate::session::Session;
11use crate::{cell::Cell, error::AmqpProtocolError, sndlink::SenderLinkInner};
12
13bitflags::bitflags! {
14    #[derive(Copy, Clone, Debug)]
15    struct Flags: u8 {
16        const SENDER         = 0b0000_0001;
17        const LOCAL_SETTLED  = 0b0000_0100;
18        const REMOTE_SETTLED = 0b0000_1000;
19    }
20}
21
/// Handle to a single delivery, either sent through a sender link or received
/// from the peer.
///
/// The handle is used to inspect the remote delivery state, to post
/// dispositions (`settle` / `update_state`) and to wait for the peer's
/// disposition (`wait`).
#[derive(Debug)]
pub struct Delivery {
    /// Delivery number; also the key of this delivery in the session's
    /// unsettled-delivery maps.
    id: DeliveryNumber,
    /// Delivery tag, exposed through `tag()`.
    tag: Bytes,
    /// Session used to look up the unsettled-delivery entry and to post frames.
    session: Session,
    /// Status bits; stored in a `Cell` so they can be updated through `&self`.
    flags: StdCell<Flags>,
}
29
/// Session-side bookkeeping for a delivery that has not been settled yet.
///
/// Entries are stored in the session's unsettled-delivery maps, keyed by
/// delivery number, and are updated when a disposition or an error arrives.
#[derive(Default, Debug)]
pub(crate) struct DeliveryInner {
    /// Handle of the link the delivery belongs to.
    handle: Handle,
    /// Set to `true` once a disposition with the `settled` flag has been handled.
    settled: bool,
    /// Most recent delivery state carried by a handled disposition, if any.
    state: Option<DeliveryState>,
    /// Error recorded through `set_error`, if any.
    error: Option<AmqpProtocolError>,
    /// Notifier for a pending `Delivery::wait` call; taken and fired on the next
    /// disposition, error, or when this entry is dropped.
    tx: Option<pool::Sender<()>>,
}
38
39impl Delivery {
40    pub(crate) fn new_rcv(
41        id: DeliveryNumber,
42        link_handle: Handle,
43        tag: Bytes,
44        settled: bool,
45        session: Session,
46    ) -> Delivery {
47        if !settled {
48            session
49                .inner
50                .get_mut()
51                .unsettled_rcv_deliveries
52                .insert(id, DeliveryInner::new(link_handle));
53        }
54
55        Delivery {
56            id,
57            tag,
58            session,
59            flags: StdCell::new(if settled {
60                Flags::LOCAL_SETTLED
61            } else {
62                Flags::empty()
63            }),
64        }
65    }
66
67    pub fn id(&self) -> DeliveryNumber {
68        self.id
69    }
70
71    pub fn tag(&self) -> &Bytes {
72        &self.tag
73    }
74
75    pub fn remote_state(&self) -> Option<DeliveryState> {
76        if let Some(inner) = self
77            .session
78            .inner
79            .get_mut()
80            .unsettled_deliveries(self.is_set(Flags::SENDER))
81            .get_mut(&self.id)
82        {
83            inner.state.clone()
84        } else {
85            None
86        }
87    }
88
89    pub fn is_remote_settled(&self) -> bool {
90        self.is_set(Flags::REMOTE_SETTLED)
91    }
92
93    pub fn settle(&mut self, state: DeliveryState) {
94        if self.is_set(Flags::REMOTE_SETTLED) {
96            return;
97        }
98
99        if !self.is_set(Flags::LOCAL_SETTLED) {
100            self.set_flag(Flags::LOCAL_SETTLED);
101
102            let disp = Disposition(Box::new(DispositionInner {
103                role: if self.is_set(Flags::SENDER) {
104                    Role::Sender
105                } else {
106                    Role::Receiver
107                },
108                first: self.id,
109                last: None,
110                settled: true,
111                state: Some(state),
112                batchable: false,
113            }));
114            self.session.inner.get_mut().post_frame(disp.into());
115        }
116    }
117
118    pub fn update_state(&mut self, state: DeliveryState) {
119        if self.is_set(Flags::REMOTE_SETTLED) || self.is_set(Flags::LOCAL_SETTLED) {
121            return;
122        }
123
124        let disp = Disposition(Box::new(DispositionInner {
125            role: if self.is_set(Flags::SENDER) {
126                Role::Sender
127            } else {
128                Role::Receiver
129            },
130            first: self.id,
131            last: None,
132            settled: false,
133            state: Some(state),
134            batchable: false,
135        }));
136        self.session.inner.get_mut().post_frame(disp.into());
137    }
138
139    fn is_set(&self, flag: Flags) -> bool {
140        self.flags.get().contains(flag)
141    }
142
143    fn set_flag(&self, flag: Flags) {
144        let mut flags = self.flags.get();
145        flags.insert(flag);
146        self.flags.set(flags);
147    }
148
149    pub async fn wait(&self) -> Result<Option<DeliveryState>, AmqpProtocolError> {
150        if self.flags.get().contains(Flags::LOCAL_SETTLED) {
151            log::debug!("Delivery {:?} is settled locally", self.id);
152            return Ok(None);
153        }
154
155        let rx = if let Some(inner) = self
156            .session
157            .inner
158            .get_mut()
159            .unsettled_deliveries(self.is_set(Flags::SENDER))
160            .get_mut(&self.id)
161        {
162            if let Some(st) = self.check_inner(inner) {
163                return st;
164            }
165
166            let (tx, rx) = self.session.inner.get_ref().pool_notify.channel();
167            inner.tx = Some(tx);
168            rx
169        } else {
170            return Err(AmqpProtocolError::LinkDetached(None));
172        };
173        if rx.await.is_err() {
174            return Err(AmqpProtocolError::ConnectionDropped);
175        }
176
177        if let Some(inner) = self
178            .session
179            .inner
180            .get_mut()
181            .unsettled_deliveries(self.is_set(Flags::SENDER))
182            .get_mut(&self.id)
183        {
184            if inner.settled {
185                self.set_flag(Flags::REMOTE_SETTLED);
186            }
187            if let Some(st) = self.check_inner(inner) {
188                return st;
189            }
190        } else {
191            return Err(AmqpProtocolError::LinkDetached(None));
193        }
194        Ok(None)
195    }
196
197    fn check_inner(
198        &self,
199        inner: &mut DeliveryInner,
200    ) -> Option<Result<Option<DeliveryState>, AmqpProtocolError>> {
201        if let Some(ref st) = inner.state {
202            if matches!(st, DeliveryState::Modified(..)) {
203                Some(Ok(Some(inner.state.take().unwrap())))
205            } else {
206                Some(Ok(Some(st.clone())))
208            }
209        } else {
210            inner.error.as_ref().map(|err| Err(err.clone()))
211        }
212    }
213}
214
215impl Drop for Delivery {
216    fn drop(&mut self) {
217        let inner = self.session.inner.get_mut();
218        let deliveries = inner.unsettled_deliveries(self.is_set(Flags::SENDER));
219
220        if deliveries.contains_key(&self.id) {
221            deliveries.remove(&self.id);
222
223            if !self.is_set(Flags::REMOTE_SETTLED) && !self.is_set(Flags::LOCAL_SETTLED) {
224                let err = Error::build()
225                    .condition(ErrorCondition::Custom(Symbol(Str::from_static(
226                        "Internal error",
227                    ))))
228                    .finish();
229
230                let disp = Disposition(Box::new(DispositionInner {
231                    role: if self.is_set(Flags::SENDER) {
232                        Role::Sender
233                    } else {
234                        Role::Receiver
235                    },
236                    first: self.id,
237                    last: None,
238                    settled: true,
239                    state: Some(DeliveryState::Rejected(Rejected { error: Some(err) })),
240                    batchable: false,
241                }));
242                inner.post_frame(disp.into());
243            }
244        }
245    }
246}
247
248impl DeliveryInner {
249    pub(crate) fn new(handle: Handle) -> Self {
250        Self {
251            handle,
252            tx: None,
253            state: None,
254            error: None,
255            settled: false,
256        }
257    }
258
259    pub(crate) fn handle(&self) -> Handle {
260        self.handle
261    }
262
263    pub(crate) fn set_error(&mut self, error: AmqpProtocolError) {
264        self.error = Some(error);
265        if let Some(tx) = self.tx.take() {
266            let _ = tx.send(());
267        }
268    }
269
270    pub(crate) fn handle_disposition(&mut self, disp: Disposition) {
271        if disp.settled() {
272            self.settled = true;
273        }
274        if let Some(state) = disp.state() {
275            self.state = Some(state.clone());
276        }
277        if let Some(tx) = self.tx.take() {
278            let _ = tx.send(());
279        }
280    }
281}
282
283impl Drop for DeliveryInner {
284    fn drop(&mut self) {
285        if let Some(tx) = self.tx.take() {
286            let _ = tx.send(());
287        }
288    }
289}
290
/// Builder for a single outgoing transfer on a sender link.
///
/// Configure it with `tag`, `settled` and `format`, then call `send`.
pub struct TransferBuilder {
    /// Optional delivery tag forwarded to the sender link.
    tag: Option<Bytes>,
    /// Whether the transfer is sent as settled; defaults to `false`.
    settled: bool,
    /// Transfer payload.
    data: TransferBody,
    /// Optional message format forwarded to the sender link.
    format: Option<MessageFormat>,
    /// Sender link the transfer is sent through.
    sender: Cell<SenderLinkInner>,
}
298
299impl TransferBuilder {
300    pub(crate) fn new(data: TransferBody, sender: Cell<SenderLinkInner>) -> Self {
301        Self {
302            tag: None,
303            settled: false,
304            format: None,
305            data,
306            sender,
307        }
308    }
309
310    pub fn tag(mut self, tag: Bytes) -> Self {
311        self.tag = Some(tag);
312        self
313    }
314
315    pub fn settled(mut self) -> Self {
316        self.settled = true;
317        self
318    }
319
320    pub fn format(mut self, fmt: MessageFormat) -> Self {
321        self.format = Some(fmt);
322        self
323    }
324
325    pub async fn send(self) -> Result<Delivery, AmqpProtocolError> {
326        let inner = self.sender.get_ref();
327
328        if let Some(ref err) = inner.error {
329            Err(err.clone())
330        } else if inner.closed {
331            Err(AmqpProtocolError::Disconnected)
332        } else {
333            if let Some(limit) = inner.max_message_size {
334                if self.data.len() > limit as usize {
335                    return Err(AmqpProtocolError::BodyTooLarge);
336                }
337            }
338
339            let (id, tag) = self
340                .sender
341                .get_mut()
342                .send(self.data, self.tag, self.settled, self.format)
343                .await?;
344
345            Ok(Delivery {
346                id,
347                tag,
348                session: self.sender.get_ref().session.clone(),
349                flags: StdCell::new(if self.settled {
350                    Flags::SENDER | Flags::LOCAL_SETTLED
351                } else {
352                    Flags::SENDER
353                }),
354            })
355        }
356    }
357}