#![warn(missing_docs)]
pub mod clear;
#[cfg(feature = "hydroflow_macro")]
pub mod demux_enum;
pub mod monotonic_map;
pub mod multiset;
pub mod sparse_vec;
pub mod unsync;
pub mod simulation;
mod monotonic;
pub use monotonic::*;
mod udp;
#[cfg(not(target_arch = "wasm32"))]
pub use udp::*;
mod tcp;
#[cfg(not(target_arch = "wasm32"))]
pub use tcp::*;
#[cfg(unix)]
mod socket;
#[cfg(unix)]
pub use socket::*;
#[cfg(feature = "deploy_integration")]
pub mod deploy;
use std::io::Read;
use std::net::SocketAddr;
use std::num::NonZeroUsize;
use std::process::{Child, ChildStdin, ChildStdout, Stdio};
use std::task::{Context, Poll};
use futures::Stream;
use serde::de::DeserializeOwned;
use serde::ser::Serialize;
/// A persistence instruction for a single item of type `T`: either keep it or remove it.
pub enum Persistence<T> {
/// Persist the wrapped item.
Persist(T),
/// Delete the wrapped item.
Delete(T),
}
/// A persistence instruction for keyed data, with key type `K` and value type `V`.
pub enum PersistenceKeyed<K, V> {
/// Persist the value `V` under the key `K`.
Persist(K, V),
/// Delete by key `K`; this variant carries no value.
Delete(K),
}
/// Creates an unbounded Tokio mpsc channel.
///
/// Returns the sending half unchanged and the receiving half wrapped in a
/// `tokio_stream::wrappers::UnboundedReceiverStream`, so the receiver can be used as a stream.
pub fn unbounded_channel<T>() -> (
    tokio::sync::mpsc::UnboundedSender<T>,
    tokio_stream::wrappers::UnboundedReceiverStream<T>,
) {
    use tokio_stream::wrappers::UnboundedReceiverStream;

    let (sender, receiver) = tokio::sync::mpsc::unbounded_channel();
    (sender, UnboundedReceiverStream::new(receiver))
}
/// Creates a channel from this crate's `unsync::mpsc` module.
///
/// `capacity` is passed through unchanged to `unsync::mpsc::channel`, which defines its meaning
/// (presumably `None` requests an unbounded channel; confirm against that function).
pub fn unsync_channel<T>(
    capacity: Option<NonZeroUsize>,
) -> (unsync::mpsc::Sender<T>, unsync::mpsc::Receiver<T>) {
    let (sender, receiver) = unsync::mpsc::channel(capacity);
    (sender, receiver)
}
/// Returns an iterator over the items of `stream` that are ready right now.
///
/// Every call to `next` polls the (boxed and pinned) stream once with a no-op waker. A ready
/// item is yielded as-is; both "pending" and "stream finished" show up as `None`.
///
/// The iterator is not fused: calling `next` again after it returned `None` polls the stream
/// again and may yield further items.
pub fn ready_iter<S>(stream: S) -> impl Iterator<Item = S::Item>
where
    S: Stream,
{
    let mut pinned = Box::pin(stream);
    std::iter::from_fn(move || {
        // A no-op waker is enough because nobody waits to be woken; we only sample readiness.
        let mut cx = Context::from_waker(futures::task::noop_waker_ref());
        if let Poll::Ready(item) = pinned.as_mut().poll_next(&mut cx) {
            item
        } else {
            None
        }
    })
}
/// Collects the immediately available items of `stream` into a collection of type `C`.
///
/// Collection stops at the first poll that is pending or reports the end of the stream
/// (see `ready_iter`).
///
/// # Panics
///
/// Panics when called while a Tokio runtime handle is available on the current thread; use
/// `collect_ready_async` there instead.
pub fn collect_ready<C, S>(stream: S) -> C
where
    C: FromIterator<S::Item>,
    S: Stream,
{
    let inside_runtime = tokio::runtime::Handle::try_current().is_ok();
    assert!(!inside_runtime, "Calling `collect_ready` from an async runtime may cause incorrect results, use `collect_ready_async` instead.");
    ready_iter(stream).collect::<C>()
}
/// Collects the available items of `stream` into a `C`, for use inside a Tokio runtime.
///
/// The function yields to the scheduler once up front, then repeatedly drains whatever
/// `ready_iter` reports as ready and yields again. It stops after the first round that
/// produced no item at all and returns everything gathered so far.
pub async fn collect_ready_async<C, S>(stream: S) -> C
where
    C: Default + Extend<S::Item>,
    S: Stream,
{
    use std::sync::atomic::{AtomicBool, Ordering};

    // Let other tasks run first so they get a chance to feed the stream.
    tokio::task::yield_now().await;

    // Starts out `true` so that at least one draining round always happens.
    let saw_item = AtomicBool::new(true);
    let mut ready = ready_iter(stream).inspect(|_| saw_item.store(true, Ordering::Relaxed));
    let mut collected = C::default();
    loop {
        // Reset the flag for the upcoming round; stop if the previous round was empty.
        if !saw_item.swap(false, Ordering::Relaxed) {
            break collected;
        }
        // `ready` is not fused, so extending from it again picks up items that became ready
        // after an earlier `None`.
        collected.extend(ready.by_ref());
        tokio::task::yield_now().await;
    }
}
/// Serializes `msg` with bincode and returns the encoding as `bytes::Bytes`.
///
/// # Panics
///
/// Panics if bincode fails to serialize `msg`.
pub fn serialize_to_bytes<T>(msg: T) -> bytes::Bytes
where
    T: Serialize,
{
    let encoded: Vec<u8> = bincode::serialize(&msg).unwrap();
    encoded.into()
}
/// Deserializes a `T` from the bincode-encoded bytes in `msg`.
///
/// Returns the bincode error unchanged if decoding fails.
pub fn deserialize_from_bytes<T>(msg: impl AsRef<[u8]>) -> bincode::Result<T>
where
    T: DeserializeOwned,
{
    let raw: &[u8] = msg.as_ref();
    bincode::deserialize(raw)
}
/// Resolves `addr` (for example `"localhost:8080"` or `"127.0.0.1:8080"`) to its first IPv4
/// socket address.
///
/// # Errors
///
/// Returns the error from `to_socket_addrs` if `addr` cannot be parsed or resolved, and an
/// `ErrorKind::Other` error if none of the resolved addresses is IPv4.
pub fn ipv4_resolve(addr: &str) -> Result<SocketAddr, std::io::Error> {
    use std::io::{Error, ErrorKind};
    use std::net::ToSocketAddrs;

    addr.to_socket_addrs()?
        .find(|candidate| candidate.is_ipv4())
        .ok_or_else(|| Error::new(ErrorKind::Other, "Unable to resolve IPv4 address"))
}
/// Binds a Tokio UDP socket to `addr` and passes it to `udp_bytes`.
///
/// Returns whatever `udp_bytes` produces: a sink, a stream and a `SocketAddr` (presumably the
/// bound local address; confirm against `udp_bytes`).
///
/// # Panics
///
/// Panics if the socket cannot be bound.
#[cfg(not(target_arch = "wasm32"))]
pub async fn bind_udp_bytes(addr: SocketAddr) -> (UdpSink, UdpStream, SocketAddr) {
    let bound = tokio::net::UdpSocket::bind(addr).await;
    udp_bytes(bound.unwrap())
}
/// Binds a Tokio UDP socket to `addr` and passes it to `udp_lines`.
///
/// Returns whatever `udp_lines` produces: a sink, a stream and a `SocketAddr` (presumably the
/// bound local address; confirm against `udp_lines`).
///
/// # Panics
///
/// Panics if the socket cannot be bound.
#[cfg(not(target_arch = "wasm32"))]
pub async fn bind_udp_lines(addr: SocketAddr) -> (UdpLinesSink, UdpLinesStream, SocketAddr) {
    let bound = tokio::net::UdpSocket::bind(addr).await;
    udp_lines(bound.unwrap())
}
/// Calls `bind_tcp` for `addr` with a fresh `LengthDelimitedCodec`, i.e. a byte-frame TCP
/// endpoint.
///
/// Returns the sender, receiver and `SocketAddr` that `bind_tcp` hands back.
///
/// # Panics
///
/// Panics if `bind_tcp` returns an error.
#[cfg(not(target_arch = "wasm32"))]
pub async fn bind_tcp_bytes(
    addr: SocketAddr,
) -> (
    unsync::mpsc::Sender<(bytes::Bytes, SocketAddr)>,
    unsync::mpsc::Receiver<Result<(bytes::BytesMut, SocketAddr), std::io::Error>>,
    SocketAddr,
) {
    let codec = tokio_util::codec::LengthDelimitedCodec::new();
    let bound = bind_tcp(addr, codec).await;
    bound.unwrap()
}
/// Calls `bind_tcp` for `addr` with a fresh `LinesCodec`, i.e. a line-based TCP endpoint.
///
/// Returns the sender, receiver and `SocketAddr` that `bind_tcp` hands back.
///
/// # Panics
///
/// Panics if `bind_tcp` returns an error.
#[cfg(not(target_arch = "wasm32"))]
pub async fn bind_tcp_lines(
    addr: SocketAddr,
) -> (
    unsync::mpsc::Sender<(String, SocketAddr)>,
    unsync::mpsc::Receiver<Result<(String, SocketAddr), tokio_util::codec::LinesCodecError>>,
    SocketAddr,
) {
    let codec = tokio_util::codec::LinesCodec::new();
    let bound = bind_tcp(addr, codec).await;
    bound.unwrap()
}
/// Calls `connect_tcp` with a fresh `LengthDelimitedCodec` and returns the resulting
/// byte-frame sink and stream pair.
#[cfg(not(target_arch = "wasm32"))]
pub fn connect_tcp_bytes() -> (
    TcpFramedSink<bytes::Bytes>,
    TcpFramedStream<tokio_util::codec::LengthDelimitedCodec>,
) {
    let codec = tokio_util::codec::LengthDelimitedCodec::new();
    connect_tcp(codec)
}
/// Calls `connect_tcp` with a fresh `LinesCodec` and returns the resulting line-based sink
/// and stream pair.
#[cfg(not(target_arch = "wasm32"))]
pub fn connect_tcp_lines() -> (
    TcpFramedSink<String>,
    TcpFramedStream<tokio_util::codec::LinesCodec>,
) {
    let codec = tokio_util::codec::LinesCodec::new();
    connect_tcp(codec)
}
/// Sorts `slice` in place (unstably) by a key that `f` borrows out of each element.
///
/// Unlike `<[T]>::sort_unstable_by_key`, the key function returns a reference tied to the
/// element's lifetime (a higher-ranked trait bound), so keys need not be cloned or copied.
pub fn sort_unstable_by_key_hrtb<T, F, K>(slice: &mut [T], f: F)
where
    F: for<'a> Fn(&'a T) -> &'a K,
    K: Ord,
{
    slice.sort_unstable_by(|lhs, rhs| {
        let (left_key, right_key) = (f(lhs), f(rhs));
        Ord::cmp(left_key, right_key)
    });
}
/// Blocks until the accumulated output of a child process matches the regex `wait_for`.
///
/// `output_so_far` holds the text gathered so far. While it does not match, up to 1024 bytes
/// are read from `output` at a time, decoded and appended to it. If it already matches on
/// entry, nothing is read.
///
/// A multi-byte UTF-8 character that is split across two reads is reassembled instead of
/// being turned into replacement characters. Bytes that are not valid UTF-8 at all are still
/// decoded lossily as U+FFFD.
///
/// The current output is printed to stdout before and after every read as a debugging aid.
///
/// # Panics
///
/// Panics if `wait_for` is not a valid regex, if reading from `output` fails, or if the child
/// closes its stdout before the output matches.
pub fn wait_for_process_output(
    output_so_far: &mut String,
    output: &mut ChildStdout,
    wait_for: &str,
) {
    let re = regex::Regex::new(wait_for).unwrap();
    let mut buffer = [0u8; 1024];
    // Trailing bytes of a UTF-8 sequence that was cut off by a read boundary.
    let mut pending: Vec<u8> = Vec::new();

    while !re.is_match(output_so_far) {
        println!("waiting: {}", output_so_far);
        let bytes_read = output.read(&mut buffer).unwrap();
        if bytes_read == 0 {
            panic!(
                "child process closed its stdout before the output matched {:?}; output so far: {:?}",
                wait_for, output_so_far
            );
        }
        pending.extend_from_slice(&buffer[..bytes_read]);
        append_decoded_utf8(output_so_far, &mut pending);
        println!("XXX {}", output_so_far);
    }

    // The pattern matched while a character was still incomplete: do not drop those bytes,
    // decode them lossily so every byte read is reflected in `output_so_far`.
    if !pending.is_empty() {
        output_so_far.push_str(&String::from_utf8_lossy(&pending));
    }
}

/// Moves all decodable text from the front of `pending` into `decoded`.
///
/// Invalid byte sequences become U+FFFD; an incomplete sequence at the very end is left in
/// `pending` so that it can be completed by the next read.
fn append_decoded_utf8(decoded: &mut String, pending: &mut Vec<u8>) {
    let mut rest: &[u8] = pending.as_slice();
    let unfinished = loop {
        match std::str::from_utf8(rest) {
            Ok(text) => {
                decoded.push_str(text);
                break 0;
            }
            Err(err) => {
                let (valid, after) = rest.split_at(err.valid_up_to());
                // `valid` is known to be well-formed, so this never inserts replacements.
                decoded.push_str(&String::from_utf8_lossy(valid));
                match err.error_len() {
                    Some(invalid_len) => {
                        decoded.push(char::REPLACEMENT_CHARACTER);
                        rest = &after[invalid_len..];
                    }
                    // `None` means the input ended in the middle of a character.
                    None => break after.len(),
                }
            }
        }
    };
    let consumed = pending.len() - unfinished;
    pending.drain(..consumed);
}
/// Owns a spawned [`Child`] process and kills and reaps it when dropped.
pub struct DroppableChild(Child);

impl Drop for DroppableChild {
    /// Kills the child and waits for it to exit.
    ///
    /// A failure panics, except while the thread is already unwinding: a second panic inside
    /// `drop` would abort the process and hide the original failure.
    fn drop(&mut self) {
        let outcome = match self.0.kill() {
            // Outside Windows a failed kill is an error. Do not wait in that case, since the
            // process may still be running and the wait could block indefinitely.
            Err(err) if cfg!(not(target_family = "windows")) => Err(err),
            // On Windows the kill result is deliberately ignored, as before (presumably
            // because killing an already-exited process reports an error there).
            _ => self.0.wait().map(|_status| ()),
        };

        if let Err(err) = outcome {
            if !std::thread::panicking() {
                panic!("failed to kill and reap child process: {}", err);
            }
        }
    }
}
/// Spawns `cargo run -p hydroflow --example <test_name>` with piped stdin and stdout.
///
/// If `args` is non-empty it is split on single spaces and the pieces are passed to the
/// example after a `--` separator. Empty pieces (from leading, trailing or repeated spaces)
/// are skipped rather than forwarded as empty arguments. No quoting is interpreted.
///
/// Returns the child wrapped in a [`DroppableChild`] together with its stdin and stdout
/// handles.
///
/// # Panics
///
/// Panics if the `cargo` process cannot be spawned.
pub fn run_cargo_example(test_name: &str, args: &str) -> (DroppableChild, ChildStdin, ChildStdout) {
    let mut command = std::process::Command::new("cargo");
    command
        .args(["run", "-p", "hydroflow", "--example"])
        .arg(test_name);
    if !args.is_empty() {
        command
            .arg("--")
            .args(args.split(' ').filter(|piece| !piece.is_empty()));
    }

    let mut server = command
        .stdin(Stdio::piped())
        .stdout(Stdio::piped())
        .spawn()
        .unwrap();

    // Both handles exist because stdin and stdout were configured as piped above.
    let stdin = server.stdin.take().unwrap();
    let stdout = server.stdout.take().unwrap();
    (DroppableChild(server), stdin, stdout)
}
/// Turns `iter` into a stream that hands out its items in batches of `n` polls.
///
/// The first `n` polls each return `Poll::Ready` with the next item of the iterator (including
/// the final `None`). The following poll wakes the task's waker immediately and returns
/// `Poll::Pending`, after which the next batch starts.
///
/// Note that with `n == 0` every poll returns `Poll::Pending`, so the stream never yields.
pub fn iter_batches_stream<I>(
    iter: I,
    n: usize,
) -> futures::stream::PollFn<impl FnMut(&mut Context<'_>) -> Poll<Option<I::Item>>>
where
    I: IntoIterator + Unpin,
{
    let mut items = iter.into_iter();
    let mut polls_in_batch: usize = 0;
    futures::stream::poll_fn(move |ctx| {
        polls_in_batch += 1;
        if polls_in_batch <= n {
            return Poll::Ready(items.next());
        }
        // Batch exhausted: start a new one and ask to be polled again right away.
        polls_in_batch = 0;
        ctx.waker().wake_by_ref();
        Poll::Pending
    })
}
#[cfg(test)]
mod test {
    use super::*;

    /// All 1000 queued items are returned by the synchronous `collect_ready`.
    #[test]
    pub fn test_collect_ready() {
        let (tx, mut rx) = unbounded_channel::<usize>();
        (0..1000).for_each(|item| tx.send(item).unwrap());

        let collected: Vec<usize> = collect_ready(&mut rx);
        assert_eq!(1000, collected.len());
    }

    /// All 1000 queued items are returned by `collect_ready_async` inside a runtime.
    #[crate::test]
    pub async fn test_collect_ready_async() {
        let (tx, mut rx) = unbounded_channel::<usize>();
        (0..1000).for_each(|item| tx.send(item).unwrap());

        let collected: Vec<usize> = collect_ready_async(&mut rx).await;
        assert_eq!(1000, collected.len());
    }
}