1use crate::conn::ChannelId;
9use crate::connection::ConnectionHandle;
10use crate::error::*;
11use crate::framing;
12use crate::framing::{
13 AmqpFrame, Attach, Begin, Close, DeliveryState, Detach, End, Flow, Frame, LinkRole, Open,
14 Performative, Source, Target, Transfer,
15};
16use crate::message::Message;
17use crate::options::LinkOptions;
18use rand::Rng;
19use std::collections::hash_map::Entry;
20use std::collections::HashMap;
21use std::sync::atomic::{AtomicBool, AtomicU32, Ordering};
22use std::sync::{Arc, Mutex};
23use std::time::{Duration, Instant};
24use uuid::Uuid;
25
26pub type DeliveryTag = Vec<u8>;
27pub type HandleId = u32;
28
29#[derive(Debug)]
30pub struct ConnectionDriver {
31 channel_max: u16,
32 idle_timeout: Duration,
33 connection: ConnectionHandle,
34 sessions: Mutex<HashMap<ChannelId, Arc<SessionDriver>>>,
35
36 rx: Channel<AmqpFrame>,
38 remote_channel_map: Mutex<HashMap<ChannelId, ChannelId>>,
39 remote_idle_timeout: Duration,
40
41 closed: AtomicBool,
43}
44
45#[derive(Debug)]
46pub struct SessionDriver {
47 connection: ConnectionHandle,
49 local_channel: ChannelId,
50 rx: Channel<AmqpFrame>,
51
52 links_in_flight: Mutex<HashMap<String, Arc<LinkDriver>>>,
53 links: Mutex<HashMap<HandleId, Arc<LinkDriver>>>,
54 handle_generator: AtomicU32,
55
56 #[allow(clippy::type_complexity)]
57 did_to_delivery: Arc<Mutex<HashMap<u32, (HandleId, Arc<DeliveryDriver>)>>>,
58 initial_outgoing_id: u32,
59
60 flow_control: Arc<Mutex<SessionFlowControl>>,
61}
62
63#[derive(Clone, Debug)]
65struct SessionFlowControl {
66 next_outgoing_id: u32,
67 next_incoming_id: u32,
68
69 incoming_window: u32,
70 outgoing_window: u32,
71
72 remote_incoming_window: u32,
73 remote_outgoing_window: u32,
74}
75
76impl SessionFlowControl {
77 fn new() -> SessionFlowControl {
78 SessionFlowControl {
79 next_outgoing_id: 0,
80 next_incoming_id: 0,
81
82 incoming_window: std::i32::MAX as u32,
83 outgoing_window: std::i32::MAX as u32,
84
85 remote_incoming_window: 0,
86 remote_outgoing_window: 0,
87 }
88 }
89
90 fn accept(&mut self, delivery_id: u32) -> Result<bool> {
91 if delivery_id + 1 < self.next_incoming_id || self.remote_outgoing_window == 0 {
92 Err(AmqpError::framing_error(None))
93 } else if self.incoming_window == 0 {
94 Ok(false)
95 } else {
96 self.incoming_window -= 1;
97 self.next_incoming_id = delivery_id + 1;
98 self.remote_outgoing_window -= 1;
99 Ok(true)
100 }
101 }
102
103 fn next(&mut self) -> Option<SessionFlowControl> {
104 if self.outgoing_window > 0 && self.remote_incoming_window > 0 {
105 let original = self.clone();
106 self.next_outgoing_id += 1;
107 self.outgoing_window -= 1;
108 self.remote_incoming_window -= 1;
109 Some(original)
110 } else {
111 None
112 }
113 }
114}
115
116#[derive(Debug)]
117pub struct LinkDriver {
118 pub name: String,
119 pub handle: u32,
120 pub role: LinkRole,
121 pub channel: ChannelId,
122 connection: ConnectionHandle,
123 rx: Channel<AmqpFrame>,
124
125 session_flow_control: Arc<Mutex<SessionFlowControl>>,
126
127 #[allow(clippy::type_complexity)]
128 did_to_delivery: Arc<Mutex<HashMap<u32, (HandleId, Arc<DeliveryDriver>)>>>,
129 credit: AtomicU32,
130 delivery_count: AtomicU32,
131}
132
133#[derive(Debug)]
134pub struct DeliveryDriver {
135 pub message: Option<Message>,
136 pub remotely_settled: bool,
137 pub settled: bool,
138 pub state: Option<DeliveryState>,
139 pub tag: DeliveryTag,
140 pub id: u32,
141}
142
143pub struct SessionOpts {
144 pub max_frame_size: u32,
145}
146
147impl ConnectionDriver {
148 pub fn new(connection: ConnectionHandle, idle_timeout: Duration) -> ConnectionDriver {
149 ConnectionDriver {
150 connection,
151 rx: Channel::new(),
152 sessions: Mutex::new(HashMap::new()),
153 remote_channel_map: Mutex::new(HashMap::new()),
154 idle_timeout,
155 remote_idle_timeout: Duration::from_secs(0),
156 channel_max: u16::MAX,
157 closed: AtomicBool::new(false),
158 }
159 }
160
161 pub fn closed(&self) -> bool {
162 self.closed.load(Ordering::SeqCst)
163 }
164
165 pub fn connection(&self) -> &ConnectionHandle {
166 &self.connection
167 }
168
169 pub fn flowcontrol(&self) -> Result<()> {
170 let low_flow_watermark = 100;
171 let high_flow_watermark = 1000;
172
173 for (_, session) in self.sessions.lock().unwrap().iter_mut() {
174 for (_, link) in session.links.lock().unwrap().iter_mut() {
175 if link.role == LinkRole::Receiver {
176 let credit = link.credit.load(Ordering::SeqCst);
177 if credit <= low_flow_watermark {
178 link.flow(high_flow_watermark)?;
179 }
180 }
181 }
182 }
183 Ok(())
184 }
185
186 pub fn keepalive(&self) -> Result<()> {
187 let now = Instant::now();
188 let last_received = self.connection.keepalive(self.remote_idle_timeout, now)?;
189
190 if self.idle_timeout.as_millis() > 0 {
191 if now - last_received > self.idle_timeout * 2 {
193 self.connection.close(Close {
194 error: Some(ErrorCondition::local_idle_timeout()),
195 })?;
196 warn!("Connection timed out");
197 return Err(AmqpError::IoError(std::io::Error::from(
198 std::io::ErrorKind::TimedOut,
199 )));
200 }
201 }
202 Ok(())
203 }
204
205 pub fn open(&self, open: Open) -> Result<()> {
206 self.connection.open(open)
207 }
208
209 pub fn close(&self, error: Option<ErrorCondition>) -> Result<()> {
210 if self.closed.fetch_or(true, Ordering::SeqCst) {
211 return Ok(());
212 }
213
214 for (_id, session) in core::mem::take(&mut *self.sessions.lock().unwrap()) {
215 for (_id, link) in core::mem::take(&mut *session.links.lock().unwrap()) {
216 let _ = link.close(None);
217 link.rx.close();
218 }
219 let _ = session.close(None);
220 session.rx.close();
221 }
222
223 self.rx.close();
224 self.connection.close(Close { error })
225 }
226
227 pub(crate) fn dispatch(&self, frames: Vec<Frame>) -> Result<()> {
228 for frame in frames {
229 if let Frame::AMQP(frame) = frame {
230 trace!("Got AMQP frame: {:?}", frame.performative);
231 if let Some(ref performative) = frame.performative {
232 let channel = frame.channel;
233 match performative {
234 Performative::Open(ref _open) => {
235 self.rx.send(frame)?;
236 }
237 Performative::Close(ref _close) => {
238 self.rx.send(frame)?;
239 }
240 Performative::Begin(ref begin) => {
241 let m = self.sessions.lock().unwrap();
242 let s = m.get(&channel);
243 if let Some(s) = s {
244 {
245 let mut f = s.flow_control.lock().unwrap();
246 f.remote_outgoing_window = begin.outgoing_window;
247 f.remote_incoming_window = begin.incoming_window;
248 if let Some(remote_channel) = begin.remote_channel {
249 let mut cm = self.remote_channel_map.lock().unwrap();
250 cm.insert(channel, remote_channel);
251 }
252 }
253 s.rx.send(frame)?;
254 }
255 }
256 Performative::End(ref _end) => {
257 let local_channel: Option<ChannelId> = {
258 let cm = self.remote_channel_map.lock().unwrap();
259 cm.get(&channel).cloned()
260 };
261
262 if let Some(local_channel) = local_channel {
263 let mut m = self.sessions.lock().unwrap();
264 m.get_mut(&local_channel).map(|s| s.rx.send(frame));
265 }
266 }
267 _ => {
268 let local_channel: Option<ChannelId> = {
269 let cm = self.remote_channel_map.lock().unwrap();
270 cm.get(&channel).cloned()
271 };
272
273 if let Some(local_channel) = local_channel {
274 let session = {
275 let mut m = self.sessions.lock().unwrap();
276 m.get_mut(&local_channel).cloned()
277 };
278
279 if let Some(s) = session {
280 s.dispatch(frame)?;
281 }
282 }
283 }
284 }
285 }
286 }
287 }
288 Ok(())
289 }
290
291 fn allocate_session(&self) -> Option<Arc<SessionDriver>> {
292 let mut m = self.sessions.lock().unwrap();
293 for i in 0..self.channel_max {
294 let chan = i as ChannelId;
295 if let Entry::Vacant(entry) = m.entry(chan) {
296 let session = Arc::new(SessionDriver {
297 connection: self.connection.clone(),
298 local_channel: chan,
299 rx: Channel::new(),
300
301 links_in_flight: Mutex::new(HashMap::new()),
302 links: Mutex::new(HashMap::new()),
303 handle_generator: AtomicU32::new(0),
304
305 flow_control: Arc::new(Mutex::new(SessionFlowControl::new())),
306 initial_outgoing_id: 0,
307
308 did_to_delivery: Arc::new(Mutex::new(HashMap::new())),
309 });
310 entry.insert(session.clone());
311 return Some(session);
312 }
313 }
314 None
315 }
316
317 pub async fn new_session(&self, _opts: Option<SessionOpts>) -> Result<Arc<SessionDriver>> {
318 let session = self
319 .allocate_session()
320 .ok_or(AmqpError::SessionAllocationExhausted)?;
321 let flow_control: SessionFlowControl = { session.flow_control.lock().unwrap().clone() };
322 let begin = Begin {
323 remote_channel: None,
324 next_outgoing_id: flow_control.next_outgoing_id,
325 incoming_window: flow_control.incoming_window,
326 outgoing_window: flow_control.outgoing_window,
327 handle_max: None,
328 offered_capabilities: None,
329 desired_capabilities: None,
330 properties: None,
331 };
332 debug!(
333 "Creating session with local channel {}",
334 session.local_channel
335 );
336
337 self.connection.begin(session.local_channel, begin)?;
338 Ok(session)
339 }
340
341 #[inline]
342 pub async fn recv(&self) -> Result<AmqpFrame> {
343 self.rx.recv().await
344 }
345
346 #[inline]
347 pub fn unrecv(&self, frame: AmqpFrame) -> Result<()> {
348 warn!("unrecv: {:?}", frame);
349 self.rx.send(frame)
350 }
351}
352
353impl SessionDriver {
354 pub fn dispatch(&self, frame: AmqpFrame) -> Result<()> {
355 trace!("Dispatching frame: {:?}", frame);
356 match &frame.performative {
357 Some(Performative::Attach(attach_response)) => {
358 let link = self
359 .links_in_flight
360 .lock()
361 .unwrap()
362 .remove(&attach_response.name);
363
364 if let Some(link) = link {
365 let handle = attach_response.handle;
366 if link.rx.send(frame).is_ok() {
367 self.links.lock().unwrap().insert(handle, Arc::clone(&link));
368 } else {
369 error!("Failed to notify LinkDriver about attach frame")
370 }
371 } else {
372 error!(
373 "Received attach frame for unknown link: {:?}",
374 attach_response
375 );
376 }
377 }
378 Some(Performative::Detach(ref detach)) => {
379 if let Some(link) = self.links.lock().unwrap().remove(&detach.handle) {
380 link.rx.send(frame)?;
381 } else {
382 warn!("Detach request with unknown handle received: {:?}", detach)
383 }
384 }
385 Some(Performative::Transfer(ref transfer)) => {
386 if let Some(delivery_id) = transfer.delivery_id {
388 loop {
389 let result = self.flow_control.lock().unwrap().accept(delivery_id);
390 match result {
391 Err(AmqpError::Amqp(cond)) => {
392 error!("Transfer error: {:?}", cond);
393 self.close(Some(cond))?;
394 }
395 Err(e) => {
396 error!("Transfer error: {:?}", e);
397 self.close(None)?;
398 }
399 Ok(false) => {}
400 Ok(true) => break,
401 }
402 }
403 }
404
405 let link = {
406 let mut m = self.links.lock().unwrap();
407 m.get_mut(&transfer.handle).unwrap().clone()
408 };
409
410 let count_down = |x| {
411 if x == 0 {
412 Some(0)
413 } else {
414 Some(x - 1)
415 }
416 };
417 if link
419 .credit
420 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, count_down)
421 == Ok(0)
422 {
423 trace!("Transfer but no space left!");
424 } else {
425 trace!(
426 "Received transfer. Credit: {:?}",
427 link.credit.load(Ordering::SeqCst)
428 );
429 link.delivery_count.fetch_add(1, Ordering::SeqCst);
430 link.rx.send(frame)?;
431 }
432 }
433 Some(Performative::Disposition(ref disposition)) => {
434 trace!("Received disposition: {:?}", disposition);
435 let last = disposition.last.unwrap_or(disposition.first);
436 for id in disposition.first..=last {
437 if let Some((handle, _delivery)) =
438 self.did_to_delivery.lock().unwrap().remove(&id)
439 {
440 if let Some(link) = self.links.lock().unwrap().get(&handle).cloned() {
441 if link.role == disposition.role {
442 link.rx.send(frame.clone())?;
443 }
444 } else {
445 debug!("Disposition for invalid handle({}) received", handle);
446 }
447 }
448 }
449 }
450 Some(Performative::Flow(ref flow)) => {
451 trace!("Received flow!");
452 {
454 let mut control = self.flow_control.lock().unwrap();
455 control.next_incoming_id = flow.next_outgoing_id;
456 control.remote_outgoing_window = flow.outgoing_window;
457 if let Some(next_incoming_id) = flow.next_incoming_id {
458 control.remote_incoming_window =
459 next_incoming_id + flow.incoming_window - control.next_outgoing_id;
460 } else {
461 control.remote_incoming_window = self.initial_outgoing_id
462 + flow.incoming_window
463 - control.next_outgoing_id;
464 }
465 }
466 if let Some(handle) = flow.handle {
467 let link = {
468 let mut m = self.links.lock().unwrap();
469 m.get_mut(&handle).ok_or(AmqpError::InvalidHandle)?.clone()
470 };
471 if let Some(credit) = flow.link_credit {
472 let credit = flow.delivery_count.unwrap_or(0) + credit
473 - link.delivery_count.load(Ordering::SeqCst);
474 link.credit.store(credit, Ordering::SeqCst);
475 }
476 }
477 }
478 _ => {
479 warn!("Unexpected performative for session: {:?}", frame);
480 }
481 }
482 Ok(())
483 }
484
485 pub fn close(&self, error: Option<ErrorCondition>) -> Result<()> {
486 self.connection.end(self.local_channel, End { error })
487 }
488
489 pub async fn new_link(
490 &self,
491 address: &str,
492 options: impl Into<LinkOptions>,
493 ) -> Result<(String, Arc<LinkDriver>)> {
494 let options = options.into();
495 let role = options.role();
496 let link_name = format!("dove-{}-{}", Uuid::new_v4(), role.as_str());
497 debug!("Creating link {} with role {:?}", link_name, role);
498
499 let attach = Attach {
501 name: link_name.clone(),
502 handle: self.next_handle_id(),
503 role,
504 snd_settle_mode: None,
505 rcv_settle_mode: None,
506 source: Some(Source {
507 address: Some(address.to_string()),
508 durable: None,
509 expiry_policy: None,
510 timeout: None,
511 dynamic: Some(false),
512 dynamic_node_properties: None,
513 default_outcome: None,
514 distribution_mode: None,
515 filter: None,
516 outcomes: None,
517 capabilities: None,
518 }),
519 target: Some(Target {
520 address: Some(address.to_string()),
521 durable: None,
522 expiry_policy: None,
523 timeout: None,
524 dynamic: Some(false),
525 dynamic_node_properties: None,
526 capabilities: None,
527 }),
528 unsettled: None,
529 incomplete_unsettled: None,
530 initial_delivery_count: if role == LinkRole::Sender {
531 Some(0)
532 } else {
533 None
534 },
535 max_message_size: None,
536 offered_capabilities: None,
537 desired_capabilities: None,
538 properties: None,
539 };
540
541 let attach = options.applied_on_attach(attach);
542 let link = Arc::new(LinkDriver {
543 name: link_name.clone(),
544 role,
545 channel: self.local_channel,
546 connection: self.connection.clone(),
547 handle: attach.handle,
548 rx: Channel::new(),
549 session_flow_control: self.flow_control.clone(),
550 did_to_delivery: self.did_to_delivery.clone(),
551 credit: AtomicU32::new(0),
552 delivery_count: AtomicU32::new(0),
553 });
554
555 self.links_in_flight
556 .lock()
557 .unwrap()
558 .insert(link_name.clone(), Arc::clone(&link));
559
560 debug!("Requesting attachment of {}/{}", attach.name, attach.handle);
561 self.connection.attach(self.local_channel, attach)?;
562
563 let frame = link.rx.recv().await?;
564 if let Some(Performative::Attach(response)) = frame.performative {
565 debug!(
566 "Received response for attach request: handle={}",
567 response.handle
568 );
569
570 let requested_address = address;
572 let dynamic = matches!(options.dynamic(), Some(true));
573
574 let address_response = match response.role {
575 LinkRole::Sender => response.target.and_then(|t| t.address),
576 LinkRole::Receiver => response.source.and_then(|s| s.address),
577 };
578
579 match address_response {
580 Some(address) if dynamic || address == requested_address => Ok((address, link)),
581 invalid => {
582 warn!(
583 "Expected address {:?}, but server sent {:?}",
584 requested_address, invalid
585 );
586 link.close(Some(ErrorCondition {
587 condition: "amqp:invalid-field".to_string(),
588 description: format!(
589 "Expected address {:?}, but server sent {:?}",
590 requested_address, invalid
591 ),
592 }))?;
593 Err(AmqpError::TargetNotRecognized(
594 requested_address.to_string(),
595 ))
596 }
597 }
598 } else {
599 let condition = ErrorCondition {
600 condition: "amqp:precondition-failed".to_string(),
601 description: format!("Expected attach frame, but got {:?}", frame),
602 };
603 link.close(Some(condition.clone()))?;
604 Err(AmqpError::Amqp(condition))
605 }
606 }
607
608 fn next_handle_id(&self) -> HandleId {
609 loop {
610 let handle_id = self.handle_generator.fetch_add(1, Ordering::SeqCst);
611
612 loop {
613 let links = match self.links.try_lock() {
615 Ok(links) => links,
616 Err(_) => continue,
617 };
618
619 let links_in_flight = match self.links_in_flight.try_lock() {
621 Ok(links_in_flight) => links_in_flight,
622 Err(_) => continue,
623 };
624
625 if !links.values().any(|l| l.handle == handle_id)
626 && !links_in_flight.values().any(|l| l.handle == handle_id)
627 {
628 return handle_id;
629 } else {
630 break;
631 }
632 }
633 }
634 }
635
636 #[inline]
637 pub async fn recv(&self) -> Result<AmqpFrame> {
638 self.rx.recv().await
639 }
640
641 #[inline]
642 pub fn unrecv(&self, frame: AmqpFrame) -> Result<()> {
643 warn!("unrecv");
644 self.rx.send(frame)
645 }
646}
647
648impl LinkDriver {
649 pub fn connection(&self) -> &ConnectionHandle {
650 &self.connection
651 }
652
653 pub fn credits(&self) -> u32 {
654 self.credit.load(Ordering::SeqCst)
655 }
656
657 pub async fn send_message(
658 &self,
659 message: Message,
660 settled: bool,
661 ) -> Result<Arc<DeliveryDriver>> {
662 let semaphore_fn = |x| {
663 if x == 0 {
664 Some(0)
665 } else {
666 Some(x - 1)
667 }
668 };
669
670 if self
672 .credit
673 .fetch_update(Ordering::SeqCst, Ordering::SeqCst, semaphore_fn)
674 == Ok(0)
675 {
676 return Err(AmqpError::NotEnoughCreditsToSend(Box::new(message)));
677 }
678
679 let next_outgoing_id = loop {
681 let props = self.session_flow_control.lock().unwrap().next();
682 if let Some(props) = props {
683 break props.next_outgoing_id;
684 }
685 warn!("No next_outgoing_id, busy looping");
686 };
688
689 self.delivery_count.fetch_add(1, Ordering::SeqCst);
690 let delivery_tag = rand::thread_rng().gen::<[u8; 16]>().to_vec();
691 let delivery = Arc::new(DeliveryDriver {
692 message: Some(message),
693 id: next_outgoing_id,
694 tag: delivery_tag.clone(),
695 state: None,
696 remotely_settled: false,
697 settled,
698 });
699
700 if !settled {
701 self.did_to_delivery
702 .lock()
703 .unwrap()
704 .insert(next_outgoing_id, (self.handle, delivery.clone()));
705 }
706
707 let transfer = Transfer {
708 handle: self.handle,
709 delivery_id: Some(next_outgoing_id),
710 delivery_tag: Some(delivery_tag),
711 message_format: Some(0),
712 settled: Some(settled),
713 more: Some(false),
714 rcv_settle_mode: None,
715 state: None,
716 resume: None,
717 aborted: None,
718 batchable: None,
719 };
720
721 let mut msgbuf = Vec::new();
722 if let Some(message) = delivery.message.as_ref() {
723 message.encode(&mut msgbuf)?;
724 }
725
726 self.connection
727 .transfer(self.channel, transfer, Some(msgbuf))?;
728
729 Ok(delivery)
730 }
731
732 pub fn flow(&self, credit: u32) -> Result<()> {
733 trace!("{}: issuing {} credits", self.handle, credit);
734 self.credit.store(credit, Ordering::SeqCst);
735 let props = { self.session_flow_control.lock().unwrap().clone() };
736 self.connection.flow(
737 self.channel,
738 Flow {
739 next_incoming_id: Some(props.next_incoming_id),
740 incoming_window: props.incoming_window,
741 next_outgoing_id: props.next_outgoing_id,
742 outgoing_window: props.outgoing_window,
743 handle: Some(self.handle),
744 delivery_count: Some(self.delivery_count.load(Ordering::SeqCst)),
745 link_credit: Some(credit),
746 available: None,
747 drain: None,
748 echo: None,
749 properties: None,
750 },
751 )
752 }
753
754 pub fn close(&self, error: Option<ErrorCondition>) -> Result<()> {
755 self.connection.detach(
756 self.channel,
757 Detach {
758 handle: self.handle,
759 closed: Some(true),
760 error,
761 },
762 )
763 }
764
765 #[inline]
766 pub async fn recv(&self) -> Result<AmqpFrame> {
767 self.rx.recv().await
768 }
769
770 #[inline]
771 pub fn unrecv(&self, frame: AmqpFrame) -> Result<()> {
772 warn!("unrecv");
773 self.rx.send(frame)
774 }
775
776 #[inline]
777 pub fn disposition(
778 &self,
779 delivery: &DeliveryDriver,
780 settled: bool,
781 state: DeliveryState,
782 ) -> Result<()> {
783 if settled {
784 self.session_flow_control.lock().unwrap().incoming_window += 1;
785 }
786 self.connection().disposition(
787 self.channel,
788 framing::Disposition {
789 role: self.role,
790 first: delivery.id,
791 last: Some(delivery.id),
792 settled: Some(settled),
793 state: Some(state),
794 batchable: None,
795 },
796 )
797 }
798}
799
800#[derive(Debug)]
801pub struct Channel<T> {
802 tx: async_channel::Sender<T>,
803 rx: async_channel::Receiver<T>,
804}
805
806impl<T> Default for Channel<T> {
807 fn default() -> Self {
808 let (tx, rx) = async_channel::unbounded();
809 Self { tx, rx }
810 }
811}
812
813impl<T> Channel<T> {
814 pub fn new() -> Channel<T> {
815 Self::default()
816 }
817
818 #[inline]
819 pub fn send(&self, value: T) -> Result<()> {
820 Ok(self.tx.try_send(value)?)
821 }
822
823 #[inline]
824 pub fn try_recv(&self) -> Result<T> {
825 Ok(self.rx.try_recv()?)
826 }
827
828 #[inline]
829 pub async fn recv(&self) -> Result<T> {
830 Ok(self.rx.recv().await?)
831 }
832
833 pub fn close(&self) {
834 self.tx.close();
835 self.rx.close();
836 }
837
838 pub fn handle<H: From<async_channel::Sender<T>>>(&self) -> H {
839 H::from(self.tx.clone())
840 }
841
842 pub fn handle_with<P, H: From<(async_channel::Sender<T>, P)>>(&self, param: P) -> H {
843 H::from((self.tx.clone(), param))
844 }
845}
846
847#[cfg(test)]
848mod tests {
849 #[test]
850 fn check_handle_map() {}
851}