use std::fmt;
use std::fmt::Display;
use std::marker::PhantomData;
use std::sync::OnceLock;
use std::time::Duration;
use crossbeam_channel::RecvTimeoutError;
use ipc_channel::IpcError;
use ipc_channel::router::ROUTER;
use malloc_size_of::{MallocSizeOf, MallocSizeOfOps};
use malloc_size_of_derive::MallocSizeOf;
use serde::de::VariantAccess;
use serde::{Deserialize, Deserializer, Serialize, Serializer};
use servo_config::opts;
mod callback;
pub use callback::GenericCallback;
mod lazy_callback;
pub use lazy_callback::{CallbackSetter, LazyCallback, lazy_callback};
mod oneshot;
mod shared_memory;
pub use oneshot::{GenericOneshotReceiver, GenericOneshotSender, oneshot};
pub use shared_memory::GenericSharedMemory;
mod generic_channelset;
pub use generic_channelset::{GenericReceiverSet, GenericSelectionResult};
/// Process-wide cache for the "should channels use IPC?" decision; filled on first use.
static USE_IPC: OnceLock<bool> = OnceLock::new();

/// Returns `true` when generic channels should be backed by IPC.
///
/// The answer is computed once, from the `multiprocess` and `force_ipc` options,
/// and is then served from [`USE_IPC`] for every later call.
fn use_ipc() -> bool {
    let ipc_requested = USE_IPC.get_or_init(|| {
        let multiprocess = servo_config::opts::get().multiprocess;
        multiprocess || servo_config::opts::get().force_ipc
    });
    *ipc_requested
}
pub trait GenericSend<T>
where
T: serde::Serialize + for<'de> serde::Deserialize<'de>,
{
fn send(&self, _: T) -> SendResult;
fn sender(&self) -> GenericSender<T>;
}
/// Sending half of a generic channel.
///
/// Wraps a private [`GenericSenderVariants`] so callers do not depend on whether the
/// channel is backed by IPC or by crossbeam.
pub struct GenericSender<T>(GenericSenderVariants<T>)
where
    T: Serialize;
/// The concrete sender held inside a [`GenericSender`].
enum GenericSenderVariants<T>
where
    T: Serialize,
{
    /// Sender of an `ipc_channel` channel.
    Ipc(ipc_channel::ipc::IpcSender<T>),
    /// Sender of a crossbeam channel whose items are `Result<T, IpcError>`.
    Crossbeam(crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>),
}
/// Serializes the backend of a [`GenericSender`] as the serde enum `GenericSender`.
///
/// * `Ipc` (variant index 0) delegates to the wrapped IPC sender.
/// * `Crossbeam` (variant index 1) is written as the address of a heap-allocated clone
///   of the sender. This is refused with a custom serde error when the `multiprocess`
///   option is set.
fn serialize_generic_sender_variants<T: Serialize, S: Serializer>(
    value: &GenericSenderVariants<T>,
    s: S,
) -> Result<S::Ok, S::Error> {
    let crossbeam_sender = match value {
        GenericSenderVariants::Ipc(ipc_sender) => {
            return s.serialize_newtype_variant("GenericSender", 0, "Ipc", ipc_sender);
        },
        GenericSenderVariants::Crossbeam(crossbeam_sender) => crossbeam_sender,
    };

    if opts::get().multiprocess {
        return Err(serde::ser::Error::custom(
            "Crossbeam channel found in multiprocess mode!",
        ));
    }

    // The boxed clone is deliberately not freed here: its address is the serialized
    // payload, and the deserializing visitor turns that address back into a `Box`.
    let leaked_clone_addr = Box::into_raw(Box::new(crossbeam_sender.clone())) as usize;
    s.serialize_newtype_variant("GenericSender", 1, "Crossbeam", &leaked_clone_addr)
}
impl<T: Serialize> Serialize for GenericSender<T> {
fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
serialize_generic_sender_variants(&self.0, s)
}
}
/// serde visitor used to deserialize the variants of a `GenericSender<T>`.
struct GenericSenderVisitor<T> {
/// Carries the message type `T` without storing a value of it.
marker: PhantomData<T>,
}
// Rebuilds a `GenericSenderVariants<T>` from the enum representation written by
// `serialize_generic_sender_variants`.
impl<'de, T> serde::de::Visitor<'de> for GenericSenderVisitor<T>
where
    T: Serialize + Deserialize<'de>,
{
    type Value = GenericSenderVariants<T>;

    fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
        formatter.write_str("a GenericSender variant")
    }

    /// Reads the variant tag, then decodes the payload that belongs to that variant.
    fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
    where
        A: serde::de::EnumAccess<'de>,
    {
        // Helper enum used only to decode the variant tag.
        #[derive(Deserialize)]
        enum GenericSenderVariantNames {
            Ipc,
            Crossbeam,
        }

        let (tag, payload): (GenericSenderVariantNames, _) = data.variant()?;
        match tag {
            GenericSenderVariantNames::Ipc => {
                let ipc_sender = payload.newtype_variant::<ipc_channel::ipc::IpcSender<T>>()?;
                Ok(GenericSenderVariants::Ipc(ipc_sender))
            },
            GenericSenderVariantNames::Crossbeam => {
                if opts::get().multiprocess {
                    return Err(serde::de::Error::custom(
                        "Crossbeam channel found in multiprocess mode!",
                    ));
                }
                let raw_addr = payload.newtype_variant::<usize>()?;
                let boxed_ptr =
                    raw_addr as *mut crossbeam_channel::Sender<Result<T, ipc_channel::IpcError>>;
                // SAFETY: relies on `raw_addr` being the address of the boxed sender clone
                // written by the serializer in this same process, and on each serialized
                // value being deserialized at most once, so the box is reclaimed only once.
                #[expect(unsafe_code)]
                let boxed_sender = unsafe { Box::from_raw(boxed_ptr) };
                Ok(GenericSenderVariants::Crossbeam(*boxed_sender))
            },
        }
    }
}
// Deserializes the enum named `GenericSender` with the variants `Ipc` and `Crossbeam`,
// delegating the decoding to `GenericSenderVisitor`.
impl<'a, T> Deserialize<'a> for GenericSender<T>
where
    T: Serialize + Deserialize<'a>,
{
    fn deserialize<D>(d: D) -> Result<GenericSender<T>, D::Error>
    where
        D: Deserializer<'a>,
    {
        let visitor = GenericSenderVisitor {
            marker: PhantomData,
        };
        let variants = d.deserialize_enum("GenericSender", &["Ipc", "Crossbeam"], visitor)?;
        Ok(GenericSender(variants))
    }
}
// Cloning yields another sender of the same backend, produced by cloning the
// wrapped sender.
impl<T> Clone for GenericSender<T>
where
    T: Serialize,
{
    fn clone(&self) -> Self {
        let cloned_variant = match &self.0 {
            GenericSenderVariants::Ipc(ipc_sender) => {
                GenericSenderVariants::Ipc(ipc_sender.clone())
            },
            GenericSenderVariants::Crossbeam(crossbeam_sender) => {
                GenericSenderVariants::Crossbeam(crossbeam_sender.clone())
            },
        };
        GenericSender(cloned_variant)
    }
}
// The debug output is a fixed placeholder; the wrapped sender is not printed.
impl<T> fmt::Debug for GenericSender<T>
where
    T: Serialize,
{
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        f.write_str("Sender(..)")
    }
}
impl<T: Serialize> GenericSender<T> {
#[inline]
pub fn send(&self, msg: T) -> SendResult {
match &self.0 {
GenericSenderVariants::Ipc(sender) => sender
.send(msg)
.map_err(|e| SendError::SerializationError(e.to_string())),
GenericSenderVariants::Crossbeam(sender) => {
sender.send(Ok(msg)).map_err(|_| SendError::Disconnected)
},
}
}
}
impl<T: Serialize> MallocSizeOf for GenericSender<T> {
fn size_of(&self, ops: &mut MallocSizeOfOps) -> usize {
match &self.0 {
GenericSenderVariants::Ipc(ipc_sender) => ipc_sender.size_of(ops),
GenericSenderVariants::Crossbeam(sender) => sender.size_of(ops),
}
}
}
/// Reasons a send on a generic channel can fail.
#[derive(Debug)]
pub enum SendError {
/// The message could not be delivered; used for failed crossbeam sends and for
/// `IpcError::Disconnected` / `IpcError::Io` when converting from `IpcError`.
Disconnected,
/// A failure reported by the IPC layer, stored as the error's text.
SerializationError(String),
}
// Maps IPC errors onto the two `SendError` cases. I/O errors are logged and then
// reported as a disconnect.
impl From<IpcError> for SendError {
    fn from(value: IpcError) -> Self {
        match value {
            IpcError::Disconnected => Self::Disconnected,
            IpcError::Io(error) => {
                log::error!("IO Error in ipc {:?}", error);
                Self::Disconnected
            },
            IpcError::SerializationError(ser_de_error) => {
                let description = ser_de_error.to_string();
                Self::SerializationError(description)
            },
        }
    }
}
// The user-facing text of a `SendError` is its `Debug` representation.
impl Display for SendError {
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        write!(f, "{:?}", self)
    }
}
/// Result of a send operation: `Ok(())` on success, otherwise a [`SendError`].
pub type SendResult = Result<(), SendError>;
/// Reasons a receive on a generic channel can fail.
#[derive(Debug)]
pub enum ReceiveError {
/// The incoming message could not be decoded; holds the error's text.
DeserializationFailed(String),
/// An I/O error occurred.
Io(std::io::Error),
/// The channel is disconnected.
Disconnected,
}
// One-to-one mapping from IPC errors; the serialization error is kept as text.
impl From<IpcError> for ReceiveError {
    fn from(e: IpcError) -> Self {
        match e {
            IpcError::SerializationError(ser_de_error) => {
                let description = ser_de_error.to_string();
                Self::DeserializationFailed(description)
            },
            IpcError::Io(reason) => Self::Io(reason),
            IpcError::Disconnected => Self::Disconnected,
        }
    }
}
// A crossbeam `RecvError` is always reported as a disconnect.
impl From<crossbeam_channel::RecvError> for ReceiveError {
    fn from(_disconnected: crossbeam_channel::RecvError) -> Self {
        Self::Disconnected
    }
}
impl fmt::Display for ReceiveError {
fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result {
match *self {
ReceiveError::DeserializationFailed(ref error) => {
write!(fmt, "deserialization error: {error}")
},
ReceiveError::Io(ref error) => write!(fmt, "io error: {error}"),
ReceiveError::Disconnected => write!(fmt, "disconnected"),
}
}
}
impl From<std::io::Error> for ReceiveError {
fn from(value: std::io::Error) -> Self {
ReceiveError::Io(value)
}
}
/// Error returned by the non-blocking and timed receive operations.
///
/// Derives `Debug` (like [`SendError`] and [`ReceiveError`]) so that callers can
/// `unwrap`/`expect` a [`TryReceiveResult`] and log the error.
#[derive(Debug)]
pub enum TryReceiveError {
    /// No message was available (also used when a timed receive ran out of time).
    Empty,
    /// The receive itself failed; see the wrapped [`ReceiveError`].
    ReceiveError(ReceiveError),
}
// A timeout counts as "nothing available"; a disconnect is a real receive error.
impl From<crossbeam_channel::RecvTimeoutError> for TryReceiveError {
    fn from(value: crossbeam_channel::RecvTimeoutError) -> Self {
        match value {
            RecvTimeoutError::Disconnected => Self::ReceiveError(ReceiveError::Disconnected),
            RecvTimeoutError::Timeout => Self::Empty,
        }
    }
}
impl From<ipc_channel::TryRecvError> for TryReceiveError {
fn from(e: ipc_channel::TryRecvError) -> Self {
match e {
ipc_channel::TryRecvError::Empty => TryReceiveError::Empty,
ipc_channel::TryRecvError::IpcError(inner) => {
TryReceiveError::ReceiveError(inner.into())
},
}
}
}
// `Empty` maps to `Empty`; a disconnected channel becomes a receive error.
impl From<crossbeam_channel::TryRecvError> for TryReceiveError {
    fn from(e: crossbeam_channel::TryRecvError) -> Self {
        match e {
            crossbeam_channel::TryRecvError::Disconnected => {
                Self::ReceiveError(ReceiveError::Disconnected)
            },
            crossbeam_channel::TryRecvError::Empty => Self::Empty,
        }
    }
}
/// Crossbeam receiver whose items are `Result<T, IpcError>`; this is the type returned
/// by `GenericReceiver::route_preserving_errors`.
pub type RoutedReceiver<T> = crossbeam_channel::Receiver<Result<T, ipc_channel::IpcError>>;
/// Result of a blocking receive.
pub type ReceiveResult<T> = Result<T, ReceiveError>;
/// Result of a non-blocking or timed receive.
pub type TryReceiveResult<T> = Result<T, TryReceiveError>;
/// The nested result obtained by calling `recv` on a [`RoutedReceiver`]: the outer
/// layer is the crossbeam receive outcome, the inner layer is the routed item.
pub type RoutedReceiverReceiveResult<T> =
Result<Result<T, ipc_channel::IpcError>, crossbeam_channel::RecvError>;
/// Flattens the nested result of receiving on a [`RoutedReceiver`] into a [`ReceiveResult`].
///
/// * A crossbeam receive error becomes [`ReceiveError::Disconnected`].
/// * An inner `IpcError` becomes [`ReceiveError::DeserializationFailed`] holding the
///   error's text.
pub fn to_receive_result<T>(receive_result: RoutedReceiverReceiveResult<T>) -> ReceiveResult<T> {
    let routed_item = receive_result.map_err(|_crossbeam_recv_err| ReceiveError::Disconnected)?;
    routed_item.map_err(|ipc_err| ReceiveError::DeserializationFailed(ipc_err.to_string()))
}
/// Receiving half of a generic channel; wraps a private `GenericReceiverVariants<T>`
/// that is either an IPC receiver or a crossbeam receiver.
#[derive(MallocSizeOf)]
pub struct GenericReceiver<T>(GenericReceiverVariants<T>)
where
T: for<'de> Deserialize<'de> + Serialize;
// The debug output is just the type name; the wrapped receiver is not printed.
impl<T> std::fmt::Debug for GenericReceiver<T>
where
    T: for<'de> Deserialize<'de> + Serialize,
{
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        f.write_str("GenericReceiver")
    }
}
/// The concrete receiver held inside a `GenericReceiver`.
#[derive(MallocSizeOf)]
enum GenericReceiverVariants<T>
where
T: for<'de> Deserialize<'de> + Serialize,
{
/// Receiver of an `ipc_channel` channel.
Ipc(ipc_channel::ipc::IpcReceiver<T>),
/// Crossbeam receiver whose items are `Result<T, IpcError>`.
Crossbeam(RoutedReceiver<T>),
}
impl<T> GenericReceiver<T>
where
    T: for<'de> Deserialize<'de> + Serialize,
{
    /// Receives the next message, blocking until one is available.
    ///
    /// # Panics
    ///
    /// Panics if the crossbeam backend yields an `Err` item.
    #[inline]
    pub fn recv(&self) -> ReceiveResult<T> {
        let message = match &self.0 {
            GenericReceiverVariants::Ipc(ipc_receiver) => ipc_receiver.recv()?,
            GenericReceiverVariants::Crossbeam(crossbeam_receiver) => {
                // `?` handles a disconnected channel; the item itself is expected to be `Ok`.
                crossbeam_receiver.recv()?.expect("Infallible")
            },
        };
        Ok(message)
    }

    /// Receives a message if one is ready, without blocking.
    ///
    /// # Panics
    ///
    /// Panics if the crossbeam backend yields an `Err` item.
    #[inline]
    pub fn try_recv(&self) -> TryReceiveResult<T> {
        let message = match &self.0 {
            GenericReceiverVariants::Ipc(ipc_receiver) => ipc_receiver.try_recv()?,
            GenericReceiverVariants::Crossbeam(crossbeam_receiver) => {
                crossbeam_receiver.try_recv()?.expect("Infallible")
            },
        };
        Ok(message)
    }

    /// Waits up to `timeout` for a message.
    ///
    /// Returns [`TryReceiveError::Empty`] when the crossbeam backend times out and a
    /// wrapped [`ReceiveError::Disconnected`] when it is disconnected. Errors of the
    /// IPC backend are converted with `Into`.
    #[inline]
    pub fn try_recv_timeout(&self, timeout: Duration) -> Result<T, TryReceiveError> {
        match &self.0 {
            GenericReceiverVariants::Ipc(ipc_receiver) => ipc_receiver
                .try_recv_timeout(timeout)
                .map_err(TryReceiveError::from),
            GenericReceiverVariants::Crossbeam(crossbeam_receiver) => {
                // Timeout and disconnect are mapped by `From<RecvTimeoutError>`.
                match crossbeam_receiver.recv_timeout(timeout)? {
                    Ok(value) => Ok(value),
                    Err(_) => unreachable!("Infallable"),
                }
            },
        }
    }

    /// Converts this receiver into a [`RoutedReceiver`].
    ///
    /// A crossbeam-backed receiver is returned unchanged. For an IPC-backed receiver a
    /// route is registered on `ROUTER` that forwards every incoming item into a new
    /// unbounded crossbeam channel, with failures wrapped in
    /// `IpcError::SerializationError`.
    #[inline]
    pub fn route_preserving_errors(self) -> RoutedReceiver<T>
    where
        T: Send + 'static,
    {
        match self.0 {
            GenericReceiverVariants::Crossbeam(already_routed) => already_routed,
            GenericReceiverVariants::Ipc(ipc_receiver) => {
                let (forward_tx, routed_rx) = crossbeam_channel::unbounded();
                ROUTER.add_typed_route(
                    ipc_receiver,
                    Box::new(move |message| {
                        // Forwarding is best effort: a failed send is ignored.
                        let _ = forward_tx.send(message.map_err(IpcError::SerializationError));
                    }),
                );
                routed_rx
            },
        }
    }
}
// Serializes as the serde enum `GenericReceiver`: `Ipc` (index 0) delegates to the IPC
// receiver, `Crossbeam` (index 1) is written as the address of a heap-allocated clone
// of the receiver and is refused when the `multiprocess` option is set.
impl<T> Serialize for GenericReceiver<T>
where
    T: for<'de> Deserialize<'de> + Serialize,
{
    fn serialize<S: Serializer>(&self, s: S) -> Result<S::Ok, S::Error> {
        let crossbeam_receiver = match &self.0 {
            GenericReceiverVariants::Ipc(ipc_receiver) => {
                return s.serialize_newtype_variant("GenericReceiver", 0, "Ipc", ipc_receiver);
            },
            GenericReceiverVariants::Crossbeam(crossbeam_receiver) => crossbeam_receiver,
        };

        if opts::get().multiprocess {
            return Err(serde::ser::Error::custom(
                "Crossbeam channel found in multiprocess mode!",
            ));
        }

        // The boxed clone is deliberately not freed here: its address is the serialized
        // payload, and the deserializing visitor turns that address back into a `Box`.
        let leaked_clone_addr = Box::into_raw(Box::new(crossbeam_receiver.clone())) as usize;
        s.serialize_newtype_variant("GenericReceiver", 1, "Crossbeam", &leaked_clone_addr)
    }
}
/// serde visitor used to deserialize a `GenericReceiver<T>`.
struct GenericReceiverVisitor<T> {
/// Carries the message type `T` without storing a value of it.
marker: PhantomData<T>,
}
impl<'de, T> serde::de::Visitor<'de> for GenericReceiverVisitor<T>
where
T: for<'a> Deserialize<'a> + Serialize,
{
type Value = GenericReceiver<T>;
fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
formatter.write_str("a GenericReceiver variant")
}
fn visit_enum<A>(self, data: A) -> Result<Self::Value, A::Error>
where
A: serde::de::EnumAccess<'de>,
{
#[derive(Deserialize)]
enum GenericReceiverVariantNames {
Ipc,
Crossbeam,
}
let (variant_name, variant_data): (GenericReceiverVariantNames, _) = data.variant()?;
match variant_name {
GenericReceiverVariantNames::Ipc => variant_data
.newtype_variant::<ipc_channel::ipc::IpcReceiver<T>>()
.map(|receiver| GenericReceiver(GenericReceiverVariants::Ipc(receiver))),
GenericReceiverVariantNames::Crossbeam => {
if use_ipc() {
return Err(serde::de::Error::custom(
"Crossbeam channel found in multiprocess mode!",
));
}
let addr = variant_data.newtype_variant::<usize>()?;
let ptr = addr as *mut RoutedReceiver<T>;
#[expect(unsafe_code)]
let receiver = unsafe { Box::from_raw(ptr) };
Ok(GenericReceiver(GenericReceiverVariants::Crossbeam(
*receiver,
)))
},
}
}
}
// Deserializes the enum named `GenericReceiver` with the variants `Ipc` and
// `Crossbeam`, delegating the decoding to `GenericReceiverVisitor`.
impl<'a, T> Deserialize<'a> for GenericReceiver<T>
where
    T: for<'de> Deserialize<'de> + Serialize,
{
    fn deserialize<D>(d: D) -> Result<GenericReceiver<T>, D::Error>
    where
        D: Deserializer<'a>,
    {
        let visitor = GenericReceiverVisitor {
            marker: PhantomData,
        };
        d.deserialize_enum("GenericReceiver", &["Ipc", "Crossbeam"], visitor)
    }
}
/// Creates a generic channel backed by an unbounded crossbeam channel.
fn new_generic_channel_crossbeam<T>() -> (GenericSender<T>, GenericReceiver<T>)
where
    T: Serialize + for<'de> serde::Deserialize<'de>,
{
    let (crossbeam_tx, crossbeam_rx) = crossbeam_channel::unbounded();
    let sender = GenericSender(GenericSenderVariants::Crossbeam(crossbeam_tx));
    let receiver = GenericReceiver(GenericReceiverVariants::Crossbeam(crossbeam_rx));
    (sender, receiver)
}
/// Creates a generic channel backed by an `ipc_channel` channel.
///
/// # Errors
///
/// Returns the I/O error reported by `ipc_channel::ipc::channel` when the channel
/// cannot be created.
fn new_generic_channel_ipc<T>() -> Result<(GenericSender<T>, GenericReceiver<T>), std::io::Error>
where
    T: Serialize + for<'de> serde::Deserialize<'de>,
{
    let (ipc_tx, ipc_rx) = ipc_channel::ipc::channel()?;
    let sender = GenericSender(GenericSenderVariants::Ipc(ipc_tx));
    let receiver = GenericReceiver(GenericReceiverVariants::Ipc(ipc_rx));
    Ok((sender, receiver))
}
/// Creates a new generic channel.
///
/// The channel is IPC-backed when `use_ipc()` is true and crossbeam-backed otherwise.
/// Returns `None` only when creating the IPC channel fails.
pub fn channel<T>() -> Option<(GenericSender<T>, GenericReceiver<T>)>
where
    T: for<'de> Deserialize<'de> + Serialize,
{
    if !use_ipc() {
        return Some(new_generic_channel_crossbeam());
    }
    new_generic_channel_ipc().ok()
}
// Tests for sending and receiving over both channel backends inside one process,
// including sending channel endpoints through other channels.
#[cfg(test)]
mod single_process_channel_tests {
use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
// A value sent on a crossbeam-backed channel is received unchanged.
#[test]
fn generic_crossbeam_can_send() {
let (tx, rx) = new_generic_channel_crossbeam();
tx.send(5).expect("Send failed");
let val = rx.recv().expect("Receive failed");
assert_eq!(val, 5);
}
// A crossbeam sender is sent over a crossbeam channel and then used for the reply.
#[test]
fn generic_crossbeam_ping_pong() {
let (tx, rx) = new_generic_channel_crossbeam();
let (tx2, rx2) = new_generic_channel_crossbeam();
tx.send(tx2).expect("Send failed");
// `thread::scope` joins the spawned thread before returning, so the reply has
// already been sent when `rx2.recv()` is called below.
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
// Same ping-pong, with both channels IPC-backed.
#[test]
fn generic_ipc_ping_pong() {
let (tx, rx) = new_generic_channel_ipc().unwrap();
let (tx2, rx2) = new_generic_channel_ipc().unwrap();
tx.send(tx2).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
// A crossbeam-backed sender is sent through an IPC-backed channel and must still
// work on the receiving side.
#[test]
fn send_crossbeam_sender_over_ipc_channel() {
let (tx, rx) = new_generic_channel_ipc().unwrap();
let (tx2, rx2) = new_generic_channel_crossbeam();
tx.send(tx2).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
// An IPC-backed sender is sent through a crossbeam-backed channel.
#[test]
fn send_generic_ipc_channel_over_crossbeam() {
let (tx, rx) = new_generic_channel_crossbeam();
let (tx2, rx2) = new_generic_channel_ipc().unwrap();
tx.send(tx2).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let reply_sender = rx.recv().expect("Receive failed");
reply_sender.send(42).expect("Sending reply failed");
});
});
let res = rx2.recv().expect("Receive of reply failed");
assert_eq!(res, 42);
}
// A crossbeam-backed receiver is sent through an IPC-backed channel; the value sent
// on its channel must be readable from the receiver obtained on the other side.
#[test]
fn send_crossbeam_receiver_over_ipc_channel() {
let (tx, rx) = new_generic_channel_ipc().unwrap();
let (tx2, rx2) = new_generic_channel_crossbeam();
tx.send(rx2).expect("Send failed");
tx2.send(42).expect("Send failed");
std::thread::scope(|s| {
s.spawn(move || {
let another_receiver = rx.recv().expect("Receive failed");
let res = another_receiver.recv().expect("Receive failed");
assert_eq!(res, 42);
});
});
}
// The sender thread sleeps one second less than the timeout, so the timed receive
// on an IPC-backed channel is expected to succeed.
#[test]
fn test_timeout_ipc() {
let (tx, rx) = new_generic_channel_ipc().unwrap();
let timeout_duration = std::time::Duration::from_secs(3);
std::thread::spawn(move || {
std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
assert!(tx.send(()).is_ok());
});
let received = rx.try_recv_timeout(timeout_duration);
assert!(received.is_ok());
}
// Same timing check for a crossbeam-backed channel.
#[test]
fn test_timeout_crossbeam() {
let (tx, rx) = new_generic_channel_crossbeam();
let timeout_duration = std::time::Duration::from_secs(3);
std::thread::spawn(move || {
std::thread::sleep(timeout_duration - std::time::Duration::from_secs(1));
assert!(tx.send(()).is_ok());
});
let received = rx.try_recv_timeout(timeout_duration);
assert!(received.is_ok());
}
}
#[cfg(test)]
mod generic_receiversets_tests {
use std::time::Duration;
use crate::generic_channel::generic_channelset::{
GenericSelectionResult, create_crossbeam_receiver_set, create_ipc_receiver_set,
};
use crate::generic_channel::{new_generic_channel_crossbeam, new_generic_channel_ipc};
#[test]
fn test_ipc_side1() {
let (snd1, recv1) = new_generic_channel_ipc().unwrap();
let (snd2, recv2) = new_generic_channel_ipc().unwrap();
let snd1_c = snd1.clone();
let snd2_c = snd2.clone();
let mut set = create_ipc_receiver_set();
let recv1_select_index = set.add(recv1);
let _recv2_select_index = set.add(recv2);
std::thread::spawn(move || {
snd1_c.send(10).unwrap();
});
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(1));
let _ = snd2_c.send(20); });
let select_result = set.select();
let channel_result = select_result.first().unwrap();
assert_eq!(
*channel_result,
GenericSelectionResult::MessageReceived(recv1_select_index, 10)
);
}
#[test]
fn test_ipc_side2() {
let (snd1, recv1) = new_generic_channel_ipc().unwrap();
let (snd2, recv2) = new_generic_channel_ipc().unwrap();
let snd1_c = snd1.clone();
let snd2_c = snd2.clone();
let mut set = create_ipc_receiver_set();
let _recv1_select_index = set.add(recv1);
let recv2_select_index = set.add(recv2);
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(1));
let _ = snd1_c.send(10);
});
std::thread::spawn(move || {
snd2_c.send(20).unwrap();
});
let select_result = set.select();
let channel_result = select_result.first().unwrap();
assert_eq!(
*channel_result,
GenericSelectionResult::MessageReceived(recv2_select_index, 20)
);
}
#[test]
fn test_crossbeam_side1() {
let (snd1, recv1) = new_generic_channel_crossbeam();
let (snd2, recv2) = new_generic_channel_crossbeam();
let snd1_c = snd1.clone();
let snd2_c = snd2.clone();
let mut set = create_crossbeam_receiver_set();
let recv1_select_index = set.add(recv1);
let _recv2_select_index = set.add(recv2);
std::thread::spawn(move || {
snd1_c.send(10).unwrap();
});
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(2));
let _ = snd2_c.send(20);
});
let select_result = set.select();
let channel_result = select_result.first().unwrap();
assert_eq!(
*channel_result,
GenericSelectionResult::MessageReceived(recv1_select_index, 10)
);
}
#[test]
fn test_crossbeam_side2() {
let (snd1, recv1) = new_generic_channel_crossbeam();
let (snd2, recv2) = new_generic_channel_crossbeam();
let snd1_c = snd1.clone();
let snd2_c = snd2.clone();
let mut set = create_crossbeam_receiver_set();
let _recv1_select_index = set.add(recv1);
let recv2_select_index = set.add(recv2);
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(2));
let _ = snd1_c.send(10);
});
std::thread::spawn(move || {
snd2_c.send(20).unwrap();
});
let select_result = set.select();
let channel_result = select_result.first().unwrap();
assert_eq!(
*channel_result,
GenericSelectionResult::MessageReceived(recv2_select_index, 20)
);
}
#[test]
fn test_ipc_no_crash_on_disconnect() {
let (snd1, recv1) = new_generic_channel_ipc().unwrap();
let (snd2, recv2) = new_generic_channel_ipc().unwrap();
let snd1_c = snd1.clone();
let mut set = create_ipc_receiver_set();
let _recv1_select_index = set.add(recv1);
let recv2_select_index = set.add(recv2);
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(2));
let _ = snd1_c.send(10);
});
std::thread::spawn(move || {
snd2.send(20).unwrap();
});
std::thread::sleep(Duration::from_secs(1));
let select_result = set.select();
let channel_result = select_result.first().unwrap();
assert_eq!(
*channel_result,
GenericSelectionResult::MessageReceived(recv2_select_index, 20)
);
}
#[test]
fn test_crossbeam_no_crash_on_disconnect() {
let (snd1, recv1) = new_generic_channel_crossbeam();
let (snd2, recv2) = new_generic_channel_crossbeam();
let snd1_c = snd1.clone();
let mut set = create_crossbeam_receiver_set();
let _recv1_select_index = set.add(recv1);
let recv2_select_index = set.add(recv2);
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(2));
let _ = snd1_c.send(10);
});
std::thread::spawn(move || {
snd2.send(20).unwrap();
});
std::thread::sleep(Duration::from_secs(1));
let select_result = set.select();
let channel_result = select_result.first().unwrap();
assert_eq!(
*channel_result,
GenericSelectionResult::MessageReceived(recv2_select_index, 20)
);
}
#[test]
fn test_ipc_disconnect_correct_message() {
let (snd1, recv1) = new_generic_channel_ipc().unwrap();
let (snd2, recv2) = new_generic_channel_ipc().unwrap();
let snd1_c = snd1.clone();
let mut set = create_ipc_receiver_set();
let _recv1_select_index = set.add(recv1);
let recv2_select_index = set.add(recv2);
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(2));
let _ = snd1_c.send(10);
});
std::thread::spawn(move || {
drop(snd2);
});
let select_result = set.select();
let channel_result = select_result.first().unwrap();
assert_eq!(
*channel_result,
GenericSelectionResult::ChannelClosed(recv2_select_index)
);
}
#[test]
fn test_crossbeam_disconnect_correct_messaget() {
let (snd1, recv1) = new_generic_channel_crossbeam();
let (snd2, recv2) = new_generic_channel_crossbeam();
let snd1_c = snd1.clone();
let mut set = create_crossbeam_receiver_set();
let _recv1_select_index = set.add(recv1);
let recv2_select_index = set.add(recv2);
std::thread::spawn(move || {
std::thread::sleep(Duration::from_secs(2));
let _ = snd1_c.send(10);
});
std::thread::spawn(move || {
drop(snd2);
});
let select_result = set.select();
let channel_result = select_result.first().unwrap();
assert_eq!(
*channel_result,
GenericSelectionResult::ChannelClosed(recv2_select_index)
);
}
}