#![allow(unsafe_code)]
#![allow(clippy::vec_box)]
#![allow(non_snake_case)]
#[cfg(feature = "async-fiber")]
use alloc::boxed::Box;
#[cfg(feature = "async-fiber")]
use alloc::vec::Vec;
#[cfg(feature = "async-fiber")]
use core::pin::Pin;
#[cfg(feature = "async-fiber")]
use std::arch::naked_asm;
#[cfg(feature = "async-fiber")]
use std::cell::RefCell;
#[cfg(feature = "async-fiber")]
use std::future::Future;
#[cfg(feature = "async-fiber")]
use std::task::Context;
#[cfg(feature = "async-fiber")]
use std::task::Poll;
#[cfg(feature = "async-fiber")]
// align(64) keeps the whole save area cache-line aligned and, critically,
// guarantees the 16-byte alignment that `fxsave` / `stp q..` need for the
// extended-state region at byte offset 128.
#[repr(C, align(64))]
#[derive(Debug)]
/// Callee-saved CPU state captured/restored by the `switch_context` routines.
///
/// Layout contract relied on by the naked asm below:
/// - `gprs` occupies bytes 0..128; which slot holds which register is
///   architecture-specific (documented on each `switch_context` variant).
/// - `extended_state` starts at byte offset 128 and holds FP/SIMD state
///   (the `fxsave` area on x86_64, q8-q15 on aarch64, fs0-fs11 on riscv64).
pub(crate) struct Registers {
// 16 general-purpose slots; index meaning is per-architecture.
pub(crate) gprs: [u64; 16],
// FP/SIMD save area; 512 bytes matches the x86_64 `fxsave` requirement.
pub(crate) extended_state: [u8; 512],
}
#[cfg(feature = "async-fiber")]
impl Registers {
    /// Builds a fully zeroed register save area (same state `Default` yields).
    #[must_use]
    #[inline(always)]
    pub(crate) const fn new() -> Self {
        let gprs = [0u64; 16];
        let extended_state = [0u8; 512];
        Self { gprs, extended_state }
    }
}
#[cfg(feature = "async-fiber")]
impl Default for Registers {
    /// Equivalent to [`Registers::new`]: all registers and FP state zeroed.
    ///
    /// Gated on `async-fiber` because `Registers` itself only exists under
    /// that feature; without the gate, builds with the feature disabled fail
    /// to compile on the unresolved `Registers` name.
    #[inline(always)]
    fn default() -> Self {
        Self::new()
    }
}
#[cfg(feature = "async-fiber")]
#[repr(u8)]
#[derive(Copy, Clone, Debug, PartialEq, Eq)]
/// Lifecycle state of a fiber; written by both the fiber side and the
/// executor side to communicate across context switches.
pub(crate) enum FiberStatus {
/// Context allocated or reset; the closure has not started running yet.
Initial,
/// The fiber is executing (also set just before resuming it with input).
Running,
/// The fiber suspended itself waiting for more input bytes.
Yielded,
/// The closure returned (or the executor signalled EOF to the fiber).
Finished,
/// The closure panicked; the payload is stashed in `panic_payload`.
Panicked,
}
#[cfg(feature = "async-fiber")]
#[allow(dead_code)]
/// An OS-page-backed fiber stack with an inaccessible guard page at the low
/// end, so stack overflow faults immediately instead of corrupting memory.
pub(crate) struct GuardedStack {
// Base of the entire mapping; the guard page starts here.
base: *mut u8,
// Size of the full mapping: one guard page + the usable stack bytes.
total_len: usize,
// OS page size captured at allocation time (guard/usable split).
page_size: usize,
}
#[cfg(feature = "async-fiber")]
impl GuardedStack {
/// Allocates a stack of at least `usable_size` bytes (rounded up to a whole
/// number of pages) preceded by one guard page.
///
/// # Errors
/// Returns a decode error if `mmap` or `mprotect` fails.
#[inline]
#[cfg(unix)]
pub(crate) fn new(usable_size: usize) -> core::result::Result<Self, crate::error::DecodeError> {
let page_size = page_size();
// Round up to a page multiple (assumes `page_size` is a power of two).
let usable_size = (usable_size + page_size - 1) & !(page_size - 1);
let total_len = page_size + usable_size;
// SAFETY: anonymous private mapping with well-formed arguments; the
// returned region is owned exclusively by this struct.
unsafe {
let base = libc::mmap(
core::ptr::null_mut(),
total_len,
libc::PROT_READ | libc::PROT_WRITE,
libc::MAP_PRIVATE | libc::MAP_ANONYMOUS,
-1,
0,
);
if base == libc::MAP_FAILED {
return crate::error::cold_decode_error_other("mmap failed for fiber stack");
}
// Stacks grow downward: make the lowest page inaccessible so an
// overflow faults on the guard page rather than scribbling onward.
let rc = libc::mprotect(base, page_size, libc::PROT_NONE);
if rc != 0 {
libc::munmap(base, total_len);
return crate::error::cold_decode_error_other("mprotect failed for guard page");
}
Ok(Self {
base: base.cast::<u8>(),
total_len,
page_size,
})
}
}
/// Windows variant of [`Self::new`]: `VirtualAlloc` + a `PAGE_NOACCESS`
/// guard page at the low end of the reservation.
///
/// # Errors
/// Returns a decode error if `VirtualAlloc` or `VirtualProtect` fails.
#[inline]
#[cfg(windows)]
pub(crate) fn new(usable_size: usize) -> core::result::Result<Self, crate::error::DecodeError> {
let page_size = page_size();
// Round up to a page multiple (assumes `page_size` is a power of two).
let usable_size = (usable_size + page_size - 1) & !(page_size - 1);
let total_len = page_size + usable_size;
// SAFETY: reserve+commit in one call; region owned by this struct.
unsafe {
let base = winapi_shim::VirtualAlloc(
core::ptr::null_mut(),
total_len,
winapi_shim::MEM_COMMIT | winapi_shim::MEM_RESERVE,
winapi_shim::PAGE_READWRITE,
);
if base.is_null() {
return crate::error::cold_decode_error_other(
"VirtualAlloc failed for fiber stack",
);
}
// Turn the lowest page into the guard page.
let mut old_protect = 0;
let rc = winapi_shim::VirtualProtect(
base,
page_size,
winapi_shim::PAGE_NOACCESS,
&mut old_protect,
);
if rc == 0 {
winapi_shim::VirtualFree(base, 0, winapi_shim::MEM_RELEASE);
return crate::error::cold_decode_error_other(
"VirtualProtect failed for guard page",
);
}
Ok(Self {
base: base.cast::<u8>(),
total_len,
page_size,
})
}
}
/// Read-only view of the usable (non-guard) region.
#[inline(always)]
#[must_use]
#[allow(dead_code)]
pub(crate) const fn usable(&self) -> &[u8] {
// SAFETY: `base + page_size .. base + total_len` is a live, readable
// mapping owned by `self` for the lifetime of the borrow.
unsafe {
core::slice::from_raw_parts(
self.base.add(self.page_size),
self.total_len - self.page_size,
)
}
}
/// Mutable view of the usable (non-guard) region.
#[inline(always)]
#[allow(dead_code)]
pub(crate) const fn usable_mut(&mut self) -> &mut [u8] {
// SAFETY: same region as `usable`; `&mut self` guarantees uniqueness.
unsafe {
core::slice::from_raw_parts_mut(
self.base.add(self.page_size),
self.total_len - self.page_size,
)
}
}
/// Initial stack pointer: one past the highest usable byte, rounded down
/// to 16-byte alignment as the supported ABIs require.
#[inline(always)]
#[must_use]
#[allow(dead_code)]
pub(crate) fn top(&self) -> u64 {
let raw = self.base as u64 + self.total_len as u64;
raw & !15 }
/// Lowest usable address (first byte above the guard page).
#[inline(always)]
#[must_use]
#[allow(dead_code)]
pub(crate) fn bottom(&self) -> u64 {
self.base as u64 + self.page_size as u64
}
/// Base of the whole mapping, guard page included. Used on Windows to seed
/// the TEB deallocation-stack slot (see the register setup in `poll`).
#[inline(always)]
#[must_use]
#[allow(dead_code)]
pub(crate) fn allocation_base(&self) -> u64 {
self.base as u64
}
}
#[cfg(feature = "async-fiber")]
impl Drop for GuardedStack {
/// Releases the whole mapping, guard page included.
#[inline(always)]
#[cfg(unix)]
fn drop(&mut self) {
// SAFETY: `base`/`total_len` describe exactly the mapping created in
// `new`, and it has not been unmapped elsewhere.
unsafe {
let rc = libc::munmap(self.base.cast::<libc::c_void>(), self.total_len);
debug_assert!(rc == 0, "munmap failed for fiber stack");
}
}
/// Windows variant: `MEM_RELEASE` with size 0 frees the full reservation.
#[inline(always)]
#[cfg(windows)]
fn drop(&mut self) {
// SAFETY: `base` is the pointer returned by `VirtualAlloc` in `new`.
unsafe {
let rc = winapi_shim::VirtualFree(
self.base.cast::<core::ffi::c_void>(),
0, winapi_shim::MEM_RELEASE,
);
debug_assert!(rc != 0, "VirtualFree failed for fiber stack");
}
}
}
#[cfg(feature = "async-fiber")]
// SAFETY: `base` is a uniquely owned pointer to a process-wide mapping that is
// not tied to the allocating thread, so moving a GuardedStack across threads
// is sound; the raw pointer is what blocks the auto impl.
unsafe impl Send for GuardedStack {}
#[cfg(all(feature = "async-fiber", unix))]
#[inline(always)]
/// System page size, queried once via `sysconf` and cached for the process.
fn page_size() -> usize {
    static PAGE_SIZE: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
    *PAGE_SIZE.get_or_init(|| {
        // SAFETY: sysconf has no memory-safety preconditions.
        let raw = unsafe { libc::sysconf(libc::_SC_PAGESIZE) };
        // sysconf returns -1 on error; the previous `as usize` cast would
        // turn that into usize::MAX and poison the page-rounding math in
        // `GuardedStack::new`. Fall back to the conventional 4 KiB page.
        if raw > 0 { raw as usize } else { 4096 }
    })
}
#[cfg(all(feature = "async-fiber", windows))]
#[inline(always)]
/// System page size, queried once via `GetSystemInfo` and cached.
fn page_size() -> usize {
static PAGE_SIZE: std::sync::OnceLock<usize> = std::sync::OnceLock::new();
*PAGE_SIZE.get_or_init(|| {
let mut info = core::mem::MaybeUninit::uninit();
// SAFETY: GetSystemInfo fully initializes the SYSTEM_INFO it is given,
// so `assume_init` is sound afterwards.
unsafe {
winapi_shim::GetSystemInfo(info.as_mut_ptr());
info.assume_init().dwPageSize as usize
}
})
}
#[cfg(all(feature = "async-fiber", windows))]
// Minimal hand-rolled kernel32 bindings, so the crate avoids pulling in a
// winapi/windows-sys dependency. Signatures mirror the Win32 documentation.
mod winapi_shim {
#[repr(C)]
// Mirrors the Win32 SYSTEM_INFO layout; field names kept verbatim.
pub(crate) struct SYSTEM_INFO {
pub(crate) wProcessorArchitecture: u16,
pub(crate) wReserved: u16,
pub(crate) dwPageSize: u32,
pub(crate) lpMinimumApplicationAddress: *mut core::ffi::c_void,
pub(crate) lpMaximumApplicationAddress: *mut core::ffi::c_void,
pub(crate) dwActiveProcessorMask: usize,
pub(crate) dwNumberOfProcessors: u32,
pub(crate) dwProcessorType: u32,
pub(crate) dwAllocationGranularity: u32,
pub(crate) wProcessorLevel: u16,
pub(crate) wProcessorRevision: u16,
}
// Allocation-type and page-protection constants from <winnt.h>.
pub(crate) const MEM_COMMIT: u32 = 0x00001000;
pub(crate) const MEM_RESERVE: u32 = 0x00002000;
pub(crate) const MEM_RELEASE: u32 = 0x00008000;
pub(crate) const PAGE_NOACCESS: u32 = 0x01;
pub(crate) const PAGE_READWRITE: u32 = 0x04;
// kernel32 entry points; "system" is stdcall/Win64 ABI as appropriate.
unsafe extern "system" {
pub(crate) fn VirtualAlloc(
lpAddress: *mut core::ffi::c_void,
dwSize: usize,
flAllocationType: u32,
flProtect: u32,
) -> *mut core::ffi::c_void;
pub(crate) fn VirtualFree(
lpAddress: *mut core::ffi::c_void,
dwSize: usize,
dwFreeType: u32,
) -> i32;
pub(crate) fn VirtualProtect(
lpAddress: *mut core::ffi::c_void,
dwSize: usize,
flNewProtect: u32,
lpflOldProtect: *mut u32,
) -> i32;
pub(crate) fn GetSystemInfo(lpSystemInfo: *mut SYSTEM_INFO);
}
}
#[cfg(feature = "async-fiber")]
#[repr(C, align(16))]
/// All state for one fiber plus the executor-side saved registers needed to
/// switch back and forth. Boxed so its address stays stable across polls.
pub(crate) struct FiberContext {
// Guarded stack the fiber runs on.
pub(crate) stack: GuardedStack,
// Fiber-side callee-saved registers (valid while the fiber is suspended).
pub(crate) regs: Registers,
// Executor-side registers saved each time the fiber is resumed.
pub(crate) executor_regs: Registers,
// Handshake state shared between executor and fiber.
pub(crate) status: FiberStatus,
// Panic payload captured by `fiber_trampoline`'s catch_unwind.
pub(crate) panic_payload: Option<Box<dyn std::any::Any + Send>>,
// Entry point recorded here for reference; NOTE(review): the visible code
// writes the trampoline address directly into `regs` in `poll` and never
// reads this field back — confirm it is intentional to keep both.
pub(crate) trampoline: unsafe extern "C" fn(),
// Type-erased invoker that runs the user closure (set per run).
pub(crate) invoke_closure: unsafe fn(*mut ()),
// Argument for `invoke_closure`: points at the owning BridgeFuture.
pub(crate) closure_ptr: *mut (),
// Where the fiber writes Option<Result<T, DecodeError>>; refreshed on
// every poll in case the future moved.
pub(crate) result_ptr: *mut (),
// NOTE(review): reset to null in `poll` but never read in this file —
// possibly vestigial; verify against other users before removing.
pub(crate) reader_ptr: *mut (),
// Unread portion of `read_buffer`, consumed by FiberReader::read.
pub(crate) buf_ptr: *mut [u8],
// 8 KiB staging buffer the executor fills from the async reader.
pub(crate) read_buffer: Box<[u8]>,
}
#[cfg(feature = "async-fiber")]
std::thread_local! {
// Per-thread pool of finished fiber contexts (stacks and buffers are
// expensive to allocate); capped by MAX_POOLED_CONTEXTS below.
static CONTEXT_POOL: RefCell<Vec<Box<FiberContext>>> = const { RefCell::new(Vec::new()) };
// Context of the fiber currently running on this thread; null while the
// executor side runs. Read by `fiber_trampoline` and the invoke shim.
static CURRENT_FIBER: std::cell::Cell<*mut FiberContext> = const { std::cell::Cell::new(core::ptr::null_mut()) };
}
#[cfg(feature = "async-fiber")]
// Upper bound on pooled contexts per thread, limiting retained stack memory.
const MAX_POOLED_CONTEXTS: usize = 8_192;
#[cfg(all(feature = "async-fiber", target_arch = "x86_64", unix))]
#[unsafe(naked)]
/// Saves the current callee-saved state into `save` and resumes `restore`.
///
/// System V AMD64: `save` arrives in rdi, `restore` in rsi. `Registers.gprs`
/// slots: [0]=rsp, [1]=rbp, [2]=rbx, [3..=6]=r12-r15, [7]=resume rip; the
/// fxsave area starts at byte offset 128 (16-byte aligned via align(64)).
unsafe extern "C" fn switch_context(
save: *mut Registers,
restore: *const Registers,
) {
naked_asm!(
// Save callee-saved GPRs into `save` (rdi).
"mov [rdi + 0], rsp",
"mov [rdi + 8], rbp",
"mov [rdi + 16], rbx",
"mov [rdi + 24], r12",
"mov [rdi + 32], r13",
"mov [rdi + 40], r14",
"mov [rdi + 48], r15",
// Save x87/SSE state (includes MXCSR at area offset 24).
"fxsave [rdi + 128]",
// Record the resume point: switching back to this context continues at
// local label 1 below, which simply returns to our caller.
"lea rax, [rip + 1f]",
"mov [rdi + 56], rax",
// Restore the target context (`restore` in rsi) and jump to its rip.
"fxrstor [rsi + 128]",
"mov rsp, [rsi + 0]",
"mov rbp, [rsi + 8]",
"mov rbx, [rsi + 16]",
"mov r12, [rsi + 24]",
"mov r13, [rsi + 32]",
"mov r14, [rsi + 40]",
"mov r15, [rsi + 48]",
"jmp [rsi + 56]",
"1: ret"
);
}
#[cfg(all(feature = "async-fiber", target_arch = "x86_64", windows))]
#[unsafe(naked)]
/// Windows x64 variant: `save` in rcx, `restore` in rdx. In addition to the
/// SysV set, rdi/rsi are callee-saved here (slots [8]/[9], offsets 64/72),
/// and four TEB fields are swapped so the OS sees the fiber's stack bounds:
/// gs:[0x08]/gs:[0x10]/gs:[0x1478] (StackBase / StackLimit /
/// DeallocationStack) in slots [10..=12], and gs:[0x00] (the first NT_TIB
/// field — presumably the exception list; TODO confirm) in slot [13].
unsafe extern "C" fn switch_context(
save: *mut Registers,
restore: *const Registers,
) {
naked_asm!(
// Save callee-saved GPRs into `save` (rcx).
"mov [rcx + 0], rsp",
"mov [rcx + 8], rbp",
"mov [rcx + 16], rbx",
"mov [rcx + 24], r12",
"mov [rcx + 32], r13",
"mov [rcx + 40], r14",
"mov [rcx + 48], r15",
"mov [rcx + 64], rdi",
"mov [rcx + 72], rsi",
// Save the TEB stack descriptor fields.
"mov rax, gs:[0x08]",
"mov [rcx + 80], rax",
"mov rax, gs:[0x10]",
"mov [rcx + 88], rax",
"mov rax, gs:[0x1478]",
"mov [rcx + 96], rax",
"mov rax, gs:[0x00]",
"mov [rcx + 104], rax",
// SSE state; covers the nonvolatile xmm6-xmm15 required by the ABI.
"fxsave [rcx + 128]",
// Record the resume point (label 1 below).
"lea rax, [rip + 1f]",
"mov [rcx + 56], rax",
// Restore the target context (`restore` in rdx), TEB fields first.
"fxrstor [rdx + 128]",
"mov rax, [rdx + 80]",
"mov gs:[0x08], rax",
"mov rax, [rdx + 88]",
"mov gs:[0x10], rax",
"mov rax, [rdx + 96]",
"mov gs:[0x1478], rax",
"mov rax, [rdx + 104]",
"mov gs:[0x00], rax",
"mov rsp, [rdx + 0]",
"mov rbp, [rdx + 8]",
"mov rbx, [rdx + 16]",
"mov r12, [rdx + 24]",
"mov r13, [rdx + 32]",
"mov r14, [rdx + 40]",
"mov r15, [rdx + 48]",
"mov rdi, [rdx + 64]",
"mov rsi, [rdx + 72]",
"jmp [rdx + 56]",
"1: ret"
);
}
#[cfg(all(feature = "async-fiber", target_arch = "aarch64", unix))]
#[unsafe(naked)]
/// AAPCS64 variant: `save` in x0, `restore` in x1. Slots: x19-x28 at offsets
/// 0..80, fp/lr (x29/x30) at 80/88, sp at offset 96 (gprs[12]); the full
/// q8-q15 registers (a superset of the required d8-d15) go into the
/// extended-state area at offsets 128..256. The final `ret` jumps to the
/// restored x30, which is the resume point.
unsafe extern "C" fn switch_context(
save: *mut Registers,
restore: *const Registers,
) {
naked_asm!(
// Save callee-saved GPRs, frame pointer and link register.
"stp x19, x20, [x0, 0]",
"stp x21, x22, [x0, 16]",
"stp x23, x24, [x0, 32]",
"stp x25, x26, [x0, 48]",
"stp x27, x28, [x0, 64]",
"stp x29, x30, [x0, 80]",
// sp cannot be stored directly; stage through x9 (caller-saved).
"mov x9, sp",
"str x9, [x0, 96]",
// Save SIMD callee-saved registers.
"stp q8, q9, [x0, 128]",
"stp q10, q11, [x0, 160]",
"stp q12, q13, [x0, 192]",
"stp q14, q15, [x0, 224]",
// Restore the target context.
"ldp x19, x20, [x1, 0]",
"ldp x21, x22, [x1, 16]",
"ldp x23, x24, [x1, 32]",
"ldp x25, x26, [x1, 48]",
"ldp x27, x28, [x1, 64]",
"ldp x29, x30, [x1, 80]",
"ldr x9, [x1, 96]",
"mov sp, x9",
"ldp q8, q9, [x1, 128]",
"ldp q10, q11, [x1, 160]",
"ldp q12, q13, [x1, 192]",
"ldp q14, q15, [x1, 224]",
// Return to the restored x30 — the target context's resume point.
"ret"
);
}
#[cfg(all(feature = "async-fiber", target_arch = "aarch64", windows))]
#[unsafe(naked)]
/// Windows arm64 variant: same register set as the unix version, plus three
/// TEB fields accessed through x18 (the platform register, which points at
/// the TEB on Windows): offsets 0x08/0x10/0x1478 — presumably StackBase /
/// StackLimit / DeallocationStack, mirroring the x86_64 gs-offsets; TODO
/// confirm against the arm64 TEB layout. They are stored in gprs[13..=15]
/// (byte offsets 104/112/120).
unsafe extern "C" fn switch_context(
save: *mut Registers,
restore: *const Registers,
) {
naked_asm!(
// Save callee-saved GPRs, frame pointer and link register.
"stp x19, x20, [x0, 0]",
"stp x21, x22, [x0, 16]",
"stp x23, x24, [x0, 32]",
"stp x25, x26, [x0, 48]",
"stp x27, x28, [x0, 64]",
"stp x29, x30, [x0, 80]",
"mov x9, sp",
"str x9, [x0, 96]",
// Save the TEB stack descriptor fields (via x18).
"ldr x9, [x18, #0x08]",
"str x9, [x0, #104]",
"ldr x9, [x18, #0x10]",
"str x9, [x0, #112]",
"ldr x9, [x18, #0x1478]",
"str x9, [x0, #120]",
// Save SIMD callee-saved registers (full q8-q15).
"stp q8, q9, [x0, 128]",
"stp q10, q11, [x0, 160]",
"stp q12, q13, [x0, 192]",
"stp q14, q15, [x0, 224]",
// Restore the target context: SIMD, TEB fields, then GPRs and sp.
"ldp q8, q9, [x1, 128]",
"ldp q10, q11, [x1, 160]",
"ldp q12, q13, [x1, 192]",
"ldp q14, q15, [x1, 224]",
"ldr x9, [x1, #104]",
"str x9, [x18, #0x08]",
"ldr x9, [x1, #112]",
"str x9, [x18, #0x10]",
"ldr x9, [x1, #120]",
"str x9, [x18, #0x1478]",
"ldp x19, x20, [x1, 0]",
"ldp x21, x22, [x1, 16]",
"ldp x23, x24, [x1, 32]",
"ldp x25, x26, [x1, 48]",
"ldp x27, x28, [x1, 64]",
"ldp x29, x30, [x1, 80]",
"ldr x9, [x1, 96]",
"mov sp, x9",
// Return to the restored x30 — the target context's resume point.
"ret"
);
}
#[cfg(all(feature = "async-fiber", target_arch = "riscv64", unix))]
#[unsafe(naked)]
/// RISC-V LP64D variant: `save` in a0, `restore` in a1. Slots: sp at offset
/// 0, s0-s11 at 8..104, ra at 104 (gprs[13]); the callee-saved FP registers
/// fs0-fs11 occupy the extended-state area at offsets 128..224. The final
/// `ret` jumps to the restored ra — the resume point.
unsafe extern "C" fn switch_context(
save: *mut Registers,
restore: *const Registers,
) {
naked_asm!(
// Save sp, the callee-saved s-registers and the return address.
"sd sp, 0(a0)",
"sd s0, 8(a0)",
"sd s1, 16(a0)",
"sd s2, 24(a0)",
"sd s3, 32(a0)",
"sd s4, 40(a0)",
"sd s5, 48(a0)",
"sd s6, 56(a0)",
"sd s7, 64(a0)",
"sd s8, 72(a0)",
"sd s9, 80(a0)",
"sd s10, 88(a0)",
"sd s11, 96(a0)",
"sd ra, 104(a0)",
// Save callee-saved FP registers.
"fsd fs0, 128(a0)",
"fsd fs1, 136(a0)",
"fsd fs2, 144(a0)",
"fsd fs3, 152(a0)",
"fsd fs4, 160(a0)",
"fsd fs5, 168(a0)",
"fsd fs6, 176(a0)",
"fsd fs7, 184(a0)",
"fsd fs8, 192(a0)",
"fsd fs9, 200(a0)",
"fsd fs10, 208(a0)",
"fsd fs11, 216(a0)",
// Restore the target context.
"ld sp, 0(a1)",
"ld s0, 8(a1)",
"ld s1, 16(a1)",
"ld s2, 24(a1)",
"ld s3, 32(a1)",
"ld s4, 40(a1)",
"ld s5, 48(a1)",
"ld s6, 56(a1)",
"ld s7, 64(a1)",
"ld s8, 72(a1)",
"ld s9, 80(a1)",
"ld s10, 88(a1)",
"ld s11, 96(a1)",
"ld ra, 104(a1)",
"fld fs0, 128(a1)",
"fld fs1, 136(a1)",
"fld fs2, 144(a1)",
"fld fs3, 152(a1)",
"fld fs4, 160(a1)",
"fld fs5, 168(a1)",
"fld fs6, 176(a1)",
"fld fs7, 184(a1)",
"fld fs8, 192(a1)",
"fld fs9, 200(a1)",
"fld fs10, 208(a1)",
"fld fs11, 216(a1)",
// Return to the restored ra — the target context's resume point.
"ret"
);
}
// Fail fast at compile time on targets with no switch_context implementation,
// instead of producing link errors or silently unsound fallbacks.
#[cfg(all(
feature = "async-fiber",
not(any(
all(target_arch = "x86_64", any(unix, windows)),
all(target_arch = "aarch64", any(unix, windows)),
all(target_arch = "riscv64", unix)
))
))]
compile_error!(
"Unified Fiber-backed Async (async-fiber) is supported on: x86_64 (unix/windows), aarch64 (unix/windows), riscv64 (unix only)."
);
#[cfg(feature = "async-fiber")]
/// Synchronous `Reader` facade handed to the decode closure on the fiber;
/// it "blocks" by suspending the fiber until the executor supplies bytes.
pub(crate) struct FiberReader<'a, R: futures_io::AsyncRead + Unpin> {
// Ties this reader's lifetime/type to the async reader without storing it.
pub(crate) inner: std::marker::PhantomData<&'a mut R>,
// Raw pointer back to the owning fiber context (set by the invoke shim).
pub(crate) ctx: *mut FiberContext,
}
#[cfg(feature = "async-fiber")]
impl<R: futures_io::AsyncRead + Unpin> crate::de::read::Reader for FiberReader<'_, R> {
/// Fills `bytes` completely, suspending the fiber whenever the shared
/// buffer runs dry so the executor can poll the async reader for more.
///
/// # Errors
/// Unexpected-end if the executor signals EOF (status `Finished` or an
/// empty refill) before `bytes` is full.
#[inline]
fn read(
&mut self,
bytes: &mut [u8],
) -> Result<(), crate::error::DecodeError> {
let n = bytes.len();
let mut written = 0;
// SAFETY: `ctx` points at the live, boxed FiberContext that resumed
// this fiber; it outlives the fiber's execution.
let ctx = unsafe { &mut *self.ctx };
while written < n {
let buf = unsafe { &mut *ctx.buf_ptr };
if buf.is_empty() {
// No buffered input left: yield to the executor, which refills
// `buf_ptr` (or signals EOF) before resuming us.
unsafe {
ctx.status = FiberStatus::Yielded;
switch_context(&raw mut ctx.regs, &raw const ctx.executor_regs);
// The executor sets Finished + empty buffer on EOF.
if ctx.status == FiberStatus::Finished {
return crate::error::cold_decode_error_unexpected_end(n - written);
}
}
}
// Re-read the slice: the executor rewrote `buf_ptr` while we were
// suspended.
let buf = unsafe { &mut *ctx.buf_ptr };
if buf.is_empty() {
return crate::error::cold_decode_error_unexpected_end(n - written);
}
let to_copy = core::cmp::min(n - written, buf.len());
bytes[written..written + to_copy].copy_from_slice(&buf[0..to_copy]);
// Advance the shared slice past the bytes we just consumed.
unsafe {
ctx.buf_ptr = core::ptr::slice_from_raw_parts_mut(
buf.as_mut_ptr().add(to_copy),
buf.len() - to_copy,
);
}
written += to_copy;
}
Ok(())
}
}
#[cfg(feature = "async-fiber")]
/// Entry point that ties an async byte source to a synchronous decode
/// closure executed on a fiber (see [`AsyncFiberBridge::run`]).
pub(crate) struct AsyncFiberBridge<R: futures_io::AsyncRead + Unpin> {
// The async reader polled for input whenever the fiber yields.
pub(crate) reader: R,
}
#[cfg(feature = "async-fiber")]
impl<R: futures_io::AsyncRead + Unpin> AsyncFiberBridge<R> {
    /// Wraps `reader` so it can feed a fiber-based decode.
    #[inline(always)]
    pub(crate) const fn new(reader: R) -> Self {
        Self { reader }
    }

    /// Returns a future that runs `f` on a fiber, supplying it bytes from
    /// the wrapped async reader on demand.
    #[inline(always)]
    pub(crate) fn run<F, T>(
        self,
        f: F,
    ) -> impl Future<Output = Result<T, crate::error::DecodeError>>
    where
        F: FnOnce(&mut FiberReader<'_, R>) -> Result<T, crate::error::DecodeError>,
    {
        let Self { reader } = self;
        BridgeFuture {
            reader,
            f: Some(f),
            ctx: None,
            result: None,
            _marker: core::marker::PhantomData,
        }
    }
}
#[cfg(feature = "async-fiber")]
#[inline(always)]
// No-op placeholder for `FiberContext::invoke_closure` in freshly built or
// pooled contexts; replaced with the real type-erased invoker before resume.
const unsafe fn dummy_invoke(_: *mut ()) {}
#[cfg(feature = "async-fiber")]
#[inline]
/// Entry point of every fiber: the register seeding in `poll` arranges for
/// the first context switch to land here on the fresh stack.
unsafe extern "C" fn fiber_trampoline() {
unsafe {
// `resume_fiber` published the context just before switching to us.
let ctx_ptr = CURRENT_FIBER.with(core::cell::Cell::get);
let ctx = &mut *ctx_ptr;
// Run the type-erased closure, catching panics so they are re-thrown on
// the executor thread instead of unwinding across the asm switch.
let result = std::panic::catch_unwind(std::panic::AssertUnwindSafe(|| {
(ctx.invoke_closure)(ctx.closure_ptr);
}));
ctx.status = if let Err(e) = result {
ctx.panic_payload = Some(e);
FiberStatus::Panicked
} else {
FiberStatus::Finished
};
CURRENT_FIBER.with(|c| c.set(core::ptr::null_mut()));
// Final switch back to the executor; this context must never be resumed
// again — the frame we are leaving is logically dead.
switch_context(&raw mut ctx.regs, &raw const ctx.executor_regs);
unreachable!("fiber finished and should not be resumed");
}
}
#[cfg(feature = "async-fiber")]
#[inline]
/// Switches from the executor into `ctx`'s fiber; returns when the fiber
/// yields, finishes, or panics.
///
/// # Safety
/// `ctx.regs` must hold a valid resume point (freshly seeded in `poll` or
/// saved by a previous switch) and the fiber's stack must still be mapped.
unsafe fn resume_fiber(ctx: &mut FiberContext) {
unsafe {
// Publish the context so the fiber side (trampoline / invoke shim) can
// find it through the thread-local.
CURRENT_FIBER.with(|c| c.set(core::ptr::from_mut(ctx)));
switch_context(&raw mut ctx.executor_regs, &raw const ctx.regs);
// Back on the executor: clear the marker while no fiber is running.
CURRENT_FIBER.with(|c| c.set(core::ptr::null_mut()));
}
}
#[cfg(feature = "async-fiber")]
/// Future returned by `AsyncFiberBridge::run`: owns the async reader, the
/// not-yet-started closure, the fiber context, and the result slot.
struct BridgeFuture<R, F, T> {
// Async byte source polled whenever the fiber yields for input.
reader: R,
// Decode closure; taken (moved onto the fiber) at the first resume.
f: Option<F>,
// Lazily acquired fiber context; recycled into the pool on completion.
ctx: Option<Box<FiberContext>>,
// Written by the fiber through `result_ptr`; read out on Finished.
result: Option<Result<T, crate::error::DecodeError>>,
_marker: core::marker::PhantomData<T>,
}
#[cfg(feature = "async-fiber")]
#[allow(clippy::non_send_fields_in_send_ty)]
// SAFETY: the raw pointers inside `ctx` reference only memory owned by this
// future (its own fields and the boxed FiberContext), which moves as a unit;
// `result_ptr` is refreshed on every poll precisely so a moved future stays
// valid. NOTE(review): this also relies on the future never being polled
// concurrently — upheld because `poll` takes `&mut self` — confirm no other
// path touches the context across threads.
unsafe impl<R: Send, F: Send, T: Send> Send for BridgeFuture<R, F, T> {}
#[cfg(feature = "async-fiber")]
// SAFETY: a shared `&BridgeFuture` exposes no interior mutability; all
// mutation happens through `poll(&mut self)`.
unsafe impl<R: Sync, F: Sync, T: Sync> Sync for BridgeFuture<R, F, T> {}
#[cfg(feature = "async-fiber")]
impl<R, F, T> Future for BridgeFuture<R, F, T>
where
R: futures_io::AsyncRead + Unpin,
F: FnOnce(&mut FiberReader<'_, R>) -> Result<T, crate::error::DecodeError>,
{
type Output = Result<T, crate::error::DecodeError>;
/// Drives the fiber: acquires and seeds a (possibly pooled) context on
/// first poll, starts the closure, then shuttles bytes from the async
/// reader into the fiber each time it yields for input.
#[allow(clippy::too_many_lines)]
fn poll(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Self::Output> {
// ---- One-time setup: acquire and initialize a fiber context. ----
if self.ctx.is_none() {
let mut ctx = if let Some(ctx) = CONTEXT_POOL.with(|pool| pool.borrow_mut().pop()) {
ctx
} else {
// 2 MiB usable stack plus the guard page.
match GuardedStack::new(2 * 1024 * 1024) {
| Ok(stack) => {
Box::new(FiberContext {
stack,
regs: Registers::new(),
executor_regs: Registers::new(),
status: FiberStatus::Initial,
panic_payload: None,
trampoline: fiber_trampoline,
invoke_closure: dummy_invoke,
closure_ptr: core::ptr::null_mut(),
result_ptr: core::ptr::null_mut(),
reader_ptr: core::ptr::null_mut(),
buf_ptr: core::ptr::slice_from_raw_parts_mut(core::ptr::null_mut(), 0),
read_buffer: alloc::vec![0; 8192].into_boxed_slice(),
})
},
| Err(e) => return Poll::Ready(Err(e)),
}
};
// Reset per-run state — pooled contexts carry stale values.
ctx.status = FiberStatus::Initial;
ctx.panic_payload = None;
ctx.result_ptr = core::ptr::null_mut();
ctx.reader_ptr = core::ptr::null_mut();
ctx.buf_ptr = core::ptr::slice_from_raw_parts_mut(core::ptr::null_mut(), 0);
// Seed the fiber's registers so the first switch "returns" into
// fiber_trampoline on the fresh stack; slot meanings match the
// corresponding switch_context variant.
let sp = ctx.stack.top();
#[cfg(all(target_arch = "x86_64", unix))]
{
// rsp = top - 8 mimics post-`call` alignment; rbp = 0 terminates
// backtraces; gprs[7] is the resume rip; bytes 24..28 of the
// fxsave area are MXCSR, set to the power-on default 0x1F80.
ctx.regs.gprs[0] = sp - 8;
ctx.regs.gprs[1] = 0; ctx.regs.gprs[7] = fiber_trampoline as *const () as u64; ctx.regs.extended_state[24..28].copy_from_slice(&0x1F80u32.to_ne_bytes());
}
#[cfg(all(target_arch = "x86_64", windows))]
{
// rsp = top - 40: 32-byte shadow space plus 8 for call-style
// alignment. gprs[10..=12] seed the TEB stack fields
// (base/limit/deallocation); gprs[13] = all-ones — presumably the
// SEH end-of-chain marker for the gs:[0x00] slot; TODO confirm.
ctx.regs.gprs[0] = sp - 40;
ctx.regs.gprs[1] = 0; ctx.regs.gprs[7] = fiber_trampoline as *const () as u64; ctx.regs.gprs[10] = ctx.stack.top(); ctx.regs.gprs[11] = ctx.stack.bottom(); ctx.regs.gprs[12] = ctx.stack.allocation_base(); ctx.regs.gprs[13] = 0xFFFFFFFFFFFFFFFFu64; ctx.regs.extended_state[24..28].copy_from_slice(&0x1F80u32.to_ne_bytes());
}
#[cfg(all(target_arch = "aarch64", unix))]
{
// gprs[10] = x29 (frame pointer) = 0; gprs[11] = x30 (link
// register) = trampoline, so `ret` enters it; gprs[12] = sp.
ctx.regs.gprs[10] = 0; ctx.regs.gprs[11] = fiber_trampoline as u64; ctx.regs.gprs[12] = sp; }
#[cfg(all(target_arch = "aarch64", windows))]
{
// As unix aarch64, plus gprs[13..=15] seeding the TEB stack fields
// restored via x18 in switch_context.
ctx.regs.gprs[10] = 0; ctx.regs.gprs[11] = fiber_trampoline as u64; ctx.regs.gprs[12] = sp; ctx.regs.gprs[13] = ctx.stack.top(); ctx.regs.gprs[14] = ctx.stack.bottom(); ctx.regs.gprs[15] = ctx.stack.bottom(); }
#[cfg(target_arch = "riscv64")]
{
// gprs[0] = sp; gprs[1] = s0 (frame pointer) = 0; gprs[13] = ra =
// trampoline, so `ret` enters it.
ctx.regs.gprs[0] = sp; ctx.regs.gprs[1] = 0; ctx.regs.gprs[13] = fiber_trampoline as u64; }
// SAFETY: nothing is moved out of `self`; the context is boxed, so
// storing it does not depend on the future's own address.
let this = unsafe { self.as_mut().get_unchecked_mut() };
this.ctx = Some(ctx);
}
// SAFETY: as above — we only mutate fields in place.
let this = unsafe { self.get_unchecked_mut() };
let this_ptr = core::ptr::from_mut::<Self>(this).cast::<()>();
let ctx = this.ctx.as_mut().unwrap();
// Refreshed every poll: the future may have moved between polls, and the
// fiber writes its result through this pointer at completion time.
ctx.result_ptr = (&raw mut this.result).cast::<()>();
// ---- First resume: hand the closure to the fiber and start it. ----
if this.f.is_some() && ctx.status == FiberStatus::Initial {
// Type-erased shim run ON the fiber: takes the closure out of the
// future, builds the FiberReader, runs the closure, and stores the
// result through the context's (freshly refreshed) `result_ptr`.
unsafe fn invoke<R: futures_io::AsyncRead + Unpin, F, T>(data: *mut ())
where
F: FnOnce(&mut FiberReader<'_, R>) -> Result<T, crate::error::DecodeError>,
{
unsafe {
let this = &mut *data.cast::<BridgeFuture<R, F, T>>();
let f = this.f.take().unwrap();
let ctx_ptr = CURRENT_FIBER.with(core::cell::Cell::get);
let mut real_reader: FiberReader<'_, R> = FiberReader {
inner: core::marker::PhantomData,
ctx: ctx_ptr,
};
let res = f(&mut real_reader);
// Read result_ptr only now, at completion: it tracks the
// future's current address across moves.
let rp = (*ctx_ptr)
.result_ptr
.cast::<Option<Result<T, crate::error::DecodeError>>>();
*rp = Some(res);
}
}
ctx.closure_ptr = this_ptr;
ctx.invoke_closure = invoke::<R, F, T>;
ctx.status = FiberStatus::Running;
// SAFETY: regs were seeded above for a first entry via the trampoline.
unsafe {
resume_fiber(ctx);
}
}
// ---- Steady state: react to how the fiber came back to us. ----
loop {
let ctx = this.ctx.as_mut().unwrap();
match ctx.status {
| FiberStatus::Finished => {
// Recycle the context (stack + buffer) for later decodes.
if let Some(ctx) = this.ctx.take() {
CONTEXT_POOL.with(|pool| {
let mut p = pool.borrow_mut();
if p.len() < MAX_POOLED_CONTEXTS {
p.push(ctx);
}
});
}
return Poll::Ready(this.result.take().unwrap());
},
| FiberStatus::Panicked => {
let payload = ctx.panic_payload.take().unwrap();
if let Some(ctx) = this.ctx.take() {
CONTEXT_POOL.with(|pool| {
let mut p = pool.borrow_mut();
if p.len() < MAX_POOLED_CONTEXTS {
p.push(ctx);
}
});
}
// Re-throw the fiber's panic on the executor thread.
std::panic::resume_unwind(payload);
},
| FiberStatus::Yielded => {
// Fiber wants more input: poll the async reader into the
// context-owned staging buffer.
let ctx_read_buf = &mut ctx.read_buffer[..];
let poll_res = Pin::new(&mut this.reader).poll_read(cx, ctx_read_buf);
match poll_res {
| Poll::Ready(Ok(filled)) => {
if filled == 0 {
// EOF: resume with Finished + empty buffer so the
// fiber's read() reports unexpected-end and the
// closure can run to completion and unwind normally.
ctx.status = FiberStatus::Finished;
ctx.buf_ptr = core::ptr::slice_from_raw_parts_mut(
ctx.read_buffer.as_mut_ptr(),
0,
);
// SAFETY: fiber is suspended with valid saved regs.
unsafe {
resume_fiber(ctx);
}
continue;
}
ctx.status = FiberStatus::Running;
ctx.buf_ptr = core::ptr::slice_from_raw_parts_mut(
ctx.read_buffer.as_mut_ptr(),
filled,
);
// SAFETY: fiber is suspended with valid saved regs.
unsafe {
resume_fiber(ctx);
}
},
| Poll::Ready(Err(e)) => {
// NOTE(review): the fiber is still suspended mid-closure
// here; pooling its context abandons the closure's stack
// frames without running their destructors (a leak),
// relying on the Initial-state reset on reuse. Confirm
// this is intentional rather than resuming with the EOF
// signal (as the `filled == 0` branch does) to let the
// fiber unwind first.
if let Some(ctx) = this.ctx.take() {
CONTEXT_POOL.with(|pool| {
let mut p = pool.borrow_mut();
if p.len() < MAX_POOLED_CONTEXTS {
p.push(ctx);
}
});
}
return Poll::Ready(crate::error::cold_decode_error_io(e, 1));
},
| Poll::Pending => return Poll::Pending,
}
},
| _ => {
// Initial/Running can't be observed after a resume returns.
unreachable!("invalid fiber status in poll loop");
},
}
}
}
}
#[cfg(feature = "async-fiber")]
impl<R, F, T> Drop for BridgeFuture<R, F, T> {
/// Frees the fiber context (if any) when the future is dropped.
///
/// NOTE(review): if the future is dropped while its fiber is suspended
/// (`Yielded`), the fiber stack is unmapped without unwinding — values
/// still live on that stack leak (destructors never run). Confirm this is
/// an accepted trade-off of the cancellation path.
#[inline(always)]
fn drop(&mut self) {
if let Some(ctx) = self.ctx.take() {
drop(ctx);
}
}
}
#[cfg(all(feature = "tokio", feature = "async-fiber"))]
/// Newtype adapting a tokio `AsyncRead` to the `futures_io::AsyncRead` trait.
pub(crate) struct TokioReader<R>(pub(crate) R);
#[cfg(all(feature = "tokio", feature = "async-fiber"))]
impl<R: tokio::io::AsyncRead + Unpin> futures_io::AsyncRead for TokioReader<R> {
    /// Bridges tokio's `ReadBuf`-based read to the futures-io byte-count API:
    /// on success, reports how many bytes tokio filled into `buf`.
    #[inline]
    fn poll_read(
        mut self: Pin<&mut Self>,
        cx: &mut Context<'_>,
        buf: &mut [u8],
    ) -> Poll<std::io::Result<usize>> {
        let mut staging = tokio::io::ReadBuf::new(buf);
        Pin::new(&mut self.0)
            .poll_read(cx, &mut staging)
            .map(|outcome| outcome.map(|()| staging.filled().len()))
    }
}