dove/
driver.rs

1/*
2 * Copyright 2019-2020, Ulf Lilleengen
3 * License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
4 */
5
6//! The driver module is an intermediate layer with the core logic for interacting with different AMQP 1.0 endpoint entities (connections, sessions, links).
7
8use crate::conn::ChannelId;
9use crate::connection::ConnectionHandle;
10use crate::error::*;
11use crate::framing;
12use crate::framing::{
13    AmqpFrame, Attach, Begin, Close, DeliveryState, Detach, End, Flow, Frame, LinkRole, Open,
14    Performative, Source, Target, Transfer,
15};
16use crate::message::Message;
17use crate::options::LinkOptions;
18use rand::Rng;
19use std::collections::hash_map::Entry;
20use std::collections::HashMap;
21use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
22use std::sync::{Arc, Mutex};
23use std::time::{Duration, Instant};
24use uuid::Uuid;
25
26pub type DeliveryTag = Vec<u8>;
27pub type HandleId = u32;
28
29#[derive(Debug)]
30pub struct ConnectionDriver {
31    channel_max: u16,
32    idle_timeout: Duration,
33    connection: ConnectionHandle,
34    sessions: Mutex<HashMap<ChannelId, Arc<SessionDriver>>>,
35
36    // Frames received on this connection
37    rx: Channel<AmqpFrame>,
38    remote_channel_map: Mutex<HashMap<ChannelId, ChannelId>>,
39    remote_idle_timeout: Duration,
40
41    // State
42    closed: AtomicBool,
43}
44
45#[derive(Debug)]
46pub struct SessionDriver {
47    // Frames received on this session
48    connection: ConnectionHandle,
49    local_channel: ChannelId,
50    rx: Channel<AmqpFrame>,
51
52    links_in_flight: Mutex<HashMap<String, Arc<LinkDriver>>>,
53    links: Mutex<HashMap<HandleId, Arc<LinkDriver>>>,
54    handle_generator: AtomicU32,
55
56    #[allow(clippy::type_complexity)]
57    did_to_delivery: Arc<Mutex<HashMap<u32, (HandleId, Arc<DeliveryDriver>)>>>,
58    initial_outgoing_id: u32,
59
60    flow_control: Arc<Mutex<SessionFlowControl>>,
61}
62
63// TODO: Make this use atomic operations
64#[derive(Clone, Debug)]
65struct SessionFlowControl {
66    next_outgoing_id: u32,
67    next_incoming_id: u32,
68
69    incoming_window: u32,
70    outgoing_window: u32,
71
72    remote_incoming_window: u32,
73    remote_outgoing_window: u32,
74}
75
76impl SessionFlowControl {
77    fn new() -> SessionFlowControl {
78        SessionFlowControl {
79            next_outgoing_id: 0,
80            next_incoming_id: 0,
81
82            incoming_window: std::i32::MAX as u32,
83            outgoing_window: std::i32::MAX as u32,
84
85            remote_incoming_window: 0,
86            remote_outgoing_window: 0,
87        }
88    }
89
90    fn accept(&mut self, delivery_id: u32) -> Result<bool> {
91        if delivery_id + 1 < self.next_incoming_id || self.remote_outgoing_window == 0 {
92            Err(AmqpError::framing_error(None))
93        } else if self.incoming_window == 0 {
94            Ok(false)
95        } else {
96            self.incoming_window -= 1;
97            self.next_incoming_id = delivery_id + 1;
98            self.remote_outgoing_window -= 1;
99            Ok(true)
100        }
101    }
102
103    fn next(&mut self) -> Option<SessionFlowControl> {
104        if self.outgoing_window > 0 && self.remote_incoming_window > 0 {
105            let original = self.clone();
106            self.next_outgoing_id += 1;
107            self.outgoing_window -= 1;
108            self.remote_incoming_window -= 1;
109            Some(original)
110        } else {
111            None
112        }
113    }
114}
115
116#[derive(Debug)]
117pub struct LinkDriver {
118    pub name: String,
119    pub handle: u32,
120    pub role: LinkRole,
121    pub channel: ChannelId,
122    connection: ConnectionHandle,
123    rx: Channel<AmqpFrame>,
124
125    session_flow_control: Arc<Mutex<SessionFlowControl>>,
126
127    #[allow(clippy::type_complexity)]
128    did_to_delivery: Arc<Mutex<HashMap<u32, (HandleId, Arc<DeliveryDriver>)>>>,
129    credit: AtomicU32,
130    delivery_count: AtomicU32,
131}
132
133#[derive(Debug)]
134pub struct DeliveryDriver {
135    pub message: Option<Message>,
136    pub remotely_settled: bool,
137    pub settled: bool,
138    pub state: Option<DeliveryState>,
139    pub tag: DeliveryTag,
140    pub id: u32,
141}
142
143pub struct SessionOpts {
144    pub max_frame_size: u32,
145}
146
147impl ConnectionDriver {
148    pub fn new(connection: ConnectionHandle, idle_timeout: Duration) -> ConnectionDriver {
149        ConnectionDriver {
150            connection,
151            rx: Channel::new(),
152            sessions: Mutex::new(HashMap::new()),
153            remote_channel_map: Mutex::new(HashMap::new()),
154            idle_timeout,
155            remote_idle_timeout: Duration::from_secs(0),
156            channel_max: u16::MAX,
157            closed: AtomicBool::new(false),
158        }
159    }
160
161    pub fn closed(&self) -> bool {
162        self.closed.load(Ordering::SeqCst)
163    }
164
165    pub fn connection(&self) -> &ConnectionHandle {
166        &self.connection
167    }
168
169    pub fn flowcontrol(&self) -> Result<()> {
170        let low_flow_watermark = 100;
171        let high_flow_watermark = 1000;
172
173        for (_, session) in self.sessions.lock().unwrap().iter_mut() {
174            for (_, link) in session.links.lock().unwrap().iter_mut() {
175                if link.role == LinkRole::Receiver {
176                    let credit = link.credit.load(Ordering::SeqCst);
177                    if credit <= low_flow_watermark {
178                        link.flow(high_flow_watermark)?;
179                    }
180                }
181            }
182        }
183        Ok(())
184    }
185
186    pub fn keepalive(&self) -> Result<()> {
187        let now = Instant::now();
188        let last_received = self.connection.keepalive(self.remote_idle_timeout, now)?;
189
190        if self.idle_timeout.as_millis() > 0 {
191            // Ensure our peer honors our keepalive
192            if now - last_received > self.idle_timeout * 2 {
193                self.connection.close(Close {
194                    error: Some(ErrorCondition::local_idle_timeout()),
195                })?;
196                warn!("Connection timed out");
197                return Err(AmqpError::IoError(std::io::Error::from(
198                    std::io::ErrorKind::TimedOut,
199                )));
200            }
201        }
202        Ok(())
203    }
204
205    pub fn open(&self, open: Open) -> Result<()> {
206        self.connection.open(open)
207    }
208
209    pub fn close(&self, error: Option<ErrorCondition>) -> Result<()> {
210        if self.closed.fetch_or(true, Ordering::SeqCst) {
211            return Ok(());
212        }
213
214        for (_id, session) in core::mem::take(&mut *self.sessions.lock().unwrap()) {
215            for (_id, link) in core::mem::take(&mut *session.links.lock().unwrap()) {
216                let _ = link.close(None);
217                link.rx.close();
218            }
219            let _ = session.close(None);
220            session.rx.close();
221        }
222
223        self.rx.close();
224        self.connection.close(Close { error })
225    }
226
227    pub(crate) fn dispatch(&self, frames: Vec<Frame>) -> Result<()> {
228        for frame in frames {
229            if let Frame::AMQP(frame) = frame {
230                trace!("Got AMQP frame: {:?}", frame.performative);
231                if let Some(ref performative) = frame.performative {
232                    let channel = frame.channel;
233                    match performative {
234                        Performative::Open(ref _open) => {
235                            self.rx.send(frame)?;
236                        }
237                        Performative::Close(ref _close) => {
238                            self.rx.send(frame)?;
239                        }
240                        Performative::Begin(ref begin) => {
241                            let m = self.sessions.lock().unwrap();
242                            let s = m.get(&channel);
243                            if let Some(s) = s {
244                                {
245                                    let mut f = s.flow_control.lock().unwrap();
246                                    f.remote_outgoing_window = begin.outgoing_window;
247                                    f.remote_incoming_window = begin.incoming_window;
248                                    if let Some(remote_channel) = begin.remote_channel {
249                                        let mut cm = self.remote_channel_map.lock().unwrap();
250                                        cm.insert(channel, remote_channel);
251                                    }
252                                }
253                                s.rx.send(frame)?;
254                            }
255                        }
256                        Performative::End(ref _end) => {
257                            let local_channel: Option<ChannelId> = {
258                                let cm = self.remote_channel_map.lock().unwrap();
259                                cm.get(&channel).cloned()
260                            };
261
262                            if let Some(local_channel) = local_channel {
263                                let mut m = self.sessions.lock().unwrap();
264                                m.get_mut(&local_channel).map(|s| s.rx.send(frame));
265                            }
266                        }
267                        _ => {
268                            let local_channel: Option<ChannelId> = {
269                                let cm = self.remote_channel_map.lock().unwrap();
270                                cm.get(&channel).cloned()
271                            };
272
273                            if let Some(local_channel) = local_channel {
274                                let session = {
275                                    let mut m = self.sessions.lock().unwrap();
276                                    m.get_mut(&local_channel).cloned()
277                                };
278
279                                if let Some(s) = session {
280                                    s.dispatch(frame)?;
281                                }
282                            }
283                        }
284                    }
285                }
286            }
287        }
288        Ok(())
289    }
290
291    fn allocate_session(&self) -> Option<Arc<SessionDriver>> {
292        let mut m = self.sessions.lock().unwrap();
293        for i in 0..self.channel_max {
294            let chan = i as ChannelId;
295            if let Entry::Vacant(entry) = m.entry(chan) {
296                let session = Arc::new(SessionDriver {
297                    connection: self.connection.clone(),
298                    local_channel: chan,
299                    rx: Channel::new(),
300
301                    links_in_flight: Mutex::new(HashMap::new()),
302                    links: Mutex::new(HashMap::new()),
303                    handle_generator: AtomicU32::new(0),
304
305                    flow_control: Arc::new(Mutex::new(SessionFlowControl::new())),
306                    initial_outgoing_id: 0,
307
308                    did_to_delivery: Arc::new(Mutex::new(HashMap::new())),
309                });
310                entry.insert(session.clone());
311                return Some(session);
312            }
313        }
314        None
315    }
316
317    pub async fn new_session(&self, _opts: Option<SessionOpts>) -> Result<Arc<SessionDriver>> {
318        let session = self
319            .allocate_session()
320            .ok_or(AmqpError::SessionAllocationExhausted)?;
321        let flow_control: SessionFlowControl = { session.flow_control.lock().unwrap().clone() };
322        let begin = Begin {
323            remote_channel: None,
324            next_outgoing_id: flow_control.next_outgoing_id,
325            incoming_window: flow_control.incoming_window,
326            outgoing_window: flow_control.outgoing_window,
327            handle_max: None,
328            offered_capabilities: None,
329            desired_capabilities: None,
330            properties: None,
331        };
332        debug!(
333            "Creating session with local channel {}",
334            session.local_channel
335        );
336
337        self.connection.begin(session.local_channel, begin)?;
338        Ok(session)
339    }
340
341    #[inline]
342    pub async fn recv(&self) -> Result<AmqpFrame> {
343        self.rx.recv().await
344    }
345
346    #[inline]
347    pub fn unrecv(&self, frame: AmqpFrame) -> Result<()> {
348        warn!("unrecv: {:?}", frame);
349        self.rx.send(frame)
350    }
351}
352
353impl SessionDriver {
354    pub fn dispatch(&self, frame: AmqpFrame) -> Result<()> {
355        trace!("Dispatching frame: {:?}", frame);
356        match &frame.performative {
357            Some(Performative::Attach(attach_response)) => {
358                let link = self
359                    .links_in_flight
360                    .lock()
361                    .unwrap()
362                    .remove(&attach_response.name);
363
364                if let Some(link) = link {
365                    let handle = attach_response.handle;
366                    if link.rx.send(frame).is_ok() {
367                        self.links.lock().unwrap().insert(handle, Arc::clone(&link));
368                    } else {
369                        error!("Failed to notify LinkDriver about attach frame")
370                    }
371                } else {
372                    error!(
373                        "Received attach frame for unknown link: {:?}",
374                        attach_response
375                    );
376                }
377            }
378            Some(Performative::Detach(ref detach)) => {
379                if let Some(link) = self.links.lock().unwrap().remove(&detach.handle) {
380                    link.rx.send(frame)?;
381                } else {
382                    warn!("Detach request with unknown handle received: {:?}", detach)
383                }
384            }
385            Some(Performative::Transfer(ref transfer)) => {
386                // Session flow control
387                if let Some(delivery_id) = transfer.delivery_id {
388                    loop {
389                        let result = self.flow_control.lock().unwrap().accept(delivery_id);
390                        match result {
391                            Err(AmqpError::Amqp(cond)) => {
392                                error!("Transfer error: {:?}", cond);
393                                self.close(Some(cond))?;
394                            }
395                            Err(e) => {
396                                error!("Transfer error: {:?}", e);
397                                self.close(None)?;
398                            }
399                            Ok(false) => {}
400                            Ok(true) => break,
401                        }
402                    }
403                }
404
405                let link = {
406                    let mut m = self.links.lock().unwrap();
407                    m.get_mut(&transfer.handle).unwrap().clone()
408                };
409
410                let count_down = |x| {
411                    if x == 0 {
412                        Some(0)
413                    } else {
414                        Some(x - 1)
415                    }
416                };
417                // Link flow control
418                if link
419                    .credit
420                    .fetch_update(Ordering::SeqCst, Ordering::SeqCst, count_down)
421                    == Ok(0)
422                {
423                    trace!("Transfer but no space left!");
424                } else {
425                    trace!(
426                        "Received transfer. Credit: {:?}",
427                        link.credit.load(Ordering::SeqCst)
428                    );
429                    link.delivery_count.fetch_add(1, Ordering::SeqCst);
430                    link.rx.send(frame)?;
431                }
432            }
433            Some(Performative::Disposition(ref disposition)) => {
434                trace!("Received disposition: {:?}", disposition);
435                let last = disposition.last.unwrap_or(disposition.first);
436                for id in disposition.first..=last {
437                    if let Some((handle, _delivery)) =
438                        self.did_to_delivery.lock().unwrap().remove(&id)
439                    {
440                        if let Some(link) = self.links.lock().unwrap().get(&handle).cloned() {
441                            if link.role == disposition.role {
442                                link.rx.send(frame.clone())?;
443                            }
444                        } else {
445                            debug!("Disposition for invalid handle({}) received", handle);
446                        }
447                    }
448                }
449            }
450            Some(Performative::Flow(ref flow)) => {
451                trace!("Received flow!");
452                // Session flow control
453                {
454                    let mut control = self.flow_control.lock().unwrap();
455                    control.next_incoming_id = flow.next_outgoing_id;
456                    control.remote_outgoing_window = flow.outgoing_window;
457                    if let Some(next_incoming_id) = flow.next_incoming_id {
458                        control.remote_incoming_window =
459                            next_incoming_id + flow.incoming_window - control.next_outgoing_id;
460                    } else {
461                        control.remote_incoming_window = self.initial_outgoing_id
462                            + flow.incoming_window
463                            - control.next_outgoing_id;
464                    }
465                }
466                if let Some(handle) = flow.handle {
467                    let link = {
468                        let mut m = self.links.lock().unwrap();
469                        m.get_mut(&handle).ok_or(AmqpError::InvalidHandle)?.clone()
470                    };
471                    if let Some(credit) = flow.link_credit {
472                        let credit = flow.delivery_count.unwrap_or(0) + credit
473                            - link.delivery_count.load(Ordering::SeqCst);
474                        link.credit.store(credit, Ordering::SeqCst);
475                    }
476                }
477            }
478            _ => {
479                warn!("Unexpected performative for session: {:?}", frame);
480            }
481        }
482        Ok(())
483    }
484
485    pub fn close(&self, error: Option<ErrorCondition>) -> Result<()> {
486        self.connection.end(self.local_channel, End { error })
487    }
488
489    pub async fn new_link(
490        &self,
491        address: &str,
492        options: impl Into<LinkOptions>,
493    ) -> Result<(String, Arc<LinkDriver>)> {
494        let options = options.into();
495        let role = options.role();
496        let link_name = format!("dove-{}-{}", Uuid::new_v4(), role.as_str());
497        debug!("Creating link {} with role {:?}", link_name, role);
498
499        // Send attach frame
500        let attach = Attach {
501            name: link_name.clone(),
502            handle: self.next_handle_id(),
503            role,
504            snd_settle_mode: None,
505            rcv_settle_mode: None,
506            source: Some(Source {
507                address: Some(address.to_string()),
508                durable: None,
509                expiry_policy: None,
510                timeout: None,
511                dynamic: Some(false),
512                dynamic_node_properties: None,
513                default_outcome: None,
514                distribution_mode: None,
515                filter: None,
516                outcomes: None,
517                capabilities: None,
518            }),
519            target: Some(Target {
520                address: Some(address.to_string()),
521                durable: None,
522                expiry_policy: None,
523                timeout: None,
524                dynamic: Some(false),
525                dynamic_node_properties: None,
526                capabilities: None,
527            }),
528            unsettled: None,
529            incomplete_unsettled: None,
530            initial_delivery_count: if role == LinkRole::Sender {
531                Some(0)
532            } else {
533                None
534            },
535            max_message_size: None,
536            offered_capabilities: None,
537            desired_capabilities: None,
538            properties: None,
539        };
540
541        let attach = options.applied_on_attach(attach);
542        let link = Arc::new(LinkDriver {
543            name: link_name.clone(),
544            role,
545            channel: self.local_channel,
546            connection: self.connection.clone(),
547            handle: attach.handle,
548            rx: Channel::new(),
549            session_flow_control: self.flow_control.clone(),
550            did_to_delivery: self.did_to_delivery.clone(),
551            credit: AtomicU32::new(0),
552            delivery_count: AtomicU32::new(0),
553        });
554
555        self.links_in_flight
556            .lock()
557            .unwrap()
558            .insert(link_name.clone(), Arc::clone(&link));
559
560        debug!("Requesting attachment of {}/{}", attach.name, attach.handle);
561        self.connection.attach(self.local_channel, attach)?;
562
563        let frame = link.rx.recv().await?;
564        if let Some(Performative::Attach(response)) = frame.performative {
565            debug!(
566                "Received response for attach request: handle={}",
567                response.handle
568            );
569
570            // if it is not dynamic, we need to check whether the attach was successful
571            let requested_address = address;
572            let dynamic = matches!(options.dynamic(), Some(true));
573
574            let address_response = match response.role {
575                LinkRole::Sender => response.target.and_then(|t| t.address),
576                LinkRole::Receiver => response.source.and_then(|s| s.address),
577            };
578
579            match address_response {
580                Some(address) if dynamic || address == requested_address => Ok((address, link)),
581                invalid => {
582                    warn!(
583                        "Expected address {:?}, but server sent {:?}",
584                        requested_address, invalid
585                    );
586                    link.close(Some(ErrorCondition {
587                        condition: "amqp:invalid-field".to_string(),
588                        description: format!(
589                            "Expected address {:?}, but server sent {:?}",
590                            requested_address, invalid
591                        ),
592                    }))?;
593                    Err(AmqpError::TargetNotRecognized(
594                        requested_address.to_string(),
595                    ))
596                }
597            }
598        } else {
599            let condition = ErrorCondition {
600                condition: "amqp:precondition-failed".to_string(),
601                description: format!("Expected attach frame, but got {:?}", frame),
602            };
603            link.close(Some(condition.clone()))?;
604            Err(AmqpError::Amqp(condition))
605        }
606    }
607
608    fn next_handle_id(&self) -> HandleId {
609        loop {
610            let handle_id = self.handle_generator.fetch_add(1, Ordering::SeqCst);
611
612            loop {
613                // try_lock to prevent deadlocks
614                let links = match self.links.try_lock() {
615                    Ok(links) => links,
616                    Err(_) => continue,
617                };
618
619                // try_lock to prevent deadlocks
620                let links_in_flight = match self.links_in_flight.try_lock() {
621                    Ok(links_in_flight) => links_in_flight,
622                    Err(_) => continue,
623                };
624
625                if !links.values().any(|l| l.handle == handle_id)
626                    && !links_in_flight.values().any(|l| l.handle == handle_id)
627                {
628                    return handle_id;
629                } else {
630                    break;
631                }
632            }
633        }
634    }
635
636    #[inline]
637    pub async fn recv(&self) -> Result<AmqpFrame> {
638        self.rx.recv().await
639    }
640
641    #[inline]
642    pub fn unrecv(&self, frame: AmqpFrame) -> Result<()> {
643        warn!("unrecv");
644        self.rx.send(frame)
645    }
646}
647
648impl LinkDriver {
649    pub fn connection(&self) -> &ConnectionHandle {
650        &self.connection
651    }
652
653    pub fn credits(&self) -> u32 {
654        self.credit.load(Ordering::SeqCst)
655    }
656
657    pub async fn send_message(
658        &self,
659        message: Message,
660        settled: bool,
661    ) -> Result<Arc<DeliveryDriver>> {
662        let semaphore_fn = |x| {
663            if x == 0 {
664                Some(0)
665            } else {
666                Some(x - 1)
667            }
668        };
669
670        // Link flow control
671        if self
672            .credit
673            .fetch_update(Ordering::SeqCst, Ordering::SeqCst, semaphore_fn)
674            == Ok(0)
675        {
676            return Err(AmqpError::NotEnoughCreditsToSend(Box::new(message)));
677        }
678
679        // Session flow control
680        let next_outgoing_id = loop {
681            let props = self.session_flow_control.lock().unwrap().next();
682            if let Some(props) = props {
683                break props.next_outgoing_id;
684            }
685            warn!("No next_outgoing_id, busy looping");
686            // std::thread::sleep(Duration::from_millis(500));
687        };
688
689        self.delivery_count.fetch_add(1, Ordering::SeqCst);
690        let delivery_tag = rand::thread_rng().gen::<[u8; 16]>().to_vec();
691        let delivery = Arc::new(DeliveryDriver {
692            message: Some(message),
693            id: next_outgoing_id,
694            tag: delivery_tag.clone(),
695            state: None,
696            remotely_settled: false,
697            settled,
698        });
699
700        if !settled {
701            self.did_to_delivery
702                .lock()
703                .unwrap()
704                .insert(next_outgoing_id, (self.handle, delivery.clone()));
705        }
706
707        let transfer = Transfer {
708            handle: self.handle,
709            delivery_id: Some(next_outgoing_id),
710            delivery_tag: Some(delivery_tag),
711            message_format: Some(0),
712            settled: Some(settled),
713            more: Some(false),
714            rcv_settle_mode: None,
715            state: None,
716            resume: None,
717            aborted: None,
718            batchable: None,
719        };
720
721        let mut msgbuf = Vec::new();
722        if let Some(message) = delivery.message.as_ref() {
723            message.encode(&mut msgbuf)?;
724        }
725
726        self.connection
727            .transfer(self.channel, transfer, Some(msgbuf))?;
728
729        Ok(delivery)
730    }
731
732    pub fn flow(&self, credit: u32) -> Result<()> {
733        trace!("{}: issuing {} credits", self.handle, credit);
734        self.credit.store(credit, Ordering::SeqCst);
735        let props = { self.session_flow_control.lock().unwrap().clone() };
736        self.connection.flow(
737            self.channel,
738            Flow {
739                next_incoming_id: Some(props.next_incoming_id),
740                incoming_window: props.incoming_window,
741                next_outgoing_id: props.next_outgoing_id,
742                outgoing_window: props.outgoing_window,
743                handle: Some(self.handle),
744                delivery_count: Some(self.delivery_count.load(Ordering::SeqCst)),
745                link_credit: Some(credit),
746                available: None,
747                drain: None,
748                echo: None,
749                properties: None,
750            },
751        )
752    }
753
754    pub fn close(&self, error: Option<ErrorCondition>) -> Result<()> {
755        self.connection.detach(
756            self.channel,
757            Detach {
758                handle: self.handle,
759                closed: Some(true),
760                error,
761            },
762        )
763    }
764
765    #[inline]
766    pub async fn recv(&self) -> Result<AmqpFrame> {
767        self.rx.recv().await
768    }
769
770    #[inline]
771    pub fn unrecv(&self, frame: AmqpFrame) -> Result<()> {
772        warn!("unrecv");
773        self.rx.send(frame)
774    }
775
776    #[inline]
777    pub fn disposition(
778        &self,
779        delivery: &DeliveryDriver,
780        settled: bool,
781        state: DeliveryState,
782    ) -> Result<()> {
783        if settled {
784            self.session_flow_control.lock().unwrap().incoming_window += 1;
785        }
786        self.connection().disposition(
787            self.channel,
788            framing::Disposition {
789                role: self.role,
790                first: delivery.id,
791                last: Some(delivery.id),
792                settled: Some(settled),
793                state: Some(state),
794                batchable: None,
795            },
796        )
797    }
798}
799
800#[derive(Debug)]
801pub struct Channel<T> {
802    tx: async_channel::Sender<T>,
803    rx: async_channel::Receiver<T>,
804}
805
806impl<T> Default for Channel<T> {
807    fn default() -> Self {
808        let (tx, rx) = async_channel::unbounded();
809        Self { tx, rx }
810    }
811}
812
813impl<T> Channel<T> {
814    pub fn new() -> Channel<T> {
815        Self::default()
816    }
817
818    #[inline]
819    pub fn send(&self, value: T) -> Result<()> {
820        Ok(self.tx.try_send(value)?)
821    }
822
823    #[inline]
824    pub fn try_recv(&self) -> Result<T> {
825        Ok(self.rx.try_recv()?)
826    }
827
828    #[inline]
829    pub async fn recv(&self) -> Result<T> {
830        Ok(self.rx.recv().await?)
831    }
832
833    pub fn close(&self) {
834        self.tx.close();
835        self.rx.close();
836    }
837
838    pub fn handle<H: From<async_channel::Sender<T>>>(&self) -> H {
839        H::from(self.tx.clone())
840    }
841
842    pub fn handle_with<P, H: From<(async_channel::Sender<T>, P)>>(&self, param: P) -> H {
843        H::from((self.tx.clone(), param))
844    }
845}
846
847#[cfg(test)]
848mod tests {
849    #[test]
850    fn check_handle_map() {}
851}