use parking_lot::{Condvar, Mutex};
use std::cell::{Cell, UnsafeCell};
use std::future::Future;
use std::pin::Pin;
use std::ptr::NonNull;
use std::sync::Arc;
use std::sync::atomic::AtomicPtr;
use std::task::{Context, Poll};
use widestring::{U16CStr, U16CString};
use windows::Win32::Foundation::{
STATUS_CANCELLED, STATUS_INSUFFICIENT_RESOURCES, STATUS_PENDING, STATUS_SUCCESS,
STATUS_TRANSACTION_NOT_FOUND, STATUS_VOLUME_DISMOUNTED,
};
use winfsp_sys::{
FSP_FILE_SYSTEM, FSP_FSCTL_FILE_INFO, FSP_FSCTL_TRANSACT_REQ, FSP_FSCTL_TRANSACT_RSP,
NTSTATUS as FSP_STATUS, PVOID,
};
use crate::constants::FspTransactKind;
use crate::filesystem::{AsyncFileSystemContext, DirMarker, FileInfo, FileSystemContext};
use crate::host::interface::{FileSystemUserContext, assert_ctx, catch_panic};
use crate::util::VariableSizedBox;
/// Counts the operations that are currently in flight and lets a caller stop
/// new ones from starting and block until the outstanding ones have finished.
///
/// Operations register through `enter`, which hands back an `InFlightGuard`;
/// `begin_drain_and_wait` flips the barrier into draining mode and waits for
/// the count to reach zero.
pub(crate) struct InFlightBarrier {
// Counter and draining flag live behind one lock so they are always read
// and updated together.
state: Mutex<BarrierState>,
// Notified by `InFlightGuard::drop` when the in-flight count reaches zero.
quiesced: Condvar,
}
/// Mutable state of an `InFlightBarrier`, always accessed under its mutex.
struct BarrierState {
// Number of `InFlightGuard`s that have been handed out and not yet dropped.
count: usize,
// Set by `begin_drain_and_wait`; while true, `enter` returns `None`.
// Nothing in this file resets it back to false.
draining: bool,
}
impl InFlightBarrier {
    /// Builds a shared barrier that starts with nothing in flight and is not draining.
    pub(crate) fn new() -> Arc<Self> {
        let initial = BarrierState {
            count: 0,
            draining: false,
        };
        Arc::new(Self {
            state: Mutex::new(initial),
            quiesced: Condvar::new(),
        })
    }

    /// Registers one more in-flight operation.
    ///
    /// Returns `None`, leaving the counter untouched, once draining has begun.
    /// Otherwise the counter is incremented and the returned guard decrements
    /// it again when dropped.
    pub(crate) fn enter(self: &Arc<Self>) -> Option<InFlightGuard> {
        let mut state = self.state.lock();
        if state.draining {
            None
        } else {
            state.count += 1;
            Some(InFlightGuard(Arc::clone(self)))
        }
    }

    /// Marks the barrier as draining, so later `enter` calls return `None`,
    /// and blocks the calling thread until the in-flight count is zero.
    pub(crate) fn begin_drain_and_wait(&self) {
        let mut state = self.state.lock();
        state.draining = true;
        loop {
            // Re-check after every wake-up; the count is the real condition.
            if state.count == 0 {
                break;
            }
            self.quiesced.wait(&mut state);
        }
    }
}
/// Proof that one operation is registered with an `InFlightBarrier`.
///
/// Dropping the guard removes the registration and, if it was the last one,
/// wakes every thread waiting in `begin_drain_and_wait`.
pub(crate) struct InFlightGuard(Arc<InFlightBarrier>);

impl Drop for InFlightGuard {
    fn drop(&mut self) {
        let barrier = &self.0;
        let mut state = barrier.state.lock();
        state.count -= 1;
        let now_idle = state.count == 0;
        if now_idle {
            barrier.quiesced.notify_all();
        }
    }
}
/// Owned copy of one operation's request together with a response buffer for
/// it, created by `snapshot`.
pub(crate) struct AsyncFileOperationContext {
// Byte-for-byte copy of the request that was current when `snapshot` ran.
request: VariableSizedBox<FSP_FSCTL_TRANSACT_REQ>,
// Response buffer; the `UnsafeCell` is what lets `response_mut` hand out a
// `*mut` pointer from a shared `&self`.
response: VariableSizedBox<UnsafeCell<FSP_FSCTL_TRANSACT_RSP>>,
}
// NOTE(review): `Send` is asserted by hand. Presumably sound because both
// buffers are exclusively owned by this value — confirm against
// `VariableSizedBox`.
unsafe impl Send for AsyncFileOperationContext {}
impl AsyncFileOperationContext {
    /// Copies the request reported by `FspFileSystemGetOperationContext` and
    /// pairs it with a freshly initialised response.
    ///
    /// The response starts out as `FSP_FSCTL_TRANSACT_RSP::default()` with
    /// `Size` set to the size of the fixed struct, `Kind` set to `kind` and
    /// `Hint` set to `hint`.
    ///
    /// Returns `None` when there is no operation context or its `Request`
    /// pointer is null.
    ///
    /// # Safety
    /// Reads `Size` bytes through the raw `Request` pointer of the operation
    /// context. NOTE(review): presumably only valid while the calling thread
    /// is inside a WinFsp operation callback — confirm.
    pub(crate) unsafe fn snapshot(kind: FspTransactKind, hint: u64) -> Option<Box<Self>> {
        let current = unsafe { winfsp_sys::FspFileSystemGetOperationContext().as_ref() }?;
        let source = current.Request;
        if source.is_null() {
            return None;
        }

        // The request is variable-sized; its own header says how many bytes to copy.
        let byte_len = unsafe { (*source).Size as usize };
        let mut request = VariableSizedBox::<FSP_FSCTL_TRANSACT_REQ>::new(byte_len);
        unsafe {
            request
                .as_mut_ptr()
                .cast::<u8>()
                .copy_from_nonoverlapping(source.cast::<u8>(), byte_len);
        }

        let seed = {
            let mut rsp = FSP_FSCTL_TRANSACT_RSP::default();
            rsp.Size = std::mem::size_of::<FSP_FSCTL_TRANSACT_RSP>() as u16;
            rsp.Kind = kind as u32;
            rsp.Hint = hint;
            rsp
        };
        // The buffer is allocated at the maximum response size; only the
        // fixed-size head is written here.
        let mut response = VariableSizedBox::<UnsafeCell<FSP_FSCTL_TRANSACT_RSP>>::new(
            crate::constants::FSP_FSCTL_TRANSACT_RSP_SIZEMAX,
        );
        unsafe {
            response
                .as_mut_ptr()
                .cast::<FSP_FSCTL_TRANSACT_RSP>()
                .write(seed);
        }

        Some(Box::new(Self { request, response }))
    }

    /// Borrows the copied request.
    pub(crate) fn request(&self) -> &FSP_FSCTL_TRANSACT_REQ {
        let copied = &self.request;
        unsafe { copied.as_ref() }
    }

    /// Returns a raw mutable pointer to the response, obtained through the
    /// `UnsafeCell` so that only `&self` is required.
    pub(crate) fn response_mut(&self) -> *mut FSP_FSCTL_TRANSACT_RSP {
        let cell = unsafe { self.response.as_ref() };
        cell.get()
    }
}
// Per-thread slot holding the operation context of the `InjectContextFuture`
// that is being polled on this thread. It starts as `None`;
// `InjectContextFuture::poll` stores its own pointer for the duration of the
// inner poll and puts the previous value back afterwards.
thread_local! {
pub(crate) static ASYNC_OP_CTX: Cell<Option<NonNull<AsyncFileOperationContext>>> = const { Cell::new(None) };
}
/// Future adapter that publishes an operation context in `ASYNC_OP_CTX`
/// around every poll of the wrapped future.
pub(crate) struct InjectContextFuture<F> {
// The wrapped future.
inner: F,
// Raw pointer to the context to publish. It carries no lifetime, so the
// pointee must outlive this future; that is the creator's responsibility.
op_ptr: NonNull<AsyncFileOperationContext>,
}
// NOTE(review): `NonNull` makes the struct `!Send` by default, so `Send` is
// asserted by hand. Presumably justified by `AsyncFileOperationContext` being
// `Send` and the pointee being kept alive by the owner — confirm.
unsafe impl<F: Send> Send for InjectContextFuture<F> {}
impl<F> InjectContextFuture<F> {
    /// Wraps `inner` so that `op_ctx` is published while it is being polled.
    ///
    /// Only the address of `op_ctx` is kept; the borrow is not tracked, so the
    /// caller must keep the context alive for as long as the future exists.
    pub(crate) fn new(inner: F, op_ctx: &AsyncFileOperationContext) -> Self {
        let op_ptr = NonNull::from(op_ctx);
        Self { inner, op_ptr }
    }
}
impl<F: Future + Unpin> Future for InjectContextFuture<F> {
    type Output = F::Output;

    /// Polls the wrapped future with this future's operation context installed
    /// in `ASYNC_OP_CTX`, then reinstates whatever was there before.
    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<F::Output> {
        /// Writes the saved slot value back when dropped, so the previous
        /// context is reinstated on normal return and on unwind alike.
        struct ResetOnExit(Option<NonNull<AsyncFileOperationContext>>);

        impl Drop for ResetOnExit {
            fn drop(&mut self) {
                ASYNC_OP_CTX.with(|slot| slot.set(self.0));
            }
        }

        let me = self.get_mut();
        let displaced = ASYNC_OP_CTX.with(|slot| slot.replace(Some(me.op_ptr)));
        let _reset = ResetOnExit(displaced);
        Pin::new(&mut me.inner).poll(cx)
    }
}
/// Owns the operation context of a pending request and makes sure a response
/// is sent for it: either explicitly through `send`, or with
/// `STATUS_CANCELLED` when the value is dropped while still armed.
pub(crate) struct IrpCompletion {
// File system the response is sent to. NOTE(review): wrapped in
// `AtomicPtr` presumably so the raw pointer can cross threads — confirm.
fs: AtomicPtr<FSP_FILE_SYSTEM>,
// Request copy and response buffer for the operation.
op_ctx: Box<AsyncFileOperationContext>,
// True until `send` runs; while true, `Drop` sends the cancellation response.
armed: bool,
}
impl IrpCompletion {
    /// Takes ownership of `op_ctx` and arms the drop-time fallback response.
    pub(crate) fn new(
        fs: AtomicPtr<FSP_FILE_SYSTEM>,
        op_ctx: Box<AsyncFileOperationContext>,
    ) -> Self {
        let armed = true;
        Self { fs, op_ctx, armed }
    }

    /// Borrows the owned operation context.
    pub(crate) fn op_ctx(&self) -> &AsyncFileOperationContext {
        &*self.op_ctx
    }

    /// Raw pointer to the response that `send` (or `Drop`) will transmit.
    pub(crate) fn response_mut(&self) -> *mut FSP_FSCTL_TRANSACT_RSP {
        self.op_ctx().response_mut()
    }

    /// Disarms the drop-time fallback and sends the response as it currently
    /// stands via `FspFileSystemSendResponse`.
    pub(crate) fn send(&mut self) {
        use std::sync::atomic::Ordering;

        // Disarm first so `Drop` can never send a second response for this call.
        self.armed = false;
        let response = self.op_ctx.response_mut();
        let fs = self.fs.load(Ordering::Relaxed);
        unsafe {
            winfsp_sys::FspFileSystemSendResponse(fs, response);
        }
    }
}
impl Drop for IrpCompletion {
    /// Sends a `STATUS_CANCELLED` response with zero `Information` if `send`
    /// was never called; does nothing otherwise.
    fn drop(&mut self) {
        use std::sync::atomic::Ordering;

        if self.armed {
            let response = self.op_ctx.response_mut();
            unsafe {
                (*response).IoStatus.Status = STATUS_CANCELLED.0 as u32;
                (*response).IoStatus.Information = 0;
                winfsp_sys::FspFileSystemSendResponse(self.fs.load(Ordering::Relaxed), response);
            }
        }
    }
}
/// `extern "C"` directory-read callback that answers the request from a spawned task.
///
/// The call itself only prepares the work: it zeroes `*bytes_transferred` (when non-null),
/// copies `pattern` and `marker` into owned strings, registers with `context.in_flight`,
/// snapshots the current operation and hands a future to `context.spawn_task`. That future
/// awaits `T::read_directory_async`, records the outcome in the snapshot's response and
/// sends it with `IrpCompletion::send`.
///
/// # Returns
/// - `STATUS_PENDING` once the future has been passed to `context.spawn_task`;
/// - `STATUS_TRANSACTION_NOT_FOUND` when no operation response or operation context is
///   available;
/// - `STATUS_VOLUME_DISMOUNTED` when the in-flight barrier is already draining;
/// - `STATUS_INSUFFICIENT_RESOURCES` when `buffer` is null.
///
/// # Safety
/// Dereferences `fs`, `fctx`, `pattern`, `marker`, `buffer` and `bytes_transferred` as raw
/// pointers. NOTE(review): the spawned task keeps using `fs`, `fctx` and `buffer` after this
/// function has returned, so they presumably must stay valid until the response has been
/// sent — confirm against the WinFsp contract.
pub(crate) unsafe extern "C" fn read_directory_async<T: AsyncFileSystemContext>(
    fs: *mut FSP_FILE_SYSTEM,
    fctx: PVOID,
    pattern: *mut u16,
    marker: *mut u16,
    buffer: PVOID,
    buffer_len: u32,
    bytes_transferred: *mut u32,
) -> FSP_STATUS
where
    <T as FileSystemContext>::FileContext: Sync,
{
    catch_panic!({
        assert_ctx!(fs);
        assert_ctx!(fctx);
        // The references below get unbounded lifetimes because they come from raw pointers;
        // that is what allows them to be moved into the spawned future.
        let context: &FileSystemUserContext<T> =
            unsafe { &*(*fs).UserContext.cast::<FileSystemUserContext<T>>() };
        let fctx = unsafe { &*fctx.cast::<T::FileContext>() };
        // The byte count is reported through the response, not this out-parameter.
        if !bytes_transferred.is_null() {
            unsafe { bytes_transferred.write(0) }
        }
        let Some(hint) = (unsafe {
            <T as FileSystemContext>::with_operation_response(context, |resp| resp.Hint)
        }) else {
            return STATUS_TRANSACTION_NOT_FOUND.0;
        };
        return if !buffer.is_null() {
            let fs = AtomicPtr::new(fs);
            // Copy the strings now: the originals are only borrowed for this call, while
            // the future runs later.
            let pattern_owned: Option<U16CString> = if !pattern.is_null() {
                Some(unsafe { U16CStr::from_ptr_str(pattern) }.to_ucstring())
            } else {
                None
            };
            let marker_owned: Option<U16CString> = if !marker.is_null() {
                Some(unsafe { U16CStr::from_ptr_str(marker) }.to_ucstring())
            } else {
                None
            };
            let buffer =
                unsafe { std::slice::from_raw_parts_mut(buffer as *mut _, buffer_len as usize) };
            let Some(guard) = context.in_flight.enter() else {
                return STATUS_VOLUME_DISMOUNTED.0;
            };
            let Some(op_ctx) = (unsafe {
                AsyncFileOperationContext::snapshot(
                    FspTransactKind::FspFsctlTransactQueryDirectoryKind,
                    hint,
                )
            }) else {
                return STATUS_TRANSACTION_NOT_FOUND.0;
            };
            let irp = IrpCompletion::new(fs, op_ctx);
            let readdir_ft = async move {
                let _guard = guard;
                // Rebind the captured completion as a local declared *after* `_guard`.
                // Locals drop in reverse declaration order and before captured variables,
                // so if this future is dropped or unwinds before `irp.send()`, the fallback
                // response from `IrpCompletion::drop` now goes out while the in-flight
                // guard is still held instead of after it has been released.
                let mut irp = irp;
                let user_fut = T::read_directory_async(
                    context,
                    fctx,
                    pattern_owned.as_deref(),
                    DirMarker(marker_owned.as_deref()),
                    buffer,
                );
                let outcome = InjectContextFuture::new(Box::pin(user_fut), irp.op_ctx()).await;
                let response = unsafe { &mut *irp.response_mut() };
                match outcome {
                    Ok(read) => {
                        response.IoStatus.Status = STATUS_SUCCESS.0 as u32;
                        response.IoStatus.Information = read;
                    }
                    Err(e) => {
                        response.IoStatus.Status = e.to_ntstatus() as u32;
                    }
                }
                irp.send();
            };
            context.spawn_task(readdir_ft);
            STATUS_PENDING.0
        } else {
            STATUS_INSUFFICIENT_RESOURCES.0
        };
    })
}
/// `extern "C"` read callback that answers the request from a spawned task.
///
/// The call itself only prepares the work: it zeroes `*bytes_transferred` (when non-null),
/// registers with `context.in_flight`, snapshots the current operation and hands a future
/// to `context.spawn_task`. That future awaits `T::read_async` on a `length`-byte view of
/// `buffer`, records the outcome in the snapshot's response and sends it with
/// `IrpCompletion::send`.
///
/// # Returns
/// - `STATUS_PENDING` once the future has been passed to `context.spawn_task`;
/// - `STATUS_TRANSACTION_NOT_FOUND` when no operation response or operation context is
///   available;
/// - `STATUS_VOLUME_DISMOUNTED` when the in-flight barrier is already draining;
/// - `STATUS_INSUFFICIENT_RESOURCES` when `buffer` is null.
///
/// # Safety
/// Dereferences `fs`, `fctx`, `buffer` and `bytes_transferred` as raw pointers.
/// NOTE(review): the spawned task keeps using `fs`, `fctx` and `buffer` after this function
/// has returned, so they presumably must stay valid until the response has been sent —
/// confirm against the WinFsp contract.
pub(crate) unsafe extern "C" fn read_async<T: AsyncFileSystemContext>(
    fs: *mut FSP_FILE_SYSTEM,
    fctx: PVOID,
    buffer: PVOID,
    offset: u64,
    length: u32,
    bytes_transferred: *mut u32,
) -> FSP_STATUS
where
    <T as FileSystemContext>::FileContext: Sync,
{
    catch_panic!({
        assert_ctx!(fs);
        assert_ctx!(fctx);
        // The references below get unbounded lifetimes because they come from raw pointers;
        // that is what allows them to be moved into the spawned future.
        let context: &FileSystemUserContext<T> =
            unsafe { &*(*fs).UserContext.cast::<FileSystemUserContext<T>>() };
        let fctx = unsafe { &*fctx.cast::<T::FileContext>() };
        // The byte count is reported through the response, not this out-parameter.
        if !bytes_transferred.is_null() {
            unsafe { bytes_transferred.write(0) }
        }
        let Some(hint) = (unsafe {
            <T as FileSystemContext>::with_operation_response(context, |resp| resp.Hint)
        }) else {
            return STATUS_TRANSACTION_NOT_FOUND.0;
        };
        return if !buffer.is_null() {
            let fs = AtomicPtr::new(fs);
            let buffer =
                unsafe { std::slice::from_raw_parts_mut(buffer as *mut u8, length as usize) };
            let Some(guard) = context.in_flight.enter() else {
                return STATUS_VOLUME_DISMOUNTED.0;
            };
            let Some(op_ctx) = (unsafe {
                AsyncFileOperationContext::snapshot(FspTransactKind::FspFsctlTransactReadKind, hint)
            }) else {
                return STATUS_TRANSACTION_NOT_FOUND.0;
            };
            let irp = IrpCompletion::new(fs, op_ctx);
            let read_ft = async move {
                let _guard = guard;
                // Rebind the captured completion as a local declared *after* `_guard`.
                // Locals drop in reverse declaration order and before captured variables,
                // so if this future is dropped or unwinds before `irp.send()`, the fallback
                // response from `IrpCompletion::drop` now goes out while the in-flight
                // guard is still held instead of after it has been released.
                let mut irp = irp;
                let user_fut = T::read_async(context, fctx, buffer, offset);
                let outcome = InjectContextFuture::new(Box::pin(user_fut), irp.op_ctx()).await;
                let response = unsafe { &mut *irp.response_mut() };
                match outcome {
                    Ok(read) => {
                        response.IoStatus.Status = STATUS_SUCCESS.0 as u32;
                        response.IoStatus.Information = read;
                    }
                    Err(e) => {
                        response.IoStatus.Status = e.to_ntstatus() as u32;
                    }
                }
                irp.send();
            };
            context.spawn_task(read_ft);
            STATUS_PENDING.0
        } else {
            STATUS_INSUFFICIENT_RESOURCES.0
        };
    })
}
/// `extern "C"` write callback that answers the request from a spawned task.
///
/// The call itself only prepares the work: it zeroes `*bytes_transferred` (when non-null),
/// registers with `context.in_flight`, snapshots the current operation and hands a future
/// to `context.spawn_task`. That future awaits `T::write_async` on a `length`-byte view of
/// `buffer`. On success it stores the written byte count and the resulting `FileInfo` in
/// the snapshot's response; on failure it stores the error's NTSTATUS. Either way the
/// response is then sent with `IrpCompletion::send`.
///
/// `write_to_eof` and `constrained_io` are treated as booleans (non-zero means true).
/// `_out_file_info` is not used; the file info travels in the response instead.
///
/// # Returns
/// - `STATUS_PENDING` once the future has been passed to `context.spawn_task`;
/// - `STATUS_TRANSACTION_NOT_FOUND` when no operation response or operation context is
///   available;
/// - `STATUS_VOLUME_DISMOUNTED` when the in-flight barrier is already draining;
/// - `STATUS_INSUFFICIENT_RESOURCES` when `buffer` is null.
///
/// # Safety
/// Dereferences `fs`, `fctx`, `buffer` and `bytes_transferred` as raw pointers.
/// NOTE(review): the spawned task keeps using `fs`, `fctx` and `buffer` after this function
/// has returned, so they presumably must stay valid until the response has been sent —
/// confirm against the WinFsp contract.
pub(crate) unsafe extern "C" fn write_async<T: AsyncFileSystemContext>(
    fs: *mut FSP_FILE_SYSTEM,
    fctx: PVOID,
    buffer: PVOID,
    offset: u64,
    length: u32,
    write_to_eof: u8,
    constrained_io: u8,
    bytes_transferred: *mut u32,
    _out_file_info: *mut FSP_FSCTL_FILE_INFO,
) -> FSP_STATUS
where
    <T as FileSystemContext>::FileContext: Sync,
{
    catch_panic!({
        assert_ctx!(fs);
        assert_ctx!(fctx);
        // The references below get unbounded lifetimes because they come from raw pointers;
        // that is what allows them to be moved into the spawned future.
        let context: &FileSystemUserContext<T> =
            unsafe { &*(*fs).UserContext.cast::<FileSystemUserContext<T>>() };
        let fctx = unsafe { &*fctx.cast::<T::FileContext>() };
        // The byte count is reported through the response, not this out-parameter.
        if !bytes_transferred.is_null() {
            unsafe { bytes_transferred.write(0) }
        }
        let Some(hint) = (unsafe {
            <T as FileSystemContext>::with_operation_response(context, |resp| resp.Hint)
        }) else {
            return STATUS_TRANSACTION_NOT_FOUND.0;
        };
        if !buffer.is_null() {
            let buffer =
                unsafe { std::slice::from_raw_parts(buffer as *const u8, length as usize) };
            let fs = AtomicPtr::new(fs);
            let Some(guard) = context.in_flight.enter() else {
                return STATUS_VOLUME_DISMOUNTED.0;
            };
            let Some(op_ctx) = (unsafe {
                AsyncFileOperationContext::snapshot(
                    FspTransactKind::FspFsctlTransactWriteKind,
                    hint,
                )
            }) else {
                return STATUS_TRANSACTION_NOT_FOUND.0;
            };
            let irp = IrpCompletion::new(fs, op_ctx);
            let write_ft = async move {
                let _guard = guard;
                // Rebind the captured completion as a local declared *after* `_guard`.
                // Locals drop in reverse declaration order and before captured variables,
                // so if this future is dropped or unwinds before `irp.send()`, the fallback
                // response from `IrpCompletion::drop` now goes out while the in-flight
                // guard is still held instead of after it has been released.
                let mut irp = irp;
                let mut file_info = FileInfo::default();
                let user_fut = T::write_async(
                    context,
                    fctx,
                    buffer,
                    offset,
                    write_to_eof != 0,
                    constrained_io != 0,
                    &mut file_info,
                );
                let outcome = InjectContextFuture::new(Box::pin(user_fut), irp.op_ctx()).await;
                let response = unsafe { &mut *irp.response_mut() };
                match outcome {
                    Ok(written) => {
                        response.IoStatus.Status = STATUS_SUCCESS.0 as u32;
                        response.IoStatus.Information = written;
                        // NOTE(review): the cast assumes `FileInfo` is layout-compatible
                        // with `FSP_FSCTL_FILE_INFO` — confirm on the type's definition.
                        unsafe {
                            std::ptr::copy_nonoverlapping(
                                &file_info as *const FileInfo as *const FSP_FSCTL_FILE_INFO,
                                &mut response.Rsp.Write.FileInfo,
                                1,
                            );
                        }
                    }
                    Err(e) => {
                        response.IoStatus.Status = e.to_ntstatus() as u32;
                    }
                }
                irp.send();
            };
            context.spawn_task(write_ft);
            return STATUS_PENDING.0;
        } else {
            return STATUS_INSUFFICIENT_RESOURCES.0;
        }
    })
}