// shotover 0.7.2
//
// Shotover API for building custom transforms.
// Documentation
//! Utilities for reading and writing JSON messages over Unix SEQPACKET sockets.

use anyhow::{Context, Result};
use serde::Serialize;
use serde::de::DeserializeOwned;
use std::os::unix::io::{FromRawFd, OwnedFd, RawFd};
use uds::tokio::UnixSeqpacketConn;

/// Receive one packet from a SEQPACKET socket and deserialize it as JSON.
///
/// A single `recv` call fills a fixed 65536-byte scratch buffer; only the
/// bytes actually received are handed to the JSON parser.
///
/// NOTE(review): a packet larger than the scratch buffer presumably cannot be
/// read in full by this function — confirm against the sender's message sizes.
///
/// # Errors
///
/// Returns an error if receiving from the socket fails or if the received
/// bytes are not valid JSON for `T`.
pub async fn read_json<T: DeserializeOwned>(conn: &mut UnixSeqpacketConn) -> Result<T> {
    // Fixed-size scratch space for one incoming packet.
    let mut packet = vec![0u8; 65536];

    let received = conn
        .recv(&mut packet)
        .await
        .context("Failed to read packet from socket")?;

    // Only the prefix that was actually filled holds message data.
    let payload = &packet[..received];
    serde_json::from_slice(payload).context("Failed to parse JSON from packet")
}

/// Serialize `data` as JSON and send it over a SEQPACKET socket in one `send` call.
///
/// The byte count reported by `send` is discarded.
///
/// # Errors
///
/// Returns an error if `data` cannot be serialized to JSON or if sending on
/// the socket fails.
pub async fn write_json<T: Serialize>(conn: &mut UnixSeqpacketConn, data: &T) -> Result<()> {
    let payload = serde_json::to_vec(data).context("Failed to serialize JSON")?;

    conn.send(&payload)
        .await
        .context("Failed to send JSON packet")
        .map(|_sent| ())
}

/// Read a complete JSON message from a SEQPACKET socket with file descriptors
pub async fn read_json_with_fds<T: DeserializeOwned>(
    conn: &mut UnixSeqpacketConn,
) -> Result<(T, Vec<OwnedFd>)> {
    let mut buf = vec![0u8; 65536]; // Large buffer for JSON messages
    let mut fd_buf = vec![0i32; 250]; // Buffer for file descriptors
    let (bytes_read, _truncated, fds_read) = conn
        .recv_fds(&mut buf, &mut fd_buf)
        .await
        .context("Failed to read packet with FDs from socket")?;

    // Convert RawFds to OwnedFds immediately after receiving from recv_fds
    // SAFETY: This is safe because we just received these FDs from the OS via recv_fds
    #[allow(unsafe_code)]
    let owned_fds: Vec<OwnedFd> = fd_buf[..fds_read]
        .iter()
        .map(|&raw_fd| unsafe { OwnedFd::from_raw_fd(raw_fd) })
        .collect();

    // Resize buffer to actual data received
    buf.truncate(bytes_read);

    // Parse the complete JSON message
    let data = serde_json::from_slice(&buf).context("Failed to parse JSON from packet")?;

    Ok((data, owned_fds))
}

/// Serialize `data` as JSON and send it over a SEQPACKET socket in one
/// `send_fds` call, passing `fds` along with the packet.
///
/// The descriptors are only borrowed as raw values; this function does not
/// close them. The byte count reported by `send_fds` is discarded.
///
/// # Errors
///
/// Returns an error if `data` cannot be serialized to JSON or if sending on
/// the socket fails.
pub async fn write_json_with_fds<T: Serialize>(
    conn: &mut UnixSeqpacketConn,
    data: &T,
    fds: &[RawFd],
) -> Result<()> {
    let payload = serde_json::to_vec(data).context("Failed to serialize JSON")?;

    conn.send_fds(&payload, fds)
        .await
        .context("Failed to send JSON packet with FDs")
        .map(|_sent| ())
}