wasmer-wasix 0.702.0

WASI and WASIX implementation library for Wasmer WebAssembly runtime
use std::{mem::MaybeUninit, task::Waker};

use super::*;
use crate::{net::MAX_SOCKET_PAYLOAD, net::socket::TimeType, syscalls::*};

/// ### `sock_send()`
/// Send a message on a socket.
/// Note: This is similar to `send` in POSIX, though it also supports writing
/// the data from multiple buffers in the manner of `writev`.
///
/// ## Parameters
///
/// * `si_data` - List of scatter/gather vectors to which to retrieve data
/// * `si_flags` - Message flags.
///
/// ## Return
///
/// Number of bytes transmitted.
#[instrument(level = "trace", skip_all, fields(%fd, nsent = field::Empty), ret)]
pub fn sock_send<M: MemorySize>(
    mut ctx: FunctionEnvMut<'_, WasiEnv>,
    fd: WasiFd,
    si_data: WasmPtr<__wasi_ciovec_t<M>, M>,
    si_data_len: M::Offset,
    si_flags: SiFlags,
    ret_data_len: WasmPtr<M::Offset, M>,
) -> Result<Errno, WasiError> {
    WasiEnv::do_pending_operations(&mut ctx)?;

    let env = ctx.data();
    let fd_entry = wasi_try_ok!(env.state.fs.get_fd(fd));
    let enable_journal = env.enable_journal;
    let guard = fd_entry.inode.read();
    // Some guests route socket-like wakeups through pipe-backed fds.
    let use_write = matches!(guard.deref(), Kind::DuplexPipe { .. } | Kind::PipeTx { .. });
    drop(guard);

    let bytes_written = if use_write {
        let offset = { fd_entry.inner.offset.load(Ordering::Acquire) as usize };

        wasi_try_ok!(fd_write_internal::<M>(
            &mut ctx,
            fd,
            fd_entry,
            FdWriteSource::Iovs {
                iovs: si_data,
                iovs_len: si_data_len
            },
            offset as u64,
            true,
            enable_journal
        )?)
    } else {
        wasi_try_ok!(sock_send_internal::<M>(
            &ctx,
            fd,
            FdWriteSource::Iovs {
                iovs: si_data,
                iovs_len: si_data_len
            },
            si_flags,
        )?)
    };

    #[cfg(feature = "journal")]
    if ctx.data().enable_journal {
        JournalEffector::save_sock_send(&ctx, fd, bytes_written, si_data, si_data_len, si_flags)
            .map_err(|err| {
                tracing::error!("failed to save sock_send event - {}", err);
                WasiError::Exit(ExitCode::from(Errno::Fault))
            })?;
    }

    Span::current().record("nsent", bytes_written);

    let env = ctx.data();
    let memory = unsafe { env.memory_view(&ctx) };
    let bytes_written: M::Offset =
        wasi_try_ok!(bytes_written.try_into().map_err(|_| Errno::Overflow));
    wasi_try_mem_ok!(ret_data_len.write(&memory, bytes_written));

    Ok(Errno::Success)
}

pub(crate) fn sock_send_internal<M: MemorySize>(
    ctx: &FunctionEnvMut<'_, WasiEnv>,
    sock: WasiFd,
    si_data: FdWriteSource<'_, M>,
    si_flags: SiFlags,
) -> Result<Result<usize, Errno>, WasiError> {
    let env = ctx.data();
    let memory = unsafe { env.memory_view(&ctx) };
    let runtime = env.runtime.clone();

    let nonblocking_flag = (si_flags & __WASI_SOCK_SEND_INPUT_DONT_WAIT) != 0;

    let bytes_written = wasi_try_ok_ok!(__sock_asyncify(
        env,
        sock,
        Rights::SOCK_SEND,
        |socket, fd| async move {
            let nonblocking = nonblocking_flag || fd.inner.flags.contains(Fdflags::NONBLOCK);
            let timeout = socket
                .opt_time(TimeType::WriteTimeout)
                .ok()
                .flatten()
                .unwrap_or(Duration::from_secs(30));

            if socket.is_dgram() {
                let data = si_data.coalesce(&memory, MAX_SOCKET_PAYLOAD)?;
                return socket
                    .send(
                        env.tasks().deref(),
                        data.as_ref(),
                        Some(timeout),
                        nonblocking,
                    )
                    .await;
            }

            match si_data {
                FdWriteSource::Iovs { iovs, iovs_len } => {
                    let iovs_arr = iovs.slice(&memory, iovs_len).map_err(mem_error_to_wasi)?;
                    let iovs_arr = iovs_arr.access().map_err(mem_error_to_wasi)?;

                    let mut sent = 0usize;
                    for iovs in iovs_arr.iter() {
                        let buf = WasmPtr::<u8, M>::new(iovs.buf)
                            .slice(&memory, iovs.buf_len)
                            .map_err(mem_error_to_wasi)?
                            .access()
                            .map_err(mem_error_to_wasi)?;
                        let local_sent = match socket
                            .send(
                                env.tasks().deref(),
                                buf.as_ref(),
                                Some(timeout),
                                nonblocking,
                            )
                            .await
                        {
                            Ok(s) => s,
                            Err(_) if sent > 0 => break,
                            Err(err) => return Err(err),
                        };
                        sent += local_sent;
                        if local_sent != buf.len() {
                            break;
                        }
                    }
                    Ok(sent)
                }
                FdWriteSource::Buffer(data) => {
                    socket
                        .send(
                            env.tasks().deref(),
                            data.as_ref(),
                            Some(timeout),
                            nonblocking,
                        )
                        .await
                }
            }
        }
    ));
    trace!(
        %bytes_written,
    );

    Ok(Ok(bytes_written))
}