#![cfg_attr(not(feature = "std"), no_std)]
#![cfg_attr(docsrs, feature(doc_cfg))]
extern crate alloc;
#[allow(unused_imports)]
pub(crate) mod std_prelude {
#[cfg(not(feature = "std"))]
pub use ::alloc::{boxed::Box, vec, vec::Vec};
#[cfg(feature = "std")]
pub use std::prelude::v1::*;
#[cfg(not(feature = "std"))]
pub mod std {
pub use ::alloc::*;
}
pub use crate::derive::*;
#[cfg(feature = "std")]
pub use ::std;
}
pub use mfio_derive as derive;
pub mod backend;
pub mod error;
#[cfg(feature = "std")]
#[cfg_attr(docsrs, doc(cfg(feature = "std")))]
pub mod futures_compat;
pub mod io;
pub mod stdeq;
pub mod traits;
pub mod prelude {
pub mod v1 {
#[cfg(all(unix, feature = "std", feature = "async-io"))]
pub use crate::backend::integrations::async_io::AsyncIo;
#[cfg(all(unix, not(miri), feature = "std", feature = "tokio"))]
pub use crate::backend::integrations::tokio::Tokio;
pub use crate::backend::{Integration, IoBackend, IoBackendExt, Null};
pub use crate::error::*;
#[cfg(feature = "std")]
pub use crate::futures_compat::FuturesCompat;
pub use crate::io::{
FullPacket, IntoPacket, OwnedPacket, Packet, PacketIo, PacketIoExt, PacketView, Read,
RefPacket, VecPacket, Write,
};
pub use crate::stdeq::{Seekable, SeekableRef};
pub use crate::traits::{IoRead, IoWrite};
}
}
mod poller;
mod util;
#[cfg(not(mfio_assume_linear_types))]
#[macro_export]
macro_rules! linear_types_switch {
(Standard => { $($matched:tt)* } $($end:ident => $block2:block)*) => {
$($matched)*
};
($start:ident => $block2:block $($end:tt)*) => {
$crate::linear_types_switch!{
$($end)*
}
}
}
#[cfg(mfio_assume_linear_types)]
#[macro_export]
macro_rules! linear_types_switch {
(Linear => { $($matched:tt)* } $($end:ident => $block2:block)*) => {
$($matched)*
};
($start:ident => $block2:block $($end:tt)*) => {
$crate::linear_types_switch!{
$($end)*
}
}
}
#[cfg(feature = "std")]
pub use parking_lot as locks;
#[cfg(not(feature = "std"))]
pub use spin as locks;
pub use tarc;
#[cfg(test)]
mod sample {
use crate as mfio;
use crate::std_prelude::*;
include!("sample.rs");
}
#[cfg(test)]
mod tests {
use crate::prelude::v1::*;
use crate::std_prelude::*;
use super::*;
use crate::sample::SampleIo;
use bytemuck::{Pod, Zeroable};
use core::pin::pin;
use futures::StreamExt;
#[test]
fn oobe() {
let handle = SampleIo::new((0..200).collect::<Vec<_>>());
handle.with_backend(async {
let pkt = handle.io(200, Packet::<Write>::new_buf(200)).await;
assert_eq!(pkt.as_ref().error_clamp(), 0);
});
core::mem::drop(handle);
}
#[test]
fn split_oobe() {
let handle = SampleIo::new((0..200).collect::<Vec<_>>());
handle.with_backend(async {
let pkt = handle.io(199, Packet::<Write>::new_buf(200)).await;
assert_eq!(pkt.as_ref().error_clamp(), 1);
assert_eq!(pkt.as_ref().min_error().unwrap().state, State::Outside);
});
core::mem::drop(handle);
}
#[test]
fn split_oobe_stream() {
let handle = SampleIo::new((0..200).collect::<Vec<_>>());
handle.with_backend(async {
let fut = handle.io_to_stream(199, Packet::<Write>::new_buf(200), vec![]);
let mut fut = pin!(fut);
let stream = fut.as_mut().submit();
let pkts = stream.collect::<Vec<_>>().await;
fut.await;
assert_eq!(pkts.len(), 2);
});
core::mem::drop(handle);
}
#[test]
fn split_oobe_func() {
let handle = SampleIo::new((0..200).collect::<Vec<_>>());
handle.with_backend(async {
let out = tarc::BaseArc::new(crate::locks::Mutex::new(vec![]));
handle.io_to_fn(199, Packet::<Write>::new_buf(200), {
let out = out.clone();
move |view, err| out.lock().push((view, err))
});
let pkts = out.lock();
assert_eq!(pkts.len(), 2);
});
core::mem::drop(handle);
}
#[test]
fn single_elem_read() {
let handle = SampleIo::new((0..200).collect::<Vec<_>>());
handle.with_backend(async {
let pkt = handle.io(100, Packet::<Write>::new_buf(1)).await;
assert_eq!(pkt.simple_contiguous_slice().unwrap(), &[100]);
});
core::mem::drop(handle);
}
#[test]
fn two_read_scopes() {
let handle = SampleIo::new((0..200).collect::<Vec<_>>());
handle.block_on(async {
let pkt = handle.io(100, Packet::<Write>::new_buf(1)).await;
assert_eq!(pkt.simple_contiguous_slice().unwrap(), &[100]);
});
handle.block_on(async {
let pkt = handle.io(100, Packet::<Write>::new_buf(1)).await;
assert_eq!(pkt.simple_contiguous_slice().unwrap(), &[100]);
});
}
#[test]
fn single_elem_write() {
let handle = SampleIo::default();
let value = [42u8];
handle.with_backend(async {
let (pkt, _) = value.into_packet();
let pkt = handle.io(100, pkt).await;
assert_eq!(pkt.min_error(), None);
let pkt = handle.io(100, Packet::<Write>::new_buf(value.len())).await;
assert_eq!(pkt.simple_contiguous_slice().unwrap(), &value);
});
core::mem::drop(handle);
}
#[test]
fn single_elem_write_and_read() {
let handle = SampleIo::default();
let write = [42u8];
handle.with_backend(async {
let (pkt, _) = write.into_packet();
let pkt = handle.io(100, pkt).await;
assert_eq!(pkt.min_error(), None);
let pkt = handle.io(100, Packet::<Write>::new_buf(write.len())).await;
let read = pkt.simple_contiguous_slice().unwrap();
assert_eq!(&write, read);
});
}
#[test]
fn simple_struct_write_and_read() {
let handle = SampleIo::default();
#[repr(C)]
#[derive(Clone, Copy, Eq, PartialEq, Debug, Pod, Zeroable)]
struct TestStruct {
a: u32,
b: u32,
c: u32,
}
let write = TestStruct {
a: 57,
b: 109,
c: 8,
};
handle.with_backend(async {
handle.write(100, &write).await.unwrap();
let read = handle.read::<TestStruct>(100).await.unwrap();
assert_eq!(write, read);
});
}
#[test]
fn two_elems() {
let handle = SampleIo::new((0..200).collect::<Vec<_>>());
handle.with_backend(async {
for _ in 0..2 {
let pkt = handle.io(100, Packet::<Write>::new_buf(1)).await;
assert_eq!(pkt.simple_contiguous_slice().unwrap(), &[100]);
}
});
core::mem::drop(handle);
}
#[test]
fn drop_bare_stream() {
let handle = SampleIo::default();
let fut = handle.io(100, Packet::<Write>::new_buf(0));
core::mem::drop(fut);
}
#[test]
fn drop_bound_stream() {
let handle = SampleIo::default();
let pkt = Packet::<Write>::new_buf(1);
let pv = PacketView::from_arc_ref(&pkt, 0);
let bpv = unsafe { pv.bind(None) };
handle.send_io(0, bpv);
core::mem::drop(pkt);
}
#[test]
#[should_panic]
fn fully_drop_bound_stream() {
let handle = SampleIo::default();
let pkt = Packet::<Write>::new_buf(1);
let pv = PacketView::from_arc_ref(&pkt, 0);
let bpv = unsafe { pv.bind(None) };
handle.send_io(0, bpv);
unsafe { tarc::BaseArc::decrement_strong_count(pkt.as_ptr()) };
core::mem::drop(pkt);
}
#[cfg(feature = "std")]
#[tokio::test(flavor = "multi_thread", worker_threads = 8)]
async fn bench() {
use core::mem::MaybeUninit;
use std::time::{Duration, Instant};
let mut io = SampleIo::default();
const MILLIS: u64 = 10;
Null::run_with_mut(&mut io, |io| async move {
println!("Sequential:");
let start = Instant::now();
let mut cnt = 0;
while start.elapsed() < Duration::from_millis(MILLIS) {
cnt += 1;
io.read_all(100, &mut [MaybeUninit::uninit()][..])
.await
.unwrap();
}
println!("{:.2}", cnt as f64 / start.elapsed().as_secs_f64());
})
.await;
let jobs_in_flight = (1..=2)
.map(|i| {
let mut io = io.clone();
async move {
Null::run_with_mut(&mut io, move |scope| async move {
println!("Multiple reads in-flight MT:");
let start = Instant::now();
let mut cnt = 0;
let mut q = std::collections::VecDeque::new();
while start.elapsed() < Duration::from_millis(MILLIS) {
cnt += 1;
q.push_back(scope.io(100, Packet::<Write>::new_buf(1)));
if q.len() >= 4096 / 16 {
q.pop_front().unwrap().await;
}
}
futures::future::join_all(q).await;
let speed = cnt as f64 / start.elapsed().as_secs_f64();
println!("{i}: {:.2}", speed);
speed
})
.await
}
})
.map(tokio::spawn)
.collect::<Vec<_>>();
println!("AWAIT");
let cnt = futures::future::join_all(jobs_in_flight)
.await
.into_iter()
.filter_map(core::result::Result::ok)
.sum::<f64>();
println!("CNT: {cnt:.2}");
Null::run_with_mut(&mut io, |io| async move {
let io = &io;
println!("Multiple reads in-flight:");
let start = Instant::now();
let mut cnt = 0;
let mut q = std::collections::VecDeque::new();
while start.elapsed() < Duration::from_millis(MILLIS) {
cnt += 1;
q.push_back(io.io(100, Packet::<Write>::new_buf(1)));
if q.len() >= 4096 * 4 {
q.pop_front().unwrap().await;
}
}
let fut = futures::future::join_all(q);
fut.await;
let speed = cnt as f64 / start.elapsed().as_secs_f64();
println!("{:.2}", speed);
})
.await;
println!("DROP");
}
}