1use crate::factory::BootstrapReporter;
4use crate::mgr::state::{ChannelForTarget, PendingChannelHandle};
5use crate::{ChanProvenance, ChannelConfig, ChannelUsage, Dormancy, Error, Result};
6
7use async_trait::async_trait;
8use futures::future::Shared;
9use oneshot_fused_workaround as oneshot;
10use std::result::Result as StdResult;
11use std::sync::Arc;
12use std::time::Duration;
13use tor_error::{error_report, internal};
14use tor_linkspec::{HasChanMethod, HasRelayIds};
15use tor_netdir::params::NetParameters;
16use tor_proto::channel::kist::KistParams;
17use tor_proto::channel::params::ChannelPaddingInstructionsUpdates;
18use tor_proto::memquota::{ChannelAccount, SpecificAccount as _, ToplevelAccount};
19use tracing::{instrument, trace};
20
21#[cfg(feature = "relay")]
22use {safelog::Sensitive, std::net::SocketAddr, tor_proto::RelayChannelAuthMaterial};
23
24mod select;
25mod state;
26
/// The channel operations that the channel-manager code in this file relies on.
///
/// Every implementor must also be able to report its relay identities
/// (the [`HasRelayIds`] supertrait).
pub(crate) trait AbstractChannel: HasRelayIds {
    /// Report whether this channel counts as canonical.
    ///
    /// NOTE(review): the exact meaning of "canonical" is decided by the
    /// implementor — confirm against the concrete channel type.
    fn is_canonical(&self) -> bool;
    /// Report whether this channel counts as canonical to the peer.
    ///
    /// NOTE(review): presumably the peer's view of canonicity — confirm
    /// against the concrete channel type.
    fn is_canonical_to_peer(&self) -> bool;
    /// Report whether this channel can still be used.
    fn is_usable(&self) -> bool;
    /// Return how long this channel has been unused, if such a duration applies.
    ///
    /// NOTE(review): presumably `None` means the channel is currently in
    /// use — confirm against the concrete channel type.
    fn duration_unused(&self) -> Option<Duration>;

    /// Hand a set of padding-instruction `updates` to this channel.
    ///
    /// Returns whatever `tor_proto` error the channel reports if it cannot
    /// accept the updates.
    fn reparameterize(
        &self,
        updates: Arc<ChannelPaddingInstructionsUpdates>,
    ) -> tor_proto::Result<()>;

    /// Hand new KIST parameters to this channel.
    ///
    /// Returns whatever `tor_proto` error the channel reports if it cannot
    /// accept the parameters.
    fn reparameterize_kist(&self, kist_params: KistParams) -> tor_proto::Result<()>;

    /// Tell the channel to engage its padding activities.
    ///
    /// `AbstractChanMgr::get_or_launch` calls this when a channel is
    /// requested for `ChannelUsage::UserTraffic`.
    fn engage_padding_activities(&self);
}
67
/// Something that knows how to construct channels for an [`AbstractChanMgr`].
///
/// The manager calls `build_channel` when it has decided to launch a new
/// outgoing channel, and (with the `relay` feature) calls
/// `build_channel_using_incoming` for streams handed to `handle_incoming`.
#[async_trait]
pub(crate) trait AbstractChannelFactory {
    /// The type of channel that this factory produces.
    type Channel: AbstractChannel;
    /// The description of a target to connect to: it must expose both the
    /// relay identities and the channel method of the target.
    type BuildSpec: HasRelayIds + HasChanMethod;
    /// The type of an incoming stream that can be turned into a channel.
    type Stream;

    /// Build a new channel to `target`.
    ///
    /// `reporter` is the bootstrap reporter passed along by the manager, and
    /// `memquota` is the memory-quota account created for this channel.
    ///
    /// Returns the newly built channel, or the error that prevented building it.
    async fn build_channel(
        &self,
        target: &Self::BuildSpec,
        reporter: BootstrapReporter,
        memquota: ChannelAccount,
    ) -> Result<Arc<Self::Channel>>;

    /// Build a channel on top of an already-accepted incoming `stream`.
    ///
    /// `peer` is the (sensitive) socket address associated with the stream,
    /// and `memquota` is the memory-quota account created for this channel.
    #[cfg(feature = "relay")]
    async fn build_channel_using_incoming(
        &self,
        peer: Sensitive<std::net::SocketAddr>,
        stream: Self::Stream,
        memquota: ChannelAccount,
    ) -> Result<Arc<Self::Channel>>;
}
104
/// Configuration bundle for a channel manager.
///
/// Build one with [`ChanMgrConfig::new`] and, when the `relay` feature is
/// enabled, refine it with the `with_*` builder methods.
#[derive(Default)]
pub struct ChanMgrConfig {
    /// The channel configuration itself.
    pub(crate) cfg: ChannelConfig,
    /// Relay channel authentication material, if any has been provided.
    #[cfg(feature = "relay")]
    pub(crate) auth_material: Option<Arc<RelayChannelAuthMaterial>>,
    /// Our own socket addresses, as provided through `with_my_addrs`.
    ///
    /// Empty unless explicitly set.
    #[cfg(feature = "relay")]
    pub(crate) my_addrs: Vec<SocketAddr>,
}
119
120impl ChanMgrConfig {
121 pub fn new(cfg: ChannelConfig) -> Self {
123 Self {
124 cfg,
125 #[cfg(feature = "relay")]
126 auth_material: None,
127 #[cfg(feature = "relay")]
128 my_addrs: Vec::new(),
129 }
130 }
131
132 #[cfg(feature = "relay")]
134 pub fn with_auth_material(mut self, auth_material: Arc<RelayChannelAuthMaterial>) -> Self {
135 self.auth_material = Some(auth_material);
136 self
137 }
138
139 #[cfg(feature = "relay")]
141 pub fn with_my_addrs(mut self, my_addrs: Vec<SocketAddr>) -> Self {
142 self.my_addrs = my_addrs;
143 self
144 }
145}
146
/// A channel manager that is generic over the factory `CF` used to build
/// its channels.
///
/// It hands out existing channels where possible and launches new ones
/// through the factory otherwise (see `get_or_launch`).
pub(crate) struct AbstractChanMgr<CF: AbstractChannelFactory> {
    /// The state tracking open and pending channels; it also holds the
    /// channel factory (obtained via `builder()`).
    pub(crate) channels: state::MgrState<CF>,

    /// The bootstrap reporter; a clone is passed to every `build_channel` call.
    pub(crate) reporter: BootstrapReporter,

    /// The top-level memory-quota account from which a per-channel
    /// `ChannelAccount` is created for each new channel.
    pub(crate) memquota: ToplevelAccount,
}
168
/// The receiving side of a launch notification.
///
/// It is a shared future, so several waiters can await the outcome of the
/// same in-progress channel launch.
type Pending = Shared<oneshot::Receiver<Result<()>>>;

/// The sending side of a launch notification: whoever launches a channel
/// uses it to tell the waiters whether the launch succeeded.
type Sending = oneshot::Sender<Result<()>>;
176
/// Guard owned by the task that is launching a channel.
///
/// When dropped, it removes the pending entry from the channel state (unless
/// the entry was already upgraded to an open channel) and then reports the
/// recorded result to any waiters.  Because this happens in `Drop`, it also
/// happens when the launching future is dropped part-way through.
struct PendingLaunchGuard<'a, CF: AbstractChannelFactory> {
    /// The channel state that holds the pending entry.
    channels: &'a state::MgrState<CF>,
    /// Handle for the pending entry; `None` once the entry has been upgraded
    /// to an open channel or removed.
    handle: Option<PendingChannelHandle>,
    /// Sender used to notify waiters; `None` once the result has been sent.
    send: Option<Sending>,
    /// The result that will be sent to waiters when the guard is dropped.
    ///
    /// Starts out as `Err(Error::RequestCancelled)`.
    result: Result<()>,
}
193
194impl<'a, CF: AbstractChannelFactory> PendingLaunchGuard<'a, CF> {
195 fn new(channels: &'a state::MgrState<CF>, handle: PendingChannelHandle, send: Sending) -> Self {
197 Self {
198 channels,
199 handle: Some(handle),
200 send: Some(send),
201 result: Err(Error::RequestCancelled),
202 }
203 }
204
205 fn note_result(&mut self, result: Result<()>) {
207 self.result = result;
208 }
209
210 fn upgrade_pending_channel_to_open(&mut self, channel: Arc<CF::Channel>) -> Result<()> {
212 let handle = self
213 .handle
214 .take()
215 .expect("pending launch guard lost its handle before upgrade");
216 self.channels
217 .upgrade_pending_channel_to_open(handle, channel)
218 }
219}
220
221impl<'a, CF: AbstractChannelFactory> Drop for PendingLaunchGuard<'a, CF> {
222 fn drop(&mut self) {
223 if let Some(handle) = self.handle.take() {
224 if let Err(e) = self.channels.remove_pending_channel(handle) {
225 #[allow(clippy::missing_docs_in_private_items)]
230 const MSG: &str = "Unable to remove the pending channel";
231 error_report!(internal!("{e}"), "{}", MSG);
232 }
233 }
234
235 if let Some(send) = self.send.take() {
236 let _ignore_err = send.send(self.result.clone());
239 }
240 }
241}
242
243impl<CF: AbstractChannelFactory + Clone> AbstractChanMgr<CF> {
244 pub(crate) fn new(
246 connector: CF,
247 config: ChannelConfig,
248 dormancy: Dormancy,
249 netparams: &NetParameters,
250 reporter: BootstrapReporter,
251 memquota: ToplevelAccount,
252 ) -> Self {
253 AbstractChanMgr {
254 channels: state::MgrState::new(connector, config, dormancy, netparams),
255 reporter,
256 memquota,
257 }
258 }
259
260 #[allow(unused)]
262 pub(crate) fn with_mut_builder<F>(&self, func: F)
263 where
264 F: FnOnce(&mut CF),
265 {
266 self.channels.with_mut_builder(func);
267 }
268
269 #[cfg(test)]
271 pub(crate) fn remove_unusable_entries(&self) -> Result<()> {
272 self.channels.remove_unusable()
273 }
274
275 #[cfg(feature = "relay")]
278 pub(crate) async fn handle_incoming(
279 &self,
280 src: Sensitive<std::net::SocketAddr>,
281 stream: CF::Stream,
282 ) -> Result<Arc<CF::Channel>> {
283 let chan_builder = self.channels.builder();
284 let memquota = ChannelAccount::new(&self.memquota)?;
285 let channel = chan_builder
286 .build_channel_using_incoming(src, stream, memquota)
287 .await?;
288 self.channels.add_open(channel.clone())?;
290 Ok(channel)
291 }
292
293 #[instrument(skip_all, level = "trace")]
303 pub(crate) async fn get_or_launch(
304 &self,
305 target: CF::BuildSpec,
306 usage: ChannelUsage,
307 ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
308 use ChannelUsage as CU;
309
310 let chan = self.get_or_launch_internal(target).await?;
311
312 match usage {
313 CU::Dir | CU::UselessCircuit => {}
314 CU::UserTraffic => chan.0.engage_padding_activities(),
315 }
316
317 Ok(chan)
318 }
319
320 #[allow(clippy::cognitive_complexity)]
322 #[instrument(skip_all, level = "trace")]
323 async fn get_or_launch_internal(
324 &self,
325 target: CF::BuildSpec,
326 ) -> Result<(Arc<CF::Channel>, ChanProvenance)> {
327 const N_ATTEMPTS: usize = 2;
329 let mut attempts_so_far = 0;
330 let mut final_attempt = false;
331 let mut provenance = ChanProvenance::Preexisting;
332
333 let mut last_err = None;
335
336 while attempts_so_far < N_ATTEMPTS || final_attempt {
337 attempts_so_far += 1;
338
339 let action = self.choose_action(&target, final_attempt)?;
344
345 match action {
348 None => {
351 if !final_attempt {
352 return Err(Error::Internal(internal!(
353 "No action returned while not on final attempt"
354 )));
355 }
356 break;
357 }
358 Some(Action::Return(v)) => {
360 trace!("Returning existing channel");
361 return v.map(|chan| (chan, provenance));
362 }
363 Some(Action::Wait(pend)) => {
365 trace!("Waiting for in-progress channel");
366 match pend.await {
367 Ok(Ok(())) => {
368 final_attempt = true;
374 provenance = ChanProvenance::NewlyCreated;
375 last_err.get_or_insert(Error::RequestCancelled);
376 }
377 Ok(Err(e)) => {
378 last_err = Some(e);
379 }
380 Err(_) => {
381 last_err =
382 Some(Error::Internal(internal!("channel build task disappeared")));
383 }
384 }
385 }
386 Some(Action::Launch((handle, send))) => {
388 trace!("Launching channel");
389 let connector = self.channels.builder();
390 let mut launch = PendingLaunchGuard::new(&self.channels, handle, send);
391 let memquota = match ChannelAccount::new(&self.memquota) {
392 Ok(memquota) => memquota,
393 Err(e) => {
394 let e: Error = e.into();
395 launch.note_result(Err(e.clone()));
396 return Err(e);
397 }
398 };
399
400 let outcome = connector
401 .build_channel(&target, self.reporter.clone(), memquota)
402 .await;
403
404 match outcome {
405 Ok(ref chan) => {
406 match launch.upgrade_pending_channel_to_open(Arc::clone(chan)) {
408 Ok(()) => launch.note_result(Ok(())),
409 Err(e) => {
410 launch.note_result(Err(e.clone()));
411 return Err(e);
412 }
413 }
414 }
415 Err(_) => {
416 launch.note_result(outcome.clone().map(|_| ()));
417 }
418 }
419
420 match outcome {
421 Ok(chan) => {
422 return Ok((chan, ChanProvenance::NewlyCreated));
423 }
424 Err(e) => last_err = Some(e),
425 }
426 }
427 }
428
429 }
431
432 Err(last_err.unwrap_or_else(|| Error::Internal(internal!("no error was set!?"))))
433 }
434
435 #[instrument(skip_all, level = "trace")]
444 fn choose_action(
445 &self,
446 target: &CF::BuildSpec,
447 final_attempt: bool,
448 ) -> Result<Option<Action<CF::Channel>>> {
449 let response = self.channels.request_channel(
451 target,
452 !final_attempt,
453 );
454
455 match response {
456 Ok(Some(ChannelForTarget::Open(channel))) => Ok(Some(Action::Return(Ok(channel)))),
457 Ok(Some(ChannelForTarget::Pending(pending))) => {
458 if !final_attempt {
459 Ok(Some(Action::Wait(pending)))
460 } else {
461 Ok(None)
463 }
464 }
465 Ok(Some(ChannelForTarget::NewEntry((handle, send)))) => {
466 Ok(Some(Action::Launch((handle, send))))
468 }
469 Ok(None) => Ok(None),
470 Err(e @ Error::IdentityConflict) => Ok(Some(Action::Return(Err(e)))),
471 Err(e) => Err(e),
472 }
473 }
474
475 pub(crate) fn update_netparams(
477 &self,
478 netparams: Arc<dyn AsRef<NetParameters>>,
479 ) -> StdResult<(), tor_error::Bug> {
480 self.channels.reconfigure_general(None, None, netparams)
481 }
482
483 pub(crate) fn set_dormancy(
485 &self,
486 dormancy: Dormancy,
487 netparams: Arc<dyn AsRef<NetParameters>>,
488 ) -> StdResult<(), tor_error::Bug> {
489 self.channels
490 .reconfigure_general(None, Some(dormancy), netparams)
491 }
492
493 pub(crate) fn reconfigure(
495 &self,
496 config: &ChannelConfig,
497 netparams: Arc<dyn AsRef<NetParameters>>,
498 ) -> StdResult<(), tor_error::Bug> {
499 self.channels
500 .reconfigure_general(Some(config), None, netparams)
501 }
502
503 pub(crate) fn expire_channels(&self) -> Duration {
512 self.channels.expire_channels()
513 }
514
515 #[cfg(test)]
517 pub(crate) fn get_nowait<'a, T>(&self, ident: T) -> Vec<Arc<CF::Channel>>
518 where
519 T: Into<tor_linkspec::RelayIdRef<'a>>,
520 {
521 use state::ChannelState::*;
522 self.channels
523 .with_channels(|channel_map| {
524 channel_map
525 .by_id(ident)
526 .filter_map(|entry| match entry {
527 Open(ent) if ent.channel.is_usable() => Some(Arc::clone(&ent.channel)),
528 _ => None,
529 })
530 .collect()
531 })
532 .expect("Poisoned lock")
533 }
534}
535
/// What `get_or_launch_internal` should do next, as decided by
/// `choose_action`.
#[allow(clippy::large_enum_variant)]
enum Action<C: AbstractChannel> {
    /// Launch a new channel ourselves.  Carries the handle of the pending
    /// entry that was created for it and the sender used to notify waiters.
    Launch((PendingChannelHandle, Sending)),
    /// Somebody else is already launching a channel: wait for the outcome.
    Wait(Pending),
    /// Return this result (an open channel or an error) to the caller
    /// right away.
    Return(Result<Arc<C>>),
}
548
/// Tests for `AbstractChanMgr`, driven by a fake channel factory whose
/// behaviour is selected by a "mood" character in the build spec.
#[cfg(test)]
mod test {
    #![allow(clippy::bool_assert_comparison)]
    #![allow(clippy::clone_on_copy)]
    #![allow(clippy::dbg_macro)]
    #![allow(clippy::mixed_attributes_style)]
    #![allow(clippy::print_stderr)]
    #![allow(clippy::print_stdout)]
    #![allow(clippy::single_char_pattern)]
    #![allow(clippy::unwrap_used)]
    #![allow(clippy::unchecked_time_subtraction)]
    #![allow(clippy::useless_vec)]
    #![allow(clippy::needless_pass_by_value)]
    use super::*;
    use crate::Error;

    use futures::{join, poll};
    use std::error::Error as StdError;
    use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
    use std::time::Duration;
    use tor_error::bad_api_usage;
    use tor_linkspec::ChannelMethod;
    use tor_llcrypto::pk::ed25519::Ed25519Identity;
    use tor_memquota::ArcMemoryQuotaTrackerExt as _;

    use crate::ChannelUsage as CU;
    use tor_rtcompat::{Runtime, task::yield_now, test_with_one_runtime};

    /// First fake target address (1.1.1.1:443).
    const ADDR_A: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(1, 1, 1, 1), 443));
    /// Second fake target address (2.2.2.2:443).
    const ADDR_B: SocketAddr = SocketAddr::V4(SocketAddrV4::new(Ipv4Addr::new(2, 2, 2, 2), 443));

    /// A channel factory that fabricates `FakeChannel`s without any networking.
    #[derive(Clone)]
    struct FakeChannelFactory<RT> {
        /// Runtime used for the simulated delay of the '💤' mood.
        runtime: RT,
        /// Counts how many times `build_channel` has been entered.
        build_attempts: Arc<AtomicUsize>,
    }

    /// The fake channel type produced by `FakeChannelFactory`.
    #[derive(Clone, Debug)]
    struct FakeChannel {
        /// The Ed25519 identity this channel reports.
        ed_ident: Ed25519Identity,
        /// The mood character copied from the build spec that produced it.
        mood: char,
        /// Set once `start_closing` is called; makes the channel unusable.
        closing: Arc<AtomicBool>,
        /// A per-channel allocation, compared by pointer in `PartialEq` to
        /// tell whether two handles refer to the same built channel.
        detect_reuse: Arc<char>,
    }

    impl PartialEq for FakeChannel {
        /// Two fake channels are equal only when they share the same
        /// `detect_reuse` allocation, i.e. come from the same build.
        fn eq(&self, other: &Self) -> bool {
            Arc::ptr_eq(&self.detect_reuse, &other.detect_reuse)
        }
    }

    impl AbstractChannel for FakeChannel {
        fn is_canonical(&self) -> bool {
            unimplemented!()
        }
        fn is_canonical_to_peer(&self) -> bool {
            unimplemented!()
        }
        /// Usable until `start_closing` has been called.
        fn is_usable(&self) -> bool {
            !self.closing.load(Ordering::SeqCst)
        }
        fn duration_unused(&self) -> Option<Duration> {
            None
        }
        /// Fails for channels built with mood 'r', succeeds otherwise.
        fn reparameterize(
            &self,
            _updates: Arc<ChannelPaddingInstructionsUpdates>,
        ) -> tor_proto::Result<()> {
            match self.mood {
                // Synthetic failure used by the failed-upgrade test.
                'r' => Err(tor_proto::Error::ChanProto(
                    "synthetic reparameterize failure".into(),
                )),
                _ => Ok(()),
            }
        }
        fn reparameterize_kist(&self, _kist_params: KistParams) -> tor_proto::Result<()> {
            Ok(())
        }
        fn engage_padding_activities(&self) {}
    }

    impl HasRelayIds for FakeChannel {
        /// Only an Ed25519 identity is available.
        fn identity(
            &self,
            key_type: tor_linkspec::RelayIdType,
        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
            match key_type {
                tor_linkspec::RelayIdType::Ed25519 => Some((&self.ed_ident).into()),
                _ => None,
            }
        }
    }

    impl FakeChannel {
        /// Mark this channel as closing, so that `is_usable` returns false.
        fn start_closing(&self) {
            self.closing.store(true, Ordering::SeqCst);
        }
    }

    impl<RT: Runtime> FakeChannelFactory<RT> {
        /// Create a factory that records its build attempts in `build_attempts`.
        fn new(runtime: RT, build_attempts: Arc<AtomicUsize>) -> Self {
            FakeChannelFactory {
                runtime,
                build_attempts,
            }
        }
    }

    /// Create a manager backed by a `FakeChannelFactory`.
    fn new_test_abstract_chanmgr<R: Runtime>(runtime: R) -> AbstractChanMgr<FakeChannelFactory<R>> {
        new_test_abstract_chanmgr_and_build_attempts(runtime).0
    }

    /// Create a manager backed by a `FakeChannelFactory`, and also return
    /// the counter of build attempts made by that factory.
    fn new_test_abstract_chanmgr_and_build_attempts<R: Runtime>(
        runtime: R,
    ) -> (AbstractChanMgr<FakeChannelFactory<R>>, Arc<AtomicUsize>) {
        let build_attempts = Arc::new(AtomicUsize::new(0));
        let cf = FakeChannelFactory::new(runtime, Arc::clone(&build_attempts));
        let mgr = AbstractChanMgr::new(
            cf,
            Default::default(),
            Default::default(),
            &Default::default(),
            BootstrapReporter::fake(),
            ToplevelAccount::new_noop(),
        );
        (mgr, build_attempts)
    }

    /// A fake build target.
    ///
    /// Fields, in order: a number from which the expected identity is
    /// derived (see `u32_to_ed`), the mood selecting the factory's
    /// behaviour, the Ed25519 identity, and the address.
    #[derive(Clone, Debug)]
    struct FakeBuildSpec(u32, char, Ed25519Identity, SocketAddr);

    impl HasRelayIds for FakeBuildSpec {
        /// Only an Ed25519 identity is available.
        fn identity(
            &self,
            key_type: tor_linkspec::RelayIdType,
        ) -> Option<tor_linkspec::RelayIdRef<'_>> {
            match key_type {
                tor_linkspec::RelayIdType::Ed25519 => Some((&self.2).into()),
                _ => None,
            }
        }
    }

    impl HasChanMethod for FakeBuildSpec {
        /// A direct connection to the spec's single address.
        fn chan_method(&self) -> ChannelMethod {
            ChannelMethod::Direct(vec![self.3.clone()])
        }
    }

    /// Derive a deterministic Ed25519 identity from `n`: the first four
    /// bytes are `n` in big-endian order and the rest are zero.
    fn u32_to_ed(n: u32) -> Ed25519Identity {
        let mut bytes = [0; 32];
        bytes[0..4].copy_from_slice(&n.to_be_bytes());
        bytes.into()
    }

    /// Return true if `needle` appears in the `Display` or `Debug` output of
    /// `err` or of any error in its `source()` chain.
    fn error_contains(err: &Error, needle: &str) -> bool {
        let mut source: Option<&(dyn StdError + 'static)> = Some(err);
        while let Some(err) = source {
            if err.to_string().contains(needle) || format!("{err:?}").contains(needle) {
                return true;
            }
            source = err.source();
        }
        false
    }

    #[async_trait]
    impl<RT: Runtime> AbstractChannelFactory for FakeChannelFactory<RT> {
        type Channel = FakeChannel;
        type BuildSpec = FakeBuildSpec;
        type Stream = ();

        /// Fabricate a channel according to the target's mood:
        /// '❌' and '🔥' fail immediately, '💤' sleeps for 15 seconds of
        /// runtime time before succeeding, and anything else succeeds at once.
        async fn build_channel(
            &self,
            target: &Self::BuildSpec,
            _reporter: BootstrapReporter,
            _memquota: ChannelAccount,
        ) -> Result<Arc<FakeChannel>> {
            self.build_attempts.fetch_add(1, Ordering::SeqCst);
            // Yield once so that every build is pending after its first poll.
            yield_now().await;
            let FakeBuildSpec(ident, mood, id, _addr) = *target;
            let ed_ident = u32_to_ed(ident);
            // The spec's identity must match the one derived from its number.
            assert_eq!(ed_ident, id);
            match mood {
                // Moods that fail to build.
                '❌' | '🔥' => return Err(Error::UnusableTarget(bad_api_usage!("emoji"))),
                // Mood that takes a long time to build.
                '💤' => {
                    self.runtime.sleep(Duration::new(15, 0)).await;
                }
                _ => {}
            }
            Ok(Arc::new(FakeChannel {
                ed_ident,
                mood,
                closing: Arc::new(AtomicBool::new(false)),
                detect_reuse: Default::default(),
            }))
        }

        #[cfg(feature = "relay")]
        async fn build_channel_using_incoming(
            &self,
            _peer: Sensitive<std::net::SocketAddr>,
            _stream: Self::Stream,
            _memquota: ChannelAccount,
        ) -> Result<Arc<Self::Channel>> {
            unimplemented!()
        }
    }

    /// A second request for the same target reuses the first channel.
    #[test]
    fn connect_one_ok() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);
            let target = FakeBuildSpec(413, '!', u32_to_ed(413), ADDR_A);
            let chan1 = mgr
                .get_or_launch(target.clone(), CU::UserTraffic)
                .await
                .unwrap()
                .0;
            let chan2 = mgr.get_or_launch(target, CU::UserTraffic).await.unwrap().0;

            assert_eq!(chan1, chan2);
            assert_eq!(mgr.get_nowait(&u32_to_ed(413)), vec![chan1]);
        });
    }

    /// A failing build surfaces its error and leaves no channel behind.
    #[test]
    fn connect_one_fail() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);

            // This target's mood makes every build attempt fail.
            let target = FakeBuildSpec(999, '❌', u32_to_ed(999), ADDR_A);
            let res1 = mgr.get_or_launch(target, CU::UserTraffic).await;
            assert!(matches!(res1, Err(Error::UnusableTarget(_))));

            assert!(mgr.get_nowait(&u32_to_ed(999)).is_empty());
        });
    }

    /// A request for the same identity at a different address still reuses
    /// the existing channel.
    #[test]
    fn connect_different_address() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);

            let target1 = FakeBuildSpec(413, '!', u32_to_ed(413), ADDR_A);
            // Same identity, different address.
            let mut target2 = target1.clone();
            target2.3 = ADDR_B;

            let chan1 = mgr.get_or_launch(target1, CU::UserTraffic).await.unwrap().0;
            let chan2 = mgr.get_or_launch(target2, CU::UserTraffic).await.unwrap().0;

            assert_eq!(chan1, chan2);
            // Only one channel exists for this identity.
            assert_eq!(mgr.get_nowait(&u32_to_ed(413)), vec![chan1]);
        });
    }

    /// Concurrent requests for the same identity share one channel (or one
    /// failure), while different identities get different channels.
    #[test]
    fn test_concurrent() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);

            let usage = CU::UserTraffic;

            // Requests come in pairs per identity; the pair for 86 uses
            // failing moods, and the pair for 50 uses two different addresses.
            let (ch3a, ch3b, ch44a, ch44b, ch50a, ch50b, ch86a, ch86b) = join!(
                mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(44, 'a', u32_to_ed(44), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(44, 'b', u32_to_ed(44), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(50, 'a', u32_to_ed(50), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(50, 'b', u32_to_ed(50), ADDR_B), usage),
                mgr.get_or_launch(FakeBuildSpec(86, '❌', u32_to_ed(86), ADDR_A), usage),
                mgr.get_or_launch(FakeBuildSpec(86, '🔥', u32_to_ed(86), ADDR_A), usage),
            );
            let ch3a = ch3a.unwrap();
            let ch3b = ch3b.unwrap();
            let ch44a = ch44a.unwrap();
            let ch44b = ch44b.unwrap();
            let ch50a = ch50a.unwrap();
            let ch50b = ch50b.unwrap();
            let err_a = ch86a.unwrap_err();
            let err_b = ch86b.unwrap_err();

            assert_eq!(ch3a, ch3b);
            assert_eq!(ch44a, ch44b);
            assert_eq!(ch50a, ch50b);
            assert_ne!(ch44a, ch3a);

            assert!(matches!(err_a, Error::UnusableTarget(_)));
            assert!(matches!(err_b, Error::UnusableTarget(_)));
        });
    }

    /// When the launching future is dropped, waiters are told that the
    /// request was cancelled rather than that the build task disappeared.
    #[test]
    fn dropped_launch_reports_request_cancelled_to_waiters() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);
            // The '💤' mood keeps the build pending long enough to drop it.
            let target = FakeBuildSpec(777, '💤', u32_to_ed(777), ADDR_A);
            let usage = CU::UserTraffic;

            // The first request becomes the launcher ...
            let mut owner1 = Box::pin(mgr.get_or_launch(target.clone(), usage));
            assert!(poll!(&mut owner1).is_pending());

            // ... and the second one waits for it.
            let mut waiter = Box::pin(mgr.get_or_launch(target.clone(), usage));
            assert!(poll!(&mut waiter).is_pending());

            // Cancel the first launch.
            drop(owner1);

            // A new request takes over as launcher.
            let mut owner2 = Box::pin(mgr.get_or_launch(target, usage));
            assert!(poll!(&mut owner2).is_pending());

            // The waiter is still pending after this poll.
            assert!(poll!(&mut waiter).is_pending());

            // Cancel the second launch as well.
            drop(owner2);

            let waiter = waiter.await;
            assert!(
                matches!(&waiter, Err(Error::RequestCancelled)),
                "{waiter:?}"
            );
            if let Err(ref err) = waiter {
                assert!(!error_contains(err, "channel build task disappeared"));
            }
        });
    }

    /// When upgrading the pending entry fails, both the launcher and the
    /// waiter see the original error, and the launcher does not retry.
    #[test]
    fn failed_upgrade_reports_original_error_without_owner_retry() {
        test_with_one_runtime!(|runtime| async {
            let (mgr, build_attempts) = new_test_abstract_chanmgr_and_build_attempts(runtime);
            // Mood 'r' builds fine but makes `reparameterize` fail.
            // NOTE(review): presumably the upgrade step calls
            // `reparameterize` and reports "failure on new channel" —
            // confirm against the `state` module.
            let target = FakeBuildSpec(778, 'r', u32_to_ed(778), ADDR_A);
            let usage = CU::UserTraffic;

            let mut owner = Box::pin(mgr.get_or_launch(target.clone(), usage));
            assert!(poll!(&mut owner).is_pending());

            let mut waiter = Box::pin(mgr.get_or_launch(target.clone(), usage));
            assert!(poll!(&mut waiter).is_pending());

            let owner = owner.await;
            assert!(matches!(&owner, Err(Error::Internal(_))), "{owner:?}");
            if let Err(ref err) = owner {
                assert!(error_contains(err, "failure on new channel"));
                assert!(!error_contains(err, "channel build task disappeared"));
            }

            // Exactly one build was attempted, and nothing usable was left behind.
            assert_eq!(build_attempts.load(Ordering::SeqCst), 1);
            assert!(mgr.get_nowait(&u32_to_ed(778)).is_empty());

            let waiter = waiter.await;
            assert!(matches!(&waiter, Err(Error::Internal(_))), "{waiter:?}");
            if let Err(ref err) = waiter {
                assert!(error_contains(err, "failure on new channel"));
                assert!(!error_contains(err, "channel build task disappeared"));
            }
        });
    }

    /// Closing channels are replaced on demand and dropped by
    /// `remove_unusable_entries`.
    #[test]
    fn unusable_entries() {
        test_with_one_runtime!(|runtime| async {
            let mgr = new_test_abstract_chanmgr(runtime);

            let (ch3, ch4, ch5) = join!(
                mgr.get_or_launch(FakeBuildSpec(3, 'a', u32_to_ed(3), ADDR_A), CU::UserTraffic),
                mgr.get_or_launch(FakeBuildSpec(4, 'a', u32_to_ed(4), ADDR_A), CU::UserTraffic),
                mgr.get_or_launch(FakeBuildSpec(5, 'a', u32_to_ed(5), ADDR_A), CU::UserTraffic),
            );

            let ch3 = ch3.unwrap().0;
            let _ch4 = ch4.unwrap();
            let ch5 = ch5.unwrap().0;

            // Make two of the three channels unusable.
            ch3.start_closing();
            ch5.start_closing();

            // Asking for identity 3 again must build a fresh channel.
            let ch3_new = mgr
                .get_or_launch(FakeBuildSpec(3, 'b', u32_to_ed(3), ADDR_A), CU::UserTraffic)
                .await
                .unwrap()
                .0;
            assert_ne!(ch3, ch3_new);
            assert_eq!(ch3_new.mood, 'b');

            mgr.remove_unusable_entries().unwrap();

            // 3 has its replacement, 4 was never closed, 5 is gone.
            assert!(!mgr.get_nowait(&u32_to_ed(3)).is_empty());
            assert!(!mgr.get_nowait(&u32_to_ed(4)).is_empty());
            assert!(mgr.get_nowait(&u32_to_ed(5)).is_empty());
        });
    }
}