use pyo3::prelude::*;
use pyo3::types::{PyBool, PyBytes, PyDict, PyFloat, PyInt, PyList, PyString};
use std::io::{Read, Write};
use std::time::Duration;
use crate::error::IpcError;
use crate::file_channel::{
FileChannel as RustFileChannel, FileMessage as RustFileMessage, MessageType as RustMessageType,
};
use crate::graceful::{
GracefulChannel, GracefulIpcChannel as RustGracefulIpcChannel,
GracefulNamedPipe as RustGracefulNamedPipe,
};
use crate::pipe::{AnonymousPipe as RustAnonymousPipe, NamedPipe as RustNamedPipe};
use crate::shm::SharedMemory as RustSharedMemory;
fn py_to_json_value(obj: &Bound<'_, PyAny>) -> PyResult<serde_json::Value> {
if obj.is_none() {
return Ok(serde_json::Value::Null);
}
if let Ok(b) = obj.downcast::<PyBool>() {
return Ok(serde_json::Value::Bool(b.is_true()));
}
if let Ok(i) = obj.downcast::<PyInt>() {
if let Ok(v) = i.extract::<i64>() {
return Ok(serde_json::Value::Number(v.into()));
}
if let Ok(v) = i.extract::<u64>() {
return Ok(serde_json::Value::Number(v.into()));
}
if let Ok(v) = i.extract::<f64>() {
if let Some(n) = serde_json::Number::from_f64(v) {
return Ok(serde_json::Value::Number(n));
}
}
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Integer too large for JSON",
));
}
if let Ok(f) = obj.downcast::<PyFloat>() {
let v: f64 = f.extract()?;
if let Some(n) = serde_json::Number::from_f64(v) {
return Ok(serde_json::Value::Number(n));
}
return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Float value is not valid JSON (NaN or Infinity)",
));
}
if let Ok(s) = obj.downcast::<PyString>() {
let v: String = s.extract()?;
return Ok(serde_json::Value::String(v));
}
if let Ok(b) = obj.downcast::<PyBytes>() {
let bytes: &[u8] = b.as_bytes();
use base64::Engine;
let encoded = base64::engine::general_purpose::STANDARD.encode(bytes);
return Ok(serde_json::Value::String(encoded));
}
if let Ok(list) = obj.downcast::<PyList>() {
let mut arr = Vec::with_capacity(list.len());
for item in list.iter() {
arr.push(py_to_json_value(&item)?);
}
return Ok(serde_json::Value::Array(arr));
}
if let Ok(dict) = obj.downcast::<PyDict>() {
let mut map = serde_json::Map::new();
for (key, value) in dict.iter() {
let key_str: String = key.extract().map_err(|_| {
PyErr::new::<pyo3::exceptions::PyTypeError, _>("Dict keys must be strings")
})?;
map.insert(key_str, py_to_json_value(&value)?);
}
return Ok(serde_json::Value::Object(map));
}
Err(PyErr::new::<pyo3::exceptions::PyTypeError, _>(format!(
"Cannot convert type to JSON: {:?}",
obj.get_type().name()
)))
}
fn json_value_to_py(py: Python<'_>, value: &serde_json::Value) -> PyResult<PyObject> {
match value {
serde_json::Value::Null => Ok(py.None()),
serde_json::Value::Bool(b) => Ok(b.into_pyobject(py)?.to_owned().into_any().unbind()),
serde_json::Value::Number(n) => {
if let Some(i) = n.as_i64() {
Ok(i.into_pyobject(py)?.into_any().unbind())
} else if let Some(u) = n.as_u64() {
Ok(u.into_pyobject(py)?.into_any().unbind())
} else if let Some(f) = n.as_f64() {
Ok(f.into_pyobject(py)?.into_any().unbind())
} else {
Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(
"Invalid JSON number",
))
}
}
serde_json::Value::String(s) => Ok(s.into_pyobject(py)?.into_any().unbind()),
serde_json::Value::Array(arr) => {
let list = PyList::empty(py);
for item in arr {
list.append(json_value_to_py(py, item)?)?;
}
Ok(list.into())
}
serde_json::Value::Object(map) => {
let dict = PyDict::new(py);
for (key, value) in map {
dict.set_item(key, json_value_to_py(py, value)?)?;
}
Ok(dict.into())
}
}
}
#[pyfunction]
fn json_dumps(obj: &Bound<'_, PyAny>) -> PyResult<String> {
let value = py_to_json_value(obj)?;
serde_json::to_string(&value)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))
}
#[pyfunction]
fn json_dumps_pretty(obj: &Bound<'_, PyAny>) -> PyResult<String> {
let value = py_to_json_value(obj)?;
serde_json::to_string_pretty(&value)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))
}
#[pyfunction]
fn json_loads(py: Python<'_>, s: &str) -> PyResult<PyObject> {
let value: serde_json::Value = serde_json::from_str(s)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;
json_value_to_py(py, &value)
}
#[pyclass(name = "AnonymousPipe")]
pub struct PyAnonymousPipe {
reader: std::sync::Mutex<Option<crate::pipe::PipeReader>>,
writer: std::sync::Mutex<Option<crate::pipe::PipeWriter>>,
}
#[pymethods]
impl PyAnonymousPipe {
#[new]
fn new() -> PyResult<Self> {
let pipe = RustAnonymousPipe::new()?;
let (reader, writer) = pipe.split();
Ok(Self {
reader: std::sync::Mutex::new(Some(reader)),
writer: std::sync::Mutex::new(Some(writer)),
})
}
fn read(&self, py: Python<'_>, size: usize) -> PyResult<Py<PyBytes>> {
let mut guard = self
.reader
.lock()
.map_err(|_| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>("Lock poisoned"))?;
let reader = guard.as_mut().ok_or(IpcError::Closed)?;
let mut buf = vec![0u8; size];
let n = py.allow_threads(|| reader.read(&mut buf))?;
buf.truncate(n);
Ok(PyBytes::new(py, &buf).into())
}
fn write(&self, py: Python<'_>, data: &[u8]) -> PyResult<usize> {
let mut guard = self
.writer
.lock()
.map_err(|_| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>("Lock poisoned"))?;
let writer = guard.as_mut().ok_or(IpcError::Closed)?;
let data = data.to_vec(); let n = py.allow_threads(|| writer.write(&data))?;
Ok(n)
}
#[cfg(unix)]
fn reader_fd(&self) -> PyResult<i32> {
use std::os::unix::io::AsRawFd;
let guard = self
.reader
.lock()
.map_err(|_| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>("Lock poisoned"))?;
let reader = guard.as_ref().ok_or(IpcError::Closed)?;
Ok(reader.as_raw_fd())
}
#[cfg(unix)]
fn writer_fd(&self) -> PyResult<i32> {
use std::os::unix::io::AsRawFd;
let guard = self
.writer
.lock()
.map_err(|_| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>("Lock poisoned"))?;
let writer = guard.as_ref().ok_or(IpcError::Closed)?;
Ok(writer.as_raw_fd())
}
fn take_reader(&self) -> PyResult<()> {
let mut guard = self
.reader
.lock()
.map_err(|_| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>("Lock poisoned"))?;
guard.take();
Ok(())
}
fn take_writer(&self) -> PyResult<()> {
let mut guard = self
.writer
.lock()
.map_err(|_| PyErr::new::<pyo3::exceptions::PyRuntimeError, _>("Lock poisoned"))?;
guard.take();
Ok(())
}
}
#[pyclass(name = "NamedPipe")]
pub struct PyNamedPipe {
inner: RustNamedPipe,
}
#[pymethods]
impl PyNamedPipe {
#[staticmethod]
fn create(name: &str) -> PyResult<Self> {
let inner = RustNamedPipe::create(name)?;
Ok(Self { inner })
}
#[staticmethod]
fn connect(name: &str) -> PyResult<Self> {
let inner = RustNamedPipe::connect(name)?;
Ok(Self { inner })
}
#[getter]
fn name(&self) -> &str {
self.inner.name()
}
#[getter]
fn is_server(&self) -> bool {
self.inner.is_server()
}
fn wait_for_client(&mut self, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| self.inner.wait_for_client())?;
Ok(())
}
fn read(&mut self, py: Python<'_>, size: usize) -> PyResult<Py<PyBytes>> {
let mut buf = vec![0u8; size];
let n = py.allow_threads(|| self.inner.read(&mut buf))?;
buf.truncate(n);
Ok(PyBytes::new(py, &buf).into())
}
fn write(&mut self, py: Python<'_>, data: Vec<u8>) -> PyResult<usize> {
let n = py.allow_threads(|| self.inner.write(&data))?;
Ok(n)
}
fn read_exact(&mut self, py: Python<'_>, size: usize) -> PyResult<Py<PyBytes>> {
let mut buf = vec![0u8; size];
py.allow_threads(|| self.inner.read_exact(&mut buf))?;
Ok(PyBytes::new(py, &buf).into())
}
fn write_all(&mut self, py: Python<'_>, data: Vec<u8>) -> PyResult<()> {
py.allow_threads(|| self.inner.write_all(&data))?;
Ok(())
}
}
#[pyclass(name = "SharedMemory")]
pub struct PySharedMemory {
inner: RustSharedMemory,
}
#[pymethods]
impl PySharedMemory {
#[staticmethod]
fn create(name: &str, size: usize) -> PyResult<Self> {
let inner = RustSharedMemory::create(name, size)?;
Ok(Self { inner })
}
#[staticmethod]
fn open(name: &str) -> PyResult<Self> {
let inner = RustSharedMemory::open(name)?;
Ok(Self { inner })
}
#[getter]
fn name(&self) -> &str {
self.inner.name()
}
#[getter]
fn size(&self) -> usize {
self.inner.size()
}
#[getter]
fn is_owner(&self) -> bool {
self.inner.is_owner()
}
fn write(&mut self, offset: usize, data: &[u8]) -> PyResult<()> {
self.inner.write(offset, data)?;
Ok(())
}
fn read(&self, py: Python<'_>, offset: usize, size: usize) -> PyResult<Py<PyBytes>> {
let data = self.inner.read(offset, size)?;
Ok(PyBytes::new(py, &data).into())
}
fn read_all(&self, py: Python<'_>) -> PyResult<Py<PyBytes>> {
let data = self.inner.read(0, self.inner.size())?;
Ok(PyBytes::new(py, &data).into())
}
}
#[pyclass(name = "IpcChannel")]
pub struct PyIpcChannel {
inner: crate::channel::IpcChannel<Vec<u8>>,
}
#[pymethods]
impl PyIpcChannel {
#[staticmethod]
fn create(name: &str) -> PyResult<Self> {
let inner = crate::channel::IpcChannel::create(name)?;
Ok(Self { inner })
}
#[staticmethod]
fn connect(name: &str) -> PyResult<Self> {
let inner = crate::channel::IpcChannel::connect(name)?;
Ok(Self { inner })
}
#[getter]
fn name(&self) -> &str {
self.inner.name()
}
#[getter]
fn is_server(&self) -> bool {
self.inner.is_server()
}
fn wait_for_client(&mut self, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| self.inner.wait_for_client())?;
Ok(())
}
fn send(&mut self, py: Python<'_>, data: Vec<u8>) -> PyResult<()> {
py.allow_threads(|| self.inner.send_bytes(&data))?;
Ok(())
}
fn recv(&mut self, py: Python<'_>) -> PyResult<Py<PyBytes>> {
let data = py.allow_threads(|| self.inner.recv_bytes())?;
Ok(PyBytes::new(py, &data).into())
}
fn send_json(&mut self, py: Python<'_>, obj: &Bound<'_, PyAny>) -> PyResult<()> {
let value = py_to_json_value(obj)?;
let json_bytes = serde_json::to_vec(&value)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;
py.allow_threads(|| self.inner.send_bytes(&json_bytes))?;
Ok(())
}
fn recv_json(&mut self, py: Python<'_>) -> PyResult<PyObject> {
let data = py.allow_threads(|| self.inner.recv_bytes())?;
let value: serde_json::Value =
serde_json::from_slice(&data).map_err(|e| IpcError::deserialization(e.to_string()))?;
json_value_to_py(py, &value)
}
}
#[pyclass(name = "FileChannel")]
pub struct PyFileChannel {
inner: RustFileChannel,
}
#[pymethods]
impl PyFileChannel {
#[staticmethod]
fn backend(dir: &str) -> PyResult<Self> {
let inner = RustFileChannel::backend(dir)?;
Ok(Self { inner })
}
#[staticmethod]
fn frontend(dir: &str) -> PyResult<Self> {
let inner = RustFileChannel::frontend(dir)?;
Ok(Self { inner })
}
#[getter]
fn dir(&self) -> String {
self.inner.dir().to_string_lossy().to_string()
}
fn send_request(&self, method: &str, params: &Bound<'_, PyAny>) -> PyResult<String> {
let json_value = py_to_json_value(params)?;
let id = self.inner.send_request(method, json_value)?;
Ok(id)
}
fn send_response(&self, request_id: &str, result: &Bound<'_, PyAny>) -> PyResult<()> {
let json_value = py_to_json_value(result)?;
self.inner.send_response(request_id, json_value)?;
Ok(())
}
fn send_error(&self, request_id: &str, error: &str) -> PyResult<()> {
self.inner.send_error(request_id, error)?;
Ok(())
}
fn send_event(&self, name: &str, payload: &Bound<'_, PyAny>) -> PyResult<()> {
let json_value = py_to_json_value(payload)?;
self.inner.send_event(name, json_value)?;
Ok(())
}
fn recv(&mut self, py: Python<'_>) -> PyResult<PyObject> {
let messages = self.inner.recv()?;
let list = PyList::empty(py);
for msg in messages {
list.append(file_message_to_py(py, msg)?)?;
}
Ok(list.into())
}
fn recv_one(&mut self, py: Python<'_>) -> PyResult<PyObject> {
match self.inner.recv_one()? {
Some(msg) => file_message_to_py(py, msg),
None => Ok(py.None()),
}
}
fn wait_response(
&mut self,
py: Python<'_>,
request_id: &str,
timeout_ms: u64,
) -> PyResult<PyObject> {
let timeout = Duration::from_millis(timeout_ms);
let msg = self.inner.wait_response(request_id, timeout)?;
file_message_to_py(py, msg)
}
fn clear(&self) -> PyResult<()> {
self.inner.clear()?;
Ok(())
}
}
fn file_message_to_py(py: Python<'_>, msg: RustFileMessage) -> PyResult<PyObject> {
let dict = PyDict::new(py);
dict.set_item("id", &msg.id)?;
dict.set_item("timestamp", msg.timestamp)?;
let msg_type = match msg.msg_type {
RustMessageType::Request => "request",
RustMessageType::Response => "response",
RustMessageType::Event => "event",
};
dict.set_item("type", msg_type)?;
if let Some(method) = msg.method {
dict.set_item("method", method)?;
}
if let Some(reply_to) = msg.reply_to {
dict.set_item("reply_to", reply_to)?;
}
if let Some(error) = msg.error {
dict.set_item("error", error)?;
}
let payload = json_value_to_py(py, &msg.payload)?;
dict.set_item("payload", payload)?;
Ok(dict.into())
}
#[pyclass(name = "GracefulNamedPipe")]
pub struct PyGracefulNamedPipe {
inner: RustGracefulNamedPipe,
}
#[pymethods]
impl PyGracefulNamedPipe {
#[staticmethod]
fn create(name: &str) -> PyResult<Self> {
let inner = RustGracefulNamedPipe::create(name)?;
Ok(Self { inner })
}
#[staticmethod]
fn connect(name: &str) -> PyResult<Self> {
let inner = RustGracefulNamedPipe::connect(name)?;
Ok(Self { inner })
}
#[getter]
fn name(&self) -> &str {
self.inner.name()
}
#[getter]
fn is_server(&self) -> bool {
self.inner.is_server()
}
#[getter]
fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
}
fn wait_for_client(&mut self, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| self.inner.wait_for_client())?;
Ok(())
}
fn shutdown(&self) {
self.inner.shutdown();
}
fn drain(&self, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| self.inner.drain())?;
Ok(())
}
fn shutdown_timeout(&self, py: Python<'_>, timeout_ms: u64) -> PyResult<()> {
let timeout = Duration::from_millis(timeout_ms);
py.allow_threads(|| self.inner.shutdown_timeout(timeout))?;
Ok(())
}
fn read(&mut self, py: Python<'_>, size: usize) -> PyResult<Py<PyBytes>> {
let mut buf = vec![0u8; size];
let n = py.allow_threads(|| self.inner.read(&mut buf))?;
buf.truncate(n);
Ok(PyBytes::new(py, &buf).into())
}
fn write(&mut self, py: Python<'_>, data: Vec<u8>) -> PyResult<usize> {
let n = py.allow_threads(|| self.inner.write(&data))?;
Ok(n)
}
fn read_exact(&mut self, py: Python<'_>, size: usize) -> PyResult<Py<PyBytes>> {
let mut buf = vec![0u8; size];
py.allow_threads(|| self.inner.read_exact(&mut buf))?;
Ok(PyBytes::new(py, &buf).into())
}
fn write_all(&mut self, py: Python<'_>, data: Vec<u8>) -> PyResult<()> {
py.allow_threads(|| self.inner.write_all(&data))?;
Ok(())
}
}
#[pyclass(name = "GracefulIpcChannel")]
pub struct PyGracefulIpcChannel {
inner: RustGracefulIpcChannel<Vec<u8>>,
}
#[pymethods]
impl PyGracefulIpcChannel {
#[staticmethod]
fn create(name: &str) -> PyResult<Self> {
let inner = RustGracefulIpcChannel::create(name)?;
Ok(Self { inner })
}
#[staticmethod]
fn connect(name: &str) -> PyResult<Self> {
let inner = RustGracefulIpcChannel::connect(name)?;
Ok(Self { inner })
}
#[getter]
fn name(&self) -> &str {
self.inner.name()
}
#[getter]
fn is_server(&self) -> bool {
self.inner.is_server()
}
#[getter]
fn is_shutdown(&self) -> bool {
self.inner.is_shutdown()
}
fn wait_for_client(&mut self, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| self.inner.wait_for_client())?;
Ok(())
}
fn shutdown(&self) {
self.inner.shutdown();
}
fn drain(&self, py: Python<'_>) -> PyResult<()> {
py.allow_threads(|| self.inner.drain())?;
Ok(())
}
fn shutdown_timeout(&self, py: Python<'_>, timeout_ms: u64) -> PyResult<()> {
let timeout = Duration::from_millis(timeout_ms);
py.allow_threads(|| self.inner.shutdown_timeout(timeout))?;
Ok(())
}
fn send(&mut self, py: Python<'_>, data: Vec<u8>) -> PyResult<()> {
py.allow_threads(|| self.inner.send_bytes(&data))?;
Ok(())
}
fn recv(&mut self, py: Python<'_>) -> PyResult<Py<PyBytes>> {
let data = py.allow_threads(|| self.inner.recv_bytes())?;
Ok(PyBytes::new(py, &data).into())
}
fn send_json(&mut self, py: Python<'_>, obj: &Bound<'_, PyAny>) -> PyResult<()> {
let value = py_to_json_value(obj)?;
let json_bytes = serde_json::to_vec(&value)
.map_err(|e| PyErr::new::<pyo3::exceptions::PyValueError, _>(e.to_string()))?;
py.allow_threads(|| self.inner.send_bytes(&json_bytes))?;
Ok(())
}
fn recv_json(&mut self, py: Python<'_>) -> PyResult<PyObject> {
let data = py.allow_threads(|| self.inner.recv_bytes())?;
let value: serde_json::Value =
serde_json::from_slice(&data).map_err(|e| IpcError::deserialization(e.to_string()))?;
json_value_to_py(py, &value)
}
}
#[pymodule]
#[pyo3(name = "ipckit")]
pub fn ipckit_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<PyAnonymousPipe>()?;
m.add_class::<PyNamedPipe>()?;
m.add_class::<PySharedMemory>()?;
m.add_class::<PyIpcChannel>()?;
m.add_class::<PyFileChannel>()?;
m.add_class::<PyGracefulNamedPipe>()?;
m.add_class::<PyGracefulIpcChannel>()?;
m.add_function(wrap_pyfunction!(json_dumps, m)?)?;
m.add_function(wrap_pyfunction!(json_dumps_pretty, m)?)?;
m.add_function(wrap_pyfunction!(json_loads, m)?)?;
m.add("__version__", env!("CARGO_PKG_VERSION"))?;
m.add(
"__doc__",
"ipckit - A cross-platform IPC library for Python
This library provides various IPC mechanisms:
- AnonymousPipe: For parent-child process communication
- NamedPipe: For communication between unrelated processes
- SharedMemory: For fast data sharing between processes
- IpcChannel: High-level message passing interface
- FileChannel: File-based IPC for frontend-backend communication
Graceful shutdown support:
- GracefulNamedPipe: Named pipe with graceful shutdown
- GracefulIpcChannel: IPC channel with graceful shutdown
JSON utilities (faster than Python's json module):
- json_dumps(obj): Serialize Python object to JSON string
- json_dumps_pretty(obj): Serialize with pretty formatting
- json_loads(s): Deserialize JSON string to Python object
Example:
import ipckit
# Using graceful shutdown
channel = ipckit.GracefulIpcChannel.create('my_channel')
channel.wait_for_client()
# ... use channel ...
# Graceful shutdown
channel.shutdown()
channel.drain() # Wait for pending operations
# Or with timeout (in milliseconds)
channel.shutdown_timeout(5000)
",
)?;
Ok(())
}