use std::os::raw::{c_char, c_int, c_ulong};
use crate::queue::{Unmanaged as UnmanagedQueue};
use std::os::unix::io::IntoRawFd;
use std::ffi::{c_void, CStr};
use std::ops::Deref;
use std::ptr::NonNull;
use libc::{mode_t, off_t, size_t};
use crate::data::{Unmanaged, DispatchData, dispatch_release, dispatch_retain};
use crate::block_impl::{WriteEscapingBlock};
#[repr(transparent)]
#[allow(non_camel_case_types)]
#[derive(Clone,Copy)]
pub struct dispatch_fd_t(c_int);
impl dispatch_fd_t {
pub fn new<F: IntoRawFd>(f: F) -> dispatch_fd_t {
dispatch_fd_t(f.into_raw_fd())
}
}
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct dispatch_io_type_t(pub c_ulong);
impl dispatch_io_type_t {
pub const STREAM: dispatch_io_type_t = dispatch_io_type_t(0);
pub const RANDOM: dispatch_io_type_t = dispatch_io_type_t(1);
}
#[repr(C)]
pub struct UnmanagedIO(c_void);
unsafe impl Send for UnmanagedIO {}
unsafe impl Sync for UnmanagedIO {}
#[repr(C)]
#[allow(non_camel_case_types)]
pub struct dispatch_io_close_flags_t(pub c_ulong);
impl Default for dispatch_io_close_flags_t {
fn default() -> Self {
Self(0)
}
}
impl dispatch_io_close_flags_t {
pub const STOP: dispatch_io_close_flags_t = dispatch_io_close_flags_t(0x1);
}
extern "C" {
fn dispatch_read(fd: dispatch_fd_t, length: usize, queue: *const UnmanagedQueue,
handler: *mut c_void);
fn dispatch_write(fd: dispatch_fd_t, data: *const Unmanaged, queue: *const UnmanagedQueue, handler: *mut c_void);
fn dispatch_io_create(tipe: dispatch_io_type_t, fd: dispatch_fd_t, queue: *const UnmanagedQueue, cleanup_handler: *mut c_void) -> *mut UnmanagedIO;
fn dispatch_io_create_with_path(tipe: dispatch_io_type_t, path: *const c_char, oflag: c_int, mode_t: mode_t, queue: *const UnmanagedQueue,cleanup_handler: *mut c_void) -> *mut UnmanagedIO;
fn dispatch_io_read(channel: *const UnmanagedIO, offset: off_t, length: size_t, queue: *const UnmanagedQueue, handler: *const c_void);
fn dispatch_io_close(channel: *const UnmanagedIO, flags: dispatch_io_close_flags_t);
}
pub fn read_completion<F>(fd: dispatch_fd_t, length: usize, queue: &UnmanagedQueue, handler: F) where F: FnOnce(*const Unmanaged, c_int) + Send + 'static {
unsafe{
use crate::block_impl::ReadEscapingBlock;
let mut block = ReadEscapingBlock::new(handler);
dispatch_read(fd, length, queue, &mut block as *mut _ as *mut c_void)
}
}
pub fn write_completion<F,D: DispatchData>(fd: dispatch_fd_t, data: &D, queue: &UnmanagedQueue, handler: F) where F: FnOnce(Option<&Unmanaged>, c_int) + Send + 'static {
unsafe {
let mut block = WriteEscapingBlock::new(handler);
let actual_data = data.as_unmanaged();
dispatch_write(fd, actual_data, queue, &mut block as *mut _ as *mut c_void)
}
}
impl UnmanagedIO {
pub fn new(tipe: dispatch_io_type_t, fd: dispatch_fd_t, queue: &UnmanagedQueue) -> *mut Self {
unsafe {
dispatch_io_create(tipe, fd, queue, std::ptr::null_mut())
}
}
pub fn new_with_path(tipe: dispatch_io_type_t,path: &CStr, oflag: c_int, mode_t: mode_t, queue: &UnmanagedQueue) -> *mut Self {
unsafe {
dispatch_io_create_with_path(tipe, path.as_ptr(), oflag, mode_t, queue, std::ptr::null_mut() )
}
}
pub fn read<H: FnMut(&mut E, bool, *const Unmanaged, c_int) + Send + 'static,E>(&self, offset: off_t, length: size_t, queue: *const UnmanagedQueue, handler: H,initial_environment: E) {
unsafe {
blocksr::many_escaping_nonreentrant!(DataHandler (environment: &mut E, done: bool, data: *const Unmanaged, error: c_int) -> ());
let mut block = DataHandler::new(initial_environment, handler);
dispatch_io_read(self, offset, length, queue, &mut block as *mut _ as *mut c_void);
}
}
pub fn close(&self, flags: dispatch_io_close_flags_t) {
unsafe{dispatch_io_close(self, flags)}
}
}
#[derive(Debug)]
pub struct IO(NonNull<UnmanagedIO>);
impl Clone for IO {
fn clone(&self) -> Self {
unsafe{dispatch_retain(self.0.as_ptr() as *const c_void)};
Self(self.0)
}
}
unsafe impl Send for IO {}
unsafe impl Sync for IO {}
impl IO {
pub fn new_with_path(tipe: dispatch_io_type_t,path: &CStr, oflag: c_int, mode_t: mode_t, queue: &UnmanagedQueue) -> Option<Self> {
let ptr = UnmanagedIO::new_with_path(tipe, path,oflag,mode_t,queue);
unsafe {
if ptr.is_null() {
None
}
else {
Some(Self(NonNull::new_unchecked(ptr)))
}
}
}
pub fn new(tipe: dispatch_io_type_t, fd: dispatch_fd_t, queue: &UnmanagedQueue) -> Option<Self> {
let ptr = UnmanagedIO::new(tipe, fd, queue);
unsafe {
if ptr.is_null() {
None
}
else {
Some(Self(NonNull::new_unchecked(ptr)))
}
}
}
}
impl Deref for IO {
type Target = UnmanagedIO;
fn deref(&self) -> &Self::Target {
unsafe{self.0.as_ref()}
}
}
impl Drop for IO {
fn drop(&mut self) {
self.close(dispatch_io_close_flags_t::STOP);
unsafe{
dispatch_release(self.0.as_ptr() as *const c_void)
}
}
}
#[test] fn read_t() {
use std::os::unix::io::IntoRawFd;
use crate::qos::QoS;
let path = std::path::Path::new("src/io.rs");
let file = std::fs::File::open(path).unwrap();
use crate::data::{Contiguous};
let fd = dispatch_fd_t(file.into_raw_fd());
let (sender,receiver) = std::sync::mpsc::channel::<Result<Contiguous,()>>();
read_completion(fd,20,crate::queue::global(QoS::UserInitiated).unwrap(), move |data,err| {
if err != 0 {
sender.send(Err(())).unwrap();
}
else {
let as_contig = Contiguous::new(unsafe{&*data});
sender.send(Ok(as_contig)).unwrap();
}
});
let item = receiver.recv().unwrap().expect("Not error");
assert_eq!(item.as_slice(),[47, 47, 32, 83, 80, 68, 88, 45, 76, 105, 99, 101, 110, 115, 101, 45, 73, 100, 101, 110]);
}
#[test] fn write_t() {
use std::os::unix::io::IntoRawFd;
use crate::qos::QoS;
use crate::queue::global;
use crate::external_data::{ExternalMemory, HasMemory};
let path = std::path::Path::new("/tmp/dispatchr_write_t.txt");
let file = std::fs::File::create(path).unwrap();
let fd = dispatch_fd_t(file.into_raw_fd());
let (sender,receiver) = std::sync::mpsc::channel::<Result<(),()>>();
struct StaticMemory;
impl HasMemory for StaticMemory {
fn as_slice(&self) -> &[u8] {
"hello from the test".as_bytes()
}
}
let queue = global(QoS::UserInitiated).unwrap();
let data = ExternalMemory::new(StaticMemory, Some(queue)
);
write_completion(fd, &data, queue,move |a,b| {
if b == 0 {
sender.send(Ok(())).unwrap()
}
else {
sender.send(Err(())).unwrap()
}
println!("hello from write {:?} {:?}",a,b);
});
receiver.recv().unwrap().expect("Not error");
let result = std::fs::read(path).unwrap();
assert!(result.as_slice() == "hello from the test".as_bytes());
}
#[test] fn create_io() {
let path = std::path::Path::new("src/io.rs").canonicalize().unwrap();
use std::os::unix::ffi::OsStrExt;
use std::ffi::CString;
use crate::qos::QoS;
let c_path = CString::new(path.as_os_str().as_bytes()).unwrap();
let queue = super::queue::global(QoS::Default).unwrap();
let f = UnmanagedIO::new_with_path(dispatch_io_type_t::STREAM, &c_path, 0, 0, queue);
assert!(!f.is_null());
let as_ref = unsafe{&*f};
as_ref.close(dispatch_io_close_flags_t::default());
unsafe{
dispatch_release(as_ref as *const _ as *const c_void);
}
let p = IO::new_with_path(dispatch_io_type_t::STREAM, &c_path, 0, 0, queue);
assert!(p.is_some());
use std::sync::mpsc::sync_channel;
let (sender,receiver) = sync_channel(0);
let channel = p.unwrap();
channel.read(0, 100, queue, |environment, done,data,err| {
println!("hi");
if done {
assert_eq!(err,0);
assert!(unsafe{&*data}.len()>=100);
environment.send(()).unwrap();
}
},sender);
receiver.recv_timeout(std::time::Duration::new(10,0)).unwrap();
}
#[test] fn assert_send() {
fn assert_send<T: Send>() {}
assert_send::<IO>();
}