use pyo3::prelude::*;
use pyo3::types::PyBytes;
use std::io::{Read, Write};
use std::time::Duration;
use super::json_utils::{json_value_to_py, py_to_json_value};
use crate::error::IpcError;
use crate::graceful::{
GracefulChannel, GracefulIpcChannel as RustGracefulIpcChannel,
GracefulNamedPipe as RustGracefulNamedPipe,
};
#[pyclass(name = "GracefulNamedPipe")]
pub struct PyGracefulNamedPipe {
inner: RustGracefulNamedPipe,
}
#[pymethods]
impl PyGracefulNamedPipe {
#[staticmethod]
fn create(name: &str) -> PyResult<Self> {
let inner = RustGracefulNamedPipe::create(name)?;
Ok(Self { inner })
}
#[staticmethod]
fn connect(name: &str) -> PyResult<Self> {
let inner = RustGracefulNamedPipe::connect(name)?;
Ok(Self { inner })
}
#[getter]
fn name(&self) -> &str {
self.inner.name()
}
#[getter]
fn is_server(&self) -> bool {
self.inner.is_server()
}
#[getter]
fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
}
fn wait_for_client(&mut self, py: Python<'_>) -> PyResult<()> {
py.detach(|| self.inner.wait_for_client())?;
Ok(())
}
fn shutdown(&self) {
self.inner.shutdown();
}
fn drain(&self, py: Python<'_>) -> PyResult<()> {
py.detach(|| self.inner.drain())?;
Ok(())
}
fn shutdown_timeout(&self, py: Python<'_>, timeout_ms: u64) -> PyResult<()> {
let timeout = Duration::from_millis(timeout_ms);
py.detach(|| self.inner.shutdown_timeout(timeout))?;
Ok(())
}
fn read(&mut self, py: Python<'_>, size: usize) -> PyResult<Py<PyBytes>> {
let mut buf = vec![0u8; size];
let n = py.detach(|| self.inner.read(&mut buf))?;
buf.truncate(n);
Ok(PyBytes::new(py, &buf).into())
}
fn write(&mut self, py: Python<'_>, data: Vec<u8>) -> PyResult<usize> {
let n = py.detach(|| self.inner.write(&data))?;
Ok(n)
}
fn read_exact(&mut self, py: Python<'_>, size: usize) -> PyResult<Py<PyBytes>> {
let mut buf = vec![0u8; size];
py.detach(|| self.inner.read_exact(&mut buf))?;
Ok(PyBytes::new(py, &buf).into())
}
fn write_all(&mut self, py: Python<'_>, data: Vec<u8>) -> PyResult<()> {
py.detach(|| self.inner.write_all(&data))?;
Ok(())
}
}
#[pyclass(name = "GracefulIpcChannel")]
pub struct PyGracefulIpcChannel {
inner: RustGracefulIpcChannel<Vec<u8>>,
}
#[pymethods]
impl PyGracefulIpcChannel {
#[staticmethod]
fn create(name: &str) -> PyResult<Self> {
let inner = RustGracefulIpcChannel::create(name)?;
Ok(Self { inner })
}
#[staticmethod]
fn connect(name: &str) -> PyResult<Self> {
let inner = RustGracefulIpcChannel::connect(name)?;
Ok(Self { inner })
}
#[getter]
fn name(&self) -> &str {
self.inner.name()
}
#[getter]
fn is_server(&self) -> bool {
self.inner.is_server()
}
#[getter]
fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
}
fn wait_for_client(&mut self, py: Python<'_>) -> PyResult<()> {
py.detach(|| self.inner.wait_for_client())?;
Ok(())
}
fn shutdown(&self) {
self.inner.shutdown();
}
fn drain(&self, py: Python<'_>) -> PyResult<()> {
py.detach(|| self.inner.drain())?;
Ok(())
}
fn shutdown_timeout(&self, py: Python<'_>, timeout_ms: u64) -> PyResult<()> {
let timeout = Duration::from_millis(timeout_ms);
py.detach(|| self.inner.shutdown_timeout(timeout))?;
Ok(())
}
fn send(&mut self, py: Python<'_>, data: Vec<u8>) -> PyResult<()> {
py.detach(|| self.inner.send_bytes(&data))?;
Ok(())
}
fn recv(&mut self, py: Python<'_>) -> PyResult<Py<PyBytes>> {
let data = py.detach(|| self.inner.recv_bytes())?;
Ok(PyBytes::new(py, &data).into())
}
fn send_json(&mut self, py: Python<'_>, obj: &Bound<'_, PyAny>) -> PyResult<()> {
let value = py_to_json_value(obj)?;
let json_bytes = serde_json::to_vec(&value)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;
py.detach(|| self.inner.send_bytes(&json_bytes))?;
Ok(())
}
fn recv_json(&mut self, py: Python<'_>) -> PyResult<Py<PyAny>> {
let data = py.detach(|| self.inner.recv_bytes())?;
let value: serde_json::Value =
serde_json::from_slice(&data).map_err(|e| IpcError::deserialization(e.to_string()))?;
json_value_to_py(py, &value)
}
}