1use std::fmt;
2use std::time::{Duration, Instant};
3
4use crossbeam_channel::*;
5
6pub struct Sender<T> {
62 pub(super) _liveness_check: crossbeam_channel::Sender<()>,
63 pub(super) sender: crossbeam_channel::Sender<T>,
64 pub(super) liveness_check: crossbeam_channel::Receiver<()>,
65 pub(super) depends_on: Option<(
66 crossbeam_channel::Receiver<()>,
67 crossbeam_channel::Receiver<()>,
68 )>,
69}
70
71impl<T> Sender<T> {
72 pub fn send(&self, msg: T) -> Result<(), SendError<T>> {
102 if let Some(dependency) = &self.depends_on {
103 select_biased! {
104 recv(dependency.0) -> _ => {
105 Err(SendError(msg))
106 },
107 recv(dependency.1) -> _ => {
108 Err(SendError(msg))
109 },
110 send(self.sender, msg) -> e => {
111 e
112 }
113 }
114 } else {
115 self.sender.send(msg)
116 }
117 }
118
119 pub fn try_send(&self, msg: T) -> Result<(), TrySendError<T>> {
142 if let Some(dependency) = &self.depends_on {
143 select_biased! {
144 recv(dependency.0) -> _ => {
145 Err(TrySendError::Disconnected(msg))
146 },
147 recv(dependency.1) -> _ => {
148 Err(TrySendError::Disconnected(msg))
149 },
150 default() => self.sender.try_send(msg)
151 }
152 } else {
153 self.sender.try_send(msg)
154 }
155 }
156
157 pub fn send_timeout(&self, msg: T, timeout: Duration) -> Result<(), SendTimeoutError<T>> {
196 if let Some(dependency) = &self.depends_on {
197 select_biased! {
198 recv(dependency.0) -> _ => {
199 Err(SendTimeoutError::Disconnected(msg))
200 },
201 recv(dependency.1) -> _ => {
202 Err(SendTimeoutError::Disconnected(msg))
203 },
204 send(self.sender, msg) -> res => {
205 res.map_err(|e| SendTimeoutError::Disconnected(e.into_inner()))
206 },
207 default(timeout) => Err(SendTimeoutError::Timeout(msg))
208 }
209 } else {
210 self.sender.send_timeout(msg, timeout)
211 }
212 }
213
214 pub fn send_deadline(&self, msg: T, deadline: Instant) -> Result<(), SendTimeoutError<T>> {
255 if let Some(dependency) = &self.depends_on {
256 select_biased! {
257 recv(dependency.0) -> _ => {
258 Err(SendTimeoutError::Disconnected(msg))
259 },
260 recv(dependency.1) -> _ => {
261 Err(SendTimeoutError::Disconnected(msg))
262 },
263 send(self.sender, msg) -> res => {
264 res.map_err(|e| SendTimeoutError::Disconnected(e.into_inner()))
265 },
266 default(deadline.saturating_duration_since(Instant::now())) => Err(SendTimeoutError::Timeout(msg))
267 }
268 } else {
269 self.sender.send_deadline(msg, deadline)
270 }
271 }
272
273 pub fn is_empty(&self) -> bool {
289 self.sender.is_empty()
290 }
291
292 pub fn is_full(&self) -> bool {
308 self.sender.is_full()
309 }
310
311 pub fn len(&self) -> usize {
326 self.sender.len()
327 }
328
329 pub fn capacity(&self) -> Option<usize> {
346 self.sender.capacity()
347 }
348
349 pub fn same_channel(&self, other: &Sender<T>) -> bool {
365 self.sender.same_channel(&other.sender)
366 }
367}
368
369impl<T> Clone for Sender<T> {
370 fn clone(&self) -> Self {
371 Sender {
372 _liveness_check: self._liveness_check.clone(),
373 sender: self.sender.clone(),
374 liveness_check: self.liveness_check.clone(),
375 depends_on: self.depends_on.clone(),
376 }
377 }
378}
379
380impl<T> fmt::Debug for Sender<T> {
381 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
382 f.pad("Lamba Channel Sender { .. }")
383 }
384}
385
386pub struct Receiver<T> {
443 pub(super) _liveness_check: crossbeam_channel::Sender<()>,
444 pub(super) receiver: crossbeam_channel::Receiver<T>,
445 pub(super) liveness_check: crossbeam_channel::Receiver<()>,
446 pub(super) depends_on: Option<(
447 crossbeam_channel::Receiver<()>,
448 crossbeam_channel::Receiver<()>,
449 )>,
450}
451
452impl<T> Receiver<T> {
453 pub fn recv(&self) -> Result<T, RecvError> {
483 if let Some(dependency) = &self.depends_on {
484 select_biased! {
485 recv(self.receiver) -> e => {
486 e
487 },
488 recv(dependency.0) -> _ => {
489 Err(RecvError)
490 },
491 recv(dependency.1) -> _ => {
492 Err(RecvError)
493 },
494 }
495 } else {
496 self.receiver.recv()
497 }
498 }
499
500 pub fn try_recv(&self) -> Result<T, TryRecvError> {
524 if let Some(dependency) = &self.depends_on {
525 select_biased! {
526 recv(dependency.0) -> _ => {
527 self.receiver.try_recv().map_err(|_| TryRecvError::Disconnected)
528 },
529 recv(dependency.1) -> _ => {
530 self.receiver.try_recv().map_err(|_| TryRecvError::Disconnected)
531 },
532 default() => self.receiver.try_recv()
533 }
534 } else {
535 self.receiver.try_recv()
536 }
537 }
538
539 pub fn recv_timeout(&self, timeout: Duration) -> Result<T, RecvTimeoutError> {
578 if let Some(dependency) = &self.depends_on {
579 select_biased! {
580 recv(self.receiver) -> res => {
581 res.map_err(|_| RecvTimeoutError::Disconnected)
582 },
583 recv(dependency.0) -> _ => {
584 Err(RecvTimeoutError::Disconnected)
585 },
586 recv(dependency.1) -> _ => {
587 Err(RecvTimeoutError::Disconnected)
588 },
589 default(timeout) => Err(RecvTimeoutError::Timeout),
590 }
591 } else {
592 self.receiver.recv_timeout(timeout)
593 }
594 }
595
596 pub fn recv_deadline(&self, deadline: Instant) -> Result<T, RecvTimeoutError> {
637 if let Some(dependency) = &self.depends_on {
638 select_biased! {
639 recv(self.receiver) -> res => {
640 res.map_err(|_| RecvTimeoutError::Disconnected)
641 },
642 recv(dependency.0) -> _ => {
643 Err(RecvTimeoutError::Disconnected)
644 },
645 recv(dependency.1) -> _ => {
646 Err(RecvTimeoutError::Disconnected)
647 },
648 default(deadline.saturating_duration_since(Instant::now())) => Err(RecvTimeoutError::Timeout),
649 }
650 } else {
651 self.receiver.recv_deadline(deadline)
652 }
653 }
654
655 pub fn is_empty(&self) -> bool {
671 self.receiver.is_empty()
672 }
673
674 pub fn is_full(&self) -> bool {
690 self.receiver.is_full()
691 }
692
693 pub fn len(&self) -> usize {
708 self.receiver.len()
709 }
710
711 pub fn capacity(&self) -> Option<usize> {
728 self.receiver.capacity()
729 }
730
731 pub fn iter(&self) -> Iter<'_, T> {
760 self.receiver.iter()
761 }
762
763 pub fn try_iter(&self) -> TryIter<'_, T> {
798 self.receiver.try_iter()
799 }
800
801 pub fn same_channel(&self, other: &Receiver<T>) -> bool {
817 self.receiver.same_channel(&other.receiver)
818 }
819}
820
821impl<T> Clone for Receiver<T> {
822 fn clone(&self) -> Self {
823 Receiver {
824 _liveness_check: self._liveness_check.clone(),
825 receiver: self.receiver.clone(),
826 liveness_check: self.liveness_check.clone(),
827 depends_on: self.depends_on.clone(),
828 }
829 }
830}
831
832impl<T> fmt::Debug for Receiver<T> {
833 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
834 f.pad("Lamba Channel Receiver { .. }")
835 }
836}
837
838pub fn new_channel<T>(capacity: Option<usize>) -> (Sender<T>, Receiver<T>) {
912 let (sender, receiver) = match capacity {
913 None => unbounded(),
914 Some(n) => bounded(n),
915 };
916
917 let (_sender_liveness_check, sender_liveness_check) = bounded(0);
918 let (_receiver_liveness_check, receiver_liveness_check) = bounded(0);
919
920 let sender = Sender {
921 _liveness_check: _sender_liveness_check,
922 sender,
923 liveness_check: receiver_liveness_check,
924 depends_on: None,
925 };
926
927 let receiver = Receiver {
928 _liveness_check: _receiver_liveness_check,
929 receiver,
930 liveness_check: sender_liveness_check,
931 depends_on: None,
932 };
933
934 (sender, receiver)
935}
936
937pub fn new_channel_with_dependency<T, U>(
979 capacity: Option<usize>,
980 dependency_sender: &Sender<U>,
981 dependency_receiver: &Receiver<U>,
982) -> (Sender<T>, Receiver<T>) {
983 let (sender, receiver) = match capacity {
984 None => unbounded(),
985 Some(n) => bounded(n),
986 };
987
988 let (_sender_liveness_check, sender_liveness_check) = bounded(0);
989 let (_receiver_liveness_check, receiver_liveness_check) = bounded(0);
990
991 let sender = Sender {
992 _liveness_check: _sender_liveness_check,
993 sender,
994 liveness_check: receiver_liveness_check,
995 depends_on: Some((
996 dependency_sender.liveness_check.clone(),
997 dependency_receiver.liveness_check.clone(),
998 )),
999 };
1000
1001 let receiver = Receiver {
1002 _liveness_check: _receiver_liveness_check,
1003 receiver,
1004 liveness_check: sender_liveness_check,
1005 depends_on: Some((
1006 dependency_sender.liveness_check.clone(),
1007 dependency_receiver.liveness_check.clone(),
1008 )),
1009 };
1010
1011 (sender, receiver)
1012}
1013
1014#[cfg(test)]
1015mod tests {
1016 use super::*;
1017 use std::sync::atomic::{AtomicBool, Ordering};
1018 use std::sync::Arc;
1019 use std::thread;
1020
1021 use quanta::Clock;
1022
1023 fn send_test(tx: Sender<u16>, rx: Receiver<u16>, handle: Option<thread::JoinHandle<()>>) {
1024 assert_eq!(tx.send(1), Ok(()));
1025 assert_eq!(rx.recv(), Ok(1));
1026
1027 drop(rx);
1028 assert_eq!(tx.send(2), Err(SendError(2)));
1029
1030 if let Some(h) = handle {
1031 let _ = h.join();
1032 }
1033 }
1034
/// Runs `send_test` on a standalone unbounded channel.
#[test]
fn test_send() {
    let (sender, receiver) = new_channel(None);
    send_test(sender, receiver, None);
}
1040
/// Runs `send_test` through a dependent channel whose messages a helper thread forwards
/// into the channel it depends on.
#[test]
fn test_dependent_send() {
    let (sink_tx, sink_rx) = new_channel(None);
    let (source_tx, source_rx) = new_channel_with_dependency(None, &sink_tx, &sink_rx);

    // Forward until `recv` fails; the resulting panic ends the thread.
    let forwarder = thread::spawn(move || loop {
        let item = source_rx.recv().unwrap();
        let _ = sink_tx.send(item);
    });

    send_test(source_tx, sink_rx, Some(forwarder));
}
1052
1053 fn try_send_test(tx: Sender<u16>, rx: Receiver<u16>, handle: Option<thread::JoinHandle<()>>) {
1054 assert_eq!(tx.try_send(1), Ok(()));
1055 assert_eq!(tx.try_send(2), Err(TrySendError::Full(2)));
1056 assert_eq!(rx.recv(), Ok(1));
1057
1058 drop(rx);
1059 assert_eq!(tx.try_send(3), Err(TrySendError::Disconnected(3)));
1060
1061 if let Some(h) = handle {
1062 let _ = h.join();
1063 }
1064 }
1065
/// Runs `try_send_test` on a standalone channel of capacity one.
#[test]
fn test_try_send() {
    let (sender, receiver) = new_channel(Some(1));
    try_send_test(sender, receiver, None);
}
1071
/// Runs `try_send_test` through a zero-capacity dependent channel drained by a forwarder.
#[test]
fn test_dependent_try_send() {
    let (sink_tx, sink_rx) = new_channel(Some(0));
    let (source_tx, source_rx) = new_channel_with_dependency(Some(0), &sink_tx, &sink_rx);

    let forwarder = thread::spawn(move || loop {
        let item = source_rx.recv().unwrap();
        let _ = sink_tx.send(item);
    });

    // One blocking round trip first.
    // NOTE(review): presumably this makes sure the forwarder is running before the
    // non-blocking sends below — confirm.
    assert_eq!(source_tx.send(0), Ok(()));
    assert_eq!(sink_rx.recv(), Ok(0));

    try_send_test(source_tx, sink_rx, Some(forwarder));
}
1087
1088 fn send_timeout_test(
1089 tx: Sender<u16>,
1090 rx: Receiver<u16>,
1091 handle: Option<thread::JoinHandle<()>>,
1092 ) {
1093 let timeout = Duration::from_millis(10);
1094 let clock = Clock::new();
1095
1096 let mut s = clock.now();
1097 assert_eq!(tx.send_timeout(1, timeout), Ok(()));
1098 assert!(s.elapsed() < timeout / 4);
1099
1100 s = clock.now();
1101 assert_eq!(
1102 tx.send_timeout(2, timeout),
1103 Err(SendTimeoutError::Timeout(2))
1104 );
1105 assert!(s.elapsed() >= timeout);
1106 assert_eq!(rx.recv(), Ok(1));
1107
1108 drop(rx);
1109 s = clock.now();
1110 assert_eq!(
1111 tx.send_timeout(3, timeout),
1112 Err(SendTimeoutError::Disconnected(3))
1113 );
1114 assert!(s.elapsed() < timeout / 4);
1115
1116 if let Some(h) = handle {
1117 let _ = h.join();
1118 }
1119 }
1120
/// Runs `send_timeout_test` on a standalone channel of capacity one.
#[test]
fn test_send_timeout() {
    let (sender, receiver) = new_channel(Some(1));
    send_timeout_test(sender, receiver, None);
}
1126
/// Runs `send_timeout_test` through a zero-capacity dependent channel drained by a forwarder.
#[test]
fn test_dependent_send_timeout() {
    let (sink_tx, sink_rx) = new_channel(Some(0));
    let (source_tx, source_rx) = new_channel_with_dependency(Some(0), &sink_tx, &sink_rx);

    // Forward until `recv` fails; the resulting panic ends the thread.
    let forwarder = thread::spawn(move || loop {
        let item = source_rx.recv().unwrap();
        let _ = sink_tx.send(item);
    });

    send_timeout_test(source_tx, sink_rx, Some(forwarder));
}
1138
1139 fn send_deadline_test(
1140 tx: Sender<u16>,
1141 rx: Receiver<u16>,
1142 handle: Option<thread::JoinHandle<()>>,
1143 ) {
1144 let timeout = Duration::from_millis(10);
1145 let clock = Clock::new();
1146
1147 let mut s = clock.now();
1148 let mut deadline = Instant::now() + timeout;
1149 assert_eq!(tx.send_deadline(1, deadline), Ok(()));
1150 assert!(s.elapsed() < timeout / 4);
1151
1152 s = clock.now();
1153 deadline = Instant::now() + timeout;
1154 assert_eq!(
1155 tx.send_deadline(2, deadline),
1156 Err(SendTimeoutError::Timeout(2))
1157 );
1158 assert!(s.elapsed() >= timeout);
1159 assert_eq!(rx.recv(), Ok(1));
1160
1161 drop(rx);
1162 s = clock.now();
1163 deadline = Instant::now() + timeout;
1164 assert_eq!(
1165 tx.send_deadline(3, deadline),
1166 Err(SendTimeoutError::Disconnected(3))
1167 );
1168 assert!(s.elapsed() < timeout / 4);
1169
1170 if let Some(h) = handle {
1171 let _ = h.join();
1172 }
1173 }
1174
/// Runs `send_deadline_test` on a standalone channel of capacity one.
#[test]
fn test_send_deadline() {
    let (sender, receiver) = new_channel(Some(1));
    send_deadline_test(sender, receiver, None);
}
1180
/// Runs `send_deadline_test` through a zero-capacity dependent channel drained by a forwarder.
#[test]
fn test_dependent_send_deadline() {
    let (sink_tx, sink_rx) = new_channel(Some(0));
    let (source_tx, source_rx) = new_channel_with_dependency(Some(0), &sink_tx, &sink_rx);

    // Forward until `recv` fails; the resulting panic ends the thread.
    let forwarder = thread::spawn(move || loop {
        let item = source_rx.recv().unwrap();
        let _ = sink_tx.send(item);
    });

    send_deadline_test(source_tx, sink_rx, Some(forwarder));
}
1192
1193 fn recv_test(tx: Sender<u16>, rx: Receiver<u16>, handle: Option<thread::JoinHandle<()>>) {
1194 assert_eq!(tx.send(1), Ok(()));
1195 assert_eq!(rx.recv(), Ok(1));
1196
1197 assert_eq!(tx.send(2), Ok(()));
1198 drop(tx);
1199
1200 assert_eq!(rx.recv(), Ok(2));
1201 assert_eq!(rx.recv(), Err(RecvError));
1202
1203 if let Some(h) = handle {
1204 let _ = h.join();
1205 }
1206 }
1207
/// Runs `recv_test` on a standalone unbounded channel.
#[test]
fn test_recv() {
    let (sender, receiver) = new_channel(None);
    recv_test(sender, receiver, None);
}
1213
/// Runs `recv_test` through an unbounded dependent channel drained by a forwarder.
#[test]
fn test_dependent_recv() {
    let (sink_tx, sink_rx) = new_channel(None);
    let (source_tx, source_rx) = new_channel_with_dependency(None, &sink_tx, &sink_rx);

    // Forward until `recv` fails; the resulting panic ends the thread.
    let forwarder = thread::spawn(move || loop {
        let item = source_rx.recv().unwrap();
        let _ = sink_tx.send(item);
    });

    recv_test(source_tx, sink_rx, Some(forwarder));
}
1225
1226 fn try_recv_test(tx: Sender<u16>, rx: Receiver<u16>, handle: Option<thread::JoinHandle<()>>) {
1227 assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));
1228
1229 assert_eq!(tx.send(1), Ok(()));
1230
1231 loop {
1232 match rx.try_recv() {
1233 Ok(v) => {
1234 assert_eq!(v, 1);
1235 break;
1236 }
1237 Err(e) => {
1238 assert_eq!(e, TryRecvError::Empty);
1239 }
1240 };
1241 }
1242
1243 assert_eq!(tx.send(2), Ok(()));
1244 drop(tx);
1245
1246 loop {
1247 match rx.try_recv() {
1248 Ok(v) => {
1249 assert_eq!(v, 2);
1250 break;
1251 }
1252 Err(e) => {
1253 assert_eq!(e, TryRecvError::Empty);
1254 }
1255 };
1256 }
1257
1258 loop {
1259 match rx.try_recv() {
1260 Ok(_) => {
1261 assert!(false);
1262 }
1263 Err(e) => match e {
1264 TryRecvError::Empty => {}
1265 TryRecvError::Disconnected => break,
1266 },
1267 };
1268 }
1269
1270 if let Some(h) = handle {
1271 let _ = h.join();
1272 }
1273 }
1274
/// Runs `try_recv_test` on a standalone channel of capacity one.
#[test]
fn test_try_recv() {
    let (sender, receiver) = new_channel(Some(1));
    try_recv_test(sender, receiver, None);
}
1280
/// Runs `try_recv_test` through a zero-capacity dependent channel drained by a forwarder.
#[test]
fn test_dependent_try_recv() {
    let (sink_tx, sink_rx) = new_channel(Some(0));
    let (source_tx, source_rx) = new_channel_with_dependency(Some(0), &sink_tx, &sink_rx);

    // Forward until `recv` fails; the resulting panic ends the thread.
    let forwarder = thread::spawn(move || loop {
        let item = source_rx.recv().unwrap();
        let _ = sink_tx.send(item);
    });

    try_recv_test(source_tx, sink_rx, Some(forwarder));
}
1292
1293 fn recv_timeout_test(
1294 tx: Sender<u16>,
1295 rx: Receiver<u16>,
1296 handle: Option<thread::JoinHandle<()>>,
1297 ) {
1298 let timeout: Duration = Duration::from_millis(10);
1299 let clock = Clock::new();
1300
1301 let mut s = clock.now();
1302 assert_eq!(rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
1303 assert!(s.elapsed() >= timeout);
1304
1305 assert_eq!(tx.send(1), Ok(()));
1306 s = clock.now();
1307 assert_eq!(rx.recv_timeout(timeout), Ok(1));
1308 assert!(s.elapsed() < timeout / 4);
1309
1310 assert_eq!(tx.send(2), Ok(()));
1311 drop(tx);
1312
1313 s = clock.now();
1314 assert_eq!(rx.recv_timeout(timeout), Ok(2));
1315 assert!(s.elapsed() < timeout / 4);
1316
1317 s = clock.now();
1318 assert_eq!(
1319 rx.recv_timeout(timeout),
1320 Err(RecvTimeoutError::Disconnected)
1321 );
1322 assert!(s.elapsed() < timeout / 4);
1323
1324 if let Some(h) = handle {
1325 let _ = h.join();
1326 }
1327 }
1328
/// Runs `recv_timeout_test` on a standalone channel of capacity one.
#[test]
fn test_recv_timeout() {
    let (sender, receiver) = new_channel(Some(1));
    recv_timeout_test(sender, receiver, None);
}
1334
/// Runs `recv_timeout_test` through a zero-capacity dependent channel drained by a forwarder.
#[test]
fn test_dependent_recv_timeout() {
    let (sink_tx, sink_rx) = new_channel(Some(0));
    let (source_tx, source_rx) = new_channel_with_dependency(Some(0), &sink_tx, &sink_rx);

    // Forward until `recv` fails; the resulting panic ends the thread.
    let forwarder = thread::spawn(move || loop {
        let item = source_rx.recv().unwrap();
        let _ = sink_tx.send(item);
    });

    recv_timeout_test(source_tx, sink_rx, Some(forwarder));
}
1346
1347 fn recv_deadline_test(
1348 tx: Sender<u16>,
1349 rx: Receiver<u16>,
1350 handle: Option<thread::JoinHandle<()>>,
1351 ) {
1352 let timeout: Duration = Duration::from_millis(10);
1353 let clock = Clock::new();
1354
1355 let mut s = clock.now();
1356 let mut deadline = Instant::now() + timeout;
1357 assert_eq!(rx.recv_deadline(deadline), Err(RecvTimeoutError::Timeout));
1358 assert!(s.elapsed() >= timeout);
1359
1360 assert_eq!(tx.send(1), Ok(()));
1361 s = clock.now();
1362 deadline = Instant::now() + timeout;
1363 assert_eq!(rx.recv_deadline(deadline), Ok(1));
1364 assert!(s.elapsed() < timeout / 4);
1365
1366 assert_eq!(tx.send(2), Ok(()));
1367 drop(tx);
1368
1369 s = clock.now();
1370 deadline = Instant::now() + timeout;
1371 assert_eq!(rx.recv_deadline(deadline), Ok(2));
1372 assert!(s.elapsed() < timeout / 4);
1373
1374 s = clock.now();
1375 deadline = Instant::now() + timeout;
1376 assert_eq!(
1377 rx.recv_deadline(deadline),
1378 Err(RecvTimeoutError::Disconnected)
1379 );
1380 assert!(s.elapsed() < timeout / 4);
1381
1382 if let Some(h) = handle {
1383 let _ = h.join();
1384 }
1385 }
1386
/// Runs `recv_deadline_test` on a standalone channel of capacity one.
#[test]
fn test_recv_deadline() {
    let (sender, receiver) = new_channel(Some(1));
    recv_deadline_test(sender, receiver, None);
}
1392
/// Runs `recv_deadline_test` through a zero-capacity dependent channel drained by a forwarder.
#[test]
fn test_dependent_recv_deadline() {
    let (sink_tx, sink_rx) = new_channel(Some(0));
    let (source_tx, source_rx) = new_channel_with_dependency(Some(0), &sink_tx, &sink_rx);

    // Forward until `recv` fails; the resulting panic ends the thread.
    let forwarder = thread::spawn(move || loop {
        let item = source_rx.recv().unwrap();
        let _ = sink_tx.send(item);
    });

    recv_deadline_test(source_tx, sink_rx, Some(forwarder));
}
1404
/// Two-level tree of dependent channels, all funnelled into one final receiver.
///
/// After messages from each leaf have reached the final receiver, that receiver is
/// dropped; every forwarding thread is then joined and every leaf sender must fail.
#[test]
fn test_crazy_chain_drop_receiver() {
    /// Spawns a thread that forwards everything from `from` into `to` until `recv` fails.
    fn forward<T: Send + 'static>(from: Receiver<T>, to: Sender<T>) -> thread::JoinHandle<()> {
        thread::spawn(move || loop {
            let _ = to.send(from.recv().unwrap());
        })
    }

    let (out_tx, rx) = new_channel(Some(0));
    let (tx1, in_rx1) = new_channel_with_dependency(Some(0), &out_tx, &rx);
    let (tx2, in_rx2) = new_channel_with_dependency(Some(0), &out_tx, &rx);
    let (tx11, in_rx11) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
    let (tx12, in_rx12) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
    let (tx21, in_rx21) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);
    let (tx22, in_rx22) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);

    // Middle layer forwards into the final channel, leaves forward into the middle layer.
    let relay1 = forward(in_rx1, out_tx.clone());
    let relay2 = forward(in_rx2, out_tx);
    let relay11 = forward(in_rx11, tx1.clone());
    let relay12 = forward(in_rx12, tx1);
    let relay21 = forward(in_rx21, tx2.clone());
    let relay22 = forward(in_rx22, tx2);

    assert_eq!(tx11.send(1), Ok(()));
    assert_eq!(rx.recv(), Ok(1));
    assert_eq!(tx12.send(2), Ok(()));
    assert_eq!(rx.recv(), Ok(2));
    assert_eq!(tx21.send(3), Ok(()));
    assert_eq!(rx.recv(), Ok(3));
    assert_eq!(tx22.send(4), Ok(()));
    assert_eq!(rx.recv(), Ok(4));

    drop(rx);

    // Join results are ignored on purpose.
    let _ = relay1.join();
    let _ = relay2.join();
    let _ = relay11.join();
    let _ = relay12.join();
    let _ = relay21.join();
    let _ = relay22.join();

    assert_eq!(tx11.send(6), Err(SendError(6)));
    assert_eq!(tx12.send(7), Err(SendError(7)));
    assert_eq!(tx21.send(8), Err(SendError(8)));
    assert_eq!(tx22.send(9), Err(SendError(9)));
}
1465
/// Two-level tree of dependent channels, all funnelled into one final receiver.
///
/// After a last message is sent, all leaf senders are dropped; the final receiver must
/// still get that message and then report `RecvError`.
#[test]
fn test_crazy_chain_drop_senders() {
    /// Spawns a thread that forwards everything from `from` into `to` until `recv` fails.
    fn forward<T: Send + 'static>(from: Receiver<T>, to: Sender<T>) -> thread::JoinHandle<()> {
        thread::spawn(move || loop {
            let _ = to.send(from.recv().unwrap());
        })
    }

    let (out_tx, rx) = new_channel(Some(0));
    let (tx1, in_rx1) = new_channel_with_dependency(Some(0), &out_tx, &rx);
    let (tx2, in_rx2) = new_channel_with_dependency(Some(0), &out_tx, &rx);
    let (tx11, in_rx11) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
    let (tx12, in_rx12) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
    let (tx21, in_rx21) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);
    let (tx22, in_rx22) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);

    // Middle layer forwards into the final channel, leaves forward into the middle layer.
    let relay1 = forward(in_rx1, out_tx.clone());
    let relay2 = forward(in_rx2, out_tx);
    let relay11 = forward(in_rx11, tx1.clone());
    let relay12 = forward(in_rx12, tx1);
    let relay21 = forward(in_rx21, tx2.clone());
    let relay22 = forward(in_rx22, tx2);

    assert_eq!(tx11.send(1), Ok(()));
    assert_eq!(rx.recv(), Ok(1));
    assert_eq!(tx12.send(2), Ok(()));
    assert_eq!(rx.recv(), Ok(2));
    assert_eq!(tx21.send(3), Ok(()));
    assert_eq!(rx.recv(), Ok(3));
    assert_eq!(tx22.send(4), Ok(()));
    assert_eq!(rx.recv(), Ok(4));

    // One message is still in flight when the leaf senders go away.
    assert_eq!(tx11.send(5), Ok(()));
    drop(tx11);
    drop(tx12);
    drop(tx21);
    drop(tx22);

    assert_eq!(rx.recv(), Ok(5));
    assert_eq!(rx.recv(), Err(RecvError));

    // Join results are ignored on purpose.
    let _ = relay1.join();
    let _ = relay2.join();
    let _ = relay11.join();
    let _ = relay12.join();
    let _ = relay21.join();
    let _ = relay22.join();
}
1528
/// Two-level tree of dependent channels in which the two middle forwarding threads can be
/// stopped through a flag.
///
/// Stopping a middle thread drops its channel ends, after which the leaf senders of that
/// branch must fail; once both branches are gone the final receiver reports `RecvError`.
#[test]
fn test_crazy_chain_drop_threads() {
    /// Spawns a thread that forwards everything from `from` into `to` until `recv` fails.
    fn forward<T: Send + 'static>(from: Receiver<T>, to: Sender<T>) -> thread::JoinHandle<()> {
        thread::spawn(move || loop {
            let _ = to.send(from.recv().unwrap());
        })
    }

    /// Like `forward`, but re-checks `keep_running` before each message and exits when it
    /// is `false`.
    fn forward_while<T: Send + 'static>(
        keep_running: Arc<AtomicBool>,
        from: Receiver<T>,
        to: Sender<T>,
    ) -> thread::JoinHandle<()> {
        thread::spawn(move || {
            while keep_running.load(Ordering::Acquire) {
                let _ = to.send(from.recv().unwrap());
            }
        })
    }

    let (out_tx, rx) = new_channel(Some(0));
    let (tx1, in_rx1) = new_channel_with_dependency(Some(0), &out_tx, &rx);
    let (tx2, in_rx2) = new_channel_with_dependency(Some(0), &out_tx, &rx);
    let (tx11, in_rx11) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
    let (tx12, in_rx12) = new_channel_with_dependency(Some(0), &tx1, &in_rx1);
    let (tx21, in_rx21) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);
    let (tx22, in_rx22) = new_channel_with_dependency(Some(0), &tx2, &in_rx2);

    // `true` means "keep forwarding".
    let relay1_running = Arc::new(AtomicBool::new(true));
    let relay1 = forward_while(Arc::clone(&relay1_running), in_rx1, out_tx.clone());

    let relay2_running = Arc::new(AtomicBool::new(true));
    let relay2 = forward_while(Arc::clone(&relay2_running), in_rx2, out_tx);

    let relay11 = forward(in_rx11, tx1.clone());
    let relay12 = forward(in_rx12, tx1);
    let relay21 = forward(in_rx21, tx2.clone());
    let relay22 = forward(in_rx22, tx2);

    assert_eq!(tx11.send(1), Ok(()));
    assert_eq!(rx.recv(), Ok(1));
    assert_eq!(tx11.send(2), Ok(()));
    assert_eq!(rx.recv(), Ok(2));
    assert_eq!(tx11.send(3), Ok(()));
    assert_eq!(rx.recv(), Ok(3));
    assert_eq!(tx11.send(4), Ok(()));
    assert_eq!(rx.recv(), Ok(4));

    // Clear the flag, then push one more message through so the first middle thread
    // reaches its loop condition again and exits.
    relay1_running.store(false, Ordering::Release);
    assert_eq!(tx11.send(5), Ok(()));
    assert_eq!(rx.recv(), Ok(5));
    let _ = relay1.join();

    assert_eq!(tx11.send(6), Err(SendError(6)));
    assert_eq!(tx12.send(7), Err(SendError(7)));
    assert_eq!(rx.try_recv(), Err(TryRecvError::Empty));

    // Same procedure for the second branch.
    relay2_running.store(false, Ordering::Release);
    assert_eq!(tx21.send(8), Ok(()));
    assert_eq!(rx.recv(), Ok(8));
    let _ = relay2.join();

    assert_eq!(tx21.send(9), Err(SendError(9)));
    assert_eq!(tx22.send(10), Err(SendError(10)));
    assert_eq!(rx.recv(), Err(RecvError));

    // Join results are ignored on purpose.
    let _ = relay11.join();
    let _ = relay12.join();
    let _ = relay21.join();
    let _ = relay22.join();
}
1606
/// Dropping the dependency's sender first: sends on the dependent channel fail, the
/// buffered message is still received, and further receives report `RecvError`. Dropping
/// the dependency's receiver afterwards leaves the results unchanged.
#[test]
fn test_dependency_sender_loss() {
    let (parent_tx, parent_rx) = new_channel::<()>(Some(1));
    let (child_tx, child_rx) = new_channel_with_dependency(Some(1), &parent_tx, &parent_rx);

    assert_eq!(child_tx.send(1), Ok(()));

    drop(parent_tx);

    assert_eq!(child_tx.send(2), Err(SendError(2)));
    assert_eq!(child_rx.recv(), Ok(1));
    assert_eq!(child_rx.recv(), Err(RecvError));

    drop(parent_rx);

    assert_eq!(child_tx.send(3), Err(SendError(3)));
    assert_eq!(child_rx.recv(), Err(RecvError));
}
1625
/// Dropping the dependency's receiver first: sends on the dependent channel fail, the
/// buffered message is still received, and further receives report `RecvError`. Dropping
/// the dependency's sender afterwards leaves the results unchanged.
#[test]
fn test_dependency_receiver_loss() {
    let (parent_tx, parent_rx) = new_channel::<()>(Some(1));
    let (child_tx, child_rx) = new_channel_with_dependency(Some(1), &parent_tx, &parent_rx);

    assert_eq!(child_tx.send(1), Ok(()));

    drop(parent_rx);

    assert_eq!(child_tx.send(2), Err(SendError(2)));
    assert_eq!(child_rx.recv(), Ok(1));
    assert_eq!(child_rx.recv(), Err(RecvError));

    drop(parent_tx);

    assert_eq!(child_tx.send(3), Err(SendError(3)));
    assert_eq!(child_rx.recv(), Err(RecvError));
}
1644
/// Non-blocking variant of the dependency-loss check, dropping the dependency's sender
/// first: `try_send` reports `Disconnected`, `try_recv` still drains the buffered message
/// and then reports `Disconnected`.
#[test]
fn test_dependency_sender_loss_try() {
    let (parent_tx, parent_rx) = new_channel::<()>(Some(1));
    let (child_tx, child_rx) = new_channel_with_dependency(Some(1), &parent_tx, &parent_rx);

    // Baseline while the dependency is alive: empty, then full at capacity one.
    assert_eq!(child_rx.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(child_tx.try_send(1), Ok(()));
    assert_eq!(child_tx.try_send(2), Err(TrySendError::Full(2)));

    drop(parent_tx);

    assert_eq!(child_tx.try_send(3), Err(TrySendError::Disconnected(3)));
    assert_eq!(child_rx.try_recv(), Ok(1));
    assert_eq!(child_rx.try_recv(), Err(TryRecvError::Disconnected));

    drop(parent_rx);

    assert_eq!(child_tx.try_send(4), Err(TrySendError::Disconnected(4)));
    assert_eq!(child_rx.try_recv(), Err(TryRecvError::Disconnected));
}
1665
/// Non-blocking variant of the dependency-loss check, dropping the dependency's receiver
/// first: `try_send` reports `Disconnected`, `try_recv` still drains the buffered message
/// and then reports `Disconnected`.
#[test]
fn test_dependency_receiver_loss_try() {
    let (parent_tx, parent_rx) = new_channel::<()>(Some(1));
    let (child_tx, child_rx) = new_channel_with_dependency(Some(1), &parent_tx, &parent_rx);

    // Baseline while the dependency is alive: empty, then full at capacity one.
    assert_eq!(child_rx.try_recv(), Err(TryRecvError::Empty));
    assert_eq!(child_tx.try_send(1), Ok(()));
    assert_eq!(child_tx.try_send(2), Err(TrySendError::Full(2)));

    drop(parent_rx);

    assert_eq!(child_tx.try_send(3), Err(TrySendError::Disconnected(3)));
    assert_eq!(child_rx.try_recv(), Ok(1));
    assert_eq!(child_rx.try_recv(), Err(TryRecvError::Disconnected));

    drop(parent_tx);

    assert_eq!(child_tx.try_send(4), Err(TrySendError::Disconnected(4)));
    assert_eq!(child_rx.try_recv(), Err(TryRecvError::Disconnected));
}
1686
/// Timeout variant of the dependency-loss check, dropping the dependency's sender first.
///
/// Every call is checked for both its result and its duration: calls expected to wait
/// must take at least `timeout`, all others must finish within a quarter of it.
#[test]
fn test_dependency_sender_loss_timeout() {
    let (parent_tx, parent_rx) = new_channel::<()>(Some(1));
    let (child_tx, child_rx) = new_channel_with_dependency(Some(1), &parent_tx, &parent_rx);

    let timeout = Duration::from_millis(10);
    let quick = timeout / 4;
    let clock = Clock::new();

    // Baseline while the dependency is alive.
    let started = clock.now();
    assert_eq!(child_rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
    assert!(started.elapsed() >= timeout);

    let started = clock.now();
    assert_eq!(child_tx.send_timeout(1, timeout), Ok(()));
    assert!(started.elapsed() < quick);

    let started = clock.now();
    assert_eq!(
        child_tx.send_timeout(2, timeout),
        Err(SendTimeoutError::Timeout(2))
    );
    assert!(started.elapsed() >= timeout);

    drop(parent_tx);

    let started = clock.now();
    assert_eq!(
        child_tx.send_timeout(3, timeout),
        Err(SendTimeoutError::Disconnected(3))
    );
    assert!(started.elapsed() < quick);

    let started = clock.now();
    assert_eq!(child_rx.recv_timeout(timeout), Ok(1));
    assert!(started.elapsed() < quick);

    let started = clock.now();
    assert_eq!(
        child_rx.recv_timeout(timeout),
        Err(RecvTimeoutError::Disconnected)
    );
    assert!(started.elapsed() < quick);

    drop(parent_rx);

    let started = clock.now();
    assert_eq!(
        child_tx.send_timeout(4, timeout),
        Err(SendTimeoutError::Disconnected(4))
    );
    assert!(started.elapsed() < quick);

    let started = clock.now();
    assert_eq!(
        child_rx.recv_timeout(timeout),
        Err(RecvTimeoutError::Disconnected)
    );
    assert!(started.elapsed() < quick);
}
1746
/// Timeout variant of the dependency-loss check, dropping the dependency's receiver first.
///
/// Every call is checked for both its result and its duration: calls expected to wait
/// must take at least `timeout`, all others must finish within a quarter of it.
#[test]
fn test_dependency_receiver_loss_timeout() {
    let (parent_tx, parent_rx) = new_channel::<()>(Some(1));
    let (child_tx, child_rx) = new_channel_with_dependency(Some(1), &parent_tx, &parent_rx);

    let timeout = Duration::from_millis(10);
    let quick = timeout / 4;
    let clock = Clock::new();

    // Baseline while the dependency is alive.
    let started = clock.now();
    assert_eq!(child_rx.recv_timeout(timeout), Err(RecvTimeoutError::Timeout));
    assert!(started.elapsed() >= timeout);

    let started = clock.now();
    assert_eq!(child_tx.send_timeout(1, timeout), Ok(()));
    assert!(started.elapsed() < quick);

    let started = clock.now();
    assert_eq!(
        child_tx.send_timeout(2, timeout),
        Err(SendTimeoutError::Timeout(2))
    );
    assert!(started.elapsed() >= timeout);

    drop(parent_rx);

    let started = clock.now();
    assert_eq!(
        child_tx.send_timeout(3, timeout),
        Err(SendTimeoutError::Disconnected(3))
    );
    assert!(started.elapsed() < quick);

    let started = clock.now();
    assert_eq!(child_rx.recv_timeout(timeout), Ok(1));
    assert!(started.elapsed() < quick);

    let started = clock.now();
    assert_eq!(
        child_rx.recv_timeout(timeout),
        Err(RecvTimeoutError::Disconnected)
    );
    assert!(started.elapsed() < quick);

    drop(parent_tx);

    let started = clock.now();
    assert_eq!(
        child_tx.send_timeout(4, timeout),
        Err(SendTimeoutError::Disconnected(4))
    );
    assert!(started.elapsed() < quick);

    let started = clock.now();
    assert_eq!(
        child_rx.recv_timeout(timeout),
        Err(RecvTimeoutError::Disconnected)
    );
    assert!(started.elapsed() < quick);
}
1806
/// Deadline variant of the dependency-loss check, dropping the dependency's sender first.
///
/// A fresh deadline (`now + timeout`) is computed before every call; each call is checked
/// for both its result and its duration.
#[test]
fn test_dependency_sender_loss_deadline() {
    let (parent_tx, parent_rx) = new_channel::<()>(Some(1));
    let (child_tx, child_rx) = new_channel_with_dependency(Some(1), &parent_tx, &parent_rx);

    let timeout = Duration::from_millis(10);
    let quick = timeout / 4;
    let clock = Clock::new();

    // Baseline while the dependency is alive.
    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_rx.recv_deadline(deadline),
        Err(RecvTimeoutError::Timeout)
    );
    assert!(started.elapsed() >= timeout);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(child_tx.send_deadline(1, deadline), Ok(()));
    assert!(started.elapsed() < quick);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_tx.send_deadline(2, deadline),
        Err(SendTimeoutError::Timeout(2))
    );
    assert!(started.elapsed() >= timeout);

    drop(parent_tx);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_tx.send_deadline(3, deadline),
        Err(SendTimeoutError::Disconnected(3))
    );
    assert!(started.elapsed() < quick);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(child_rx.recv_deadline(deadline), Ok(1));
    assert!(started.elapsed() < quick);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_rx.recv_deadline(deadline),
        Err(RecvTimeoutError::Disconnected)
    );
    assert!(started.elapsed() < quick);

    drop(parent_rx);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_tx.send_deadline(4, deadline),
        Err(SendTimeoutError::Disconnected(4))
    );
    assert!(started.elapsed() < quick);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_rx.recv_deadline(deadline),
        Err(RecvTimeoutError::Disconnected)
    );
    assert!(started.elapsed() < quick);
}
1874
/// Deadline variant of the dependency-loss check, dropping the dependency's receiver first.
///
/// A fresh deadline (`now + timeout`) is computed before every call; each call is checked
/// for both its result and its duration.
#[test]
fn test_dependency_receiver_loss_deadline() {
    let (parent_tx, parent_rx) = new_channel::<()>(Some(1));
    let (child_tx, child_rx) = new_channel_with_dependency(Some(1), &parent_tx, &parent_rx);

    let timeout = Duration::from_millis(10);
    let quick = timeout / 4;
    let clock = Clock::new();

    // Baseline while the dependency is alive.
    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_rx.recv_deadline(deadline),
        Err(RecvTimeoutError::Timeout)
    );
    assert!(started.elapsed() >= timeout);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(child_tx.send_deadline(1, deadline), Ok(()));
    assert!(started.elapsed() < quick);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_tx.send_deadline(2, deadline),
        Err(SendTimeoutError::Timeout(2))
    );
    assert!(started.elapsed() >= timeout);

    drop(parent_rx);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_tx.send_deadline(3, deadline),
        Err(SendTimeoutError::Disconnected(3))
    );
    assert!(started.elapsed() < quick);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(child_rx.recv_deadline(deadline), Ok(1));
    assert!(started.elapsed() < quick);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_rx.recv_deadline(deadline),
        Err(RecvTimeoutError::Disconnected)
    );
    assert!(started.elapsed() < quick);

    drop(parent_tx);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_tx.send_deadline(4, deadline),
        Err(SendTimeoutError::Disconnected(4))
    );
    assert!(started.elapsed() < quick);

    let started = clock.now();
    let deadline = Instant::now() + timeout;
    assert_eq!(
        child_rx.recv_deadline(deadline),
        Err(RecvTimeoutError::Disconnected)
    );
    assert!(started.elapsed() < quick);
}
1942}