use fidius_core::Value;
use fidius_python::PythonPluginHandle;
use crate::error::CallError;
use crate::executor::{PluginExecutor, ValueExecutor};
use crate::types::PluginInfo;
pub struct Pyo3Executor {
py: PythonPluginHandle,
info: PluginInfo,
}
impl Pyo3Executor {
pub fn new(py: PythonPluginHandle, info: PluginInfo) -> Self {
Self { py, info }
}
}
impl PluginExecutor for Pyo3Executor {
fn info(&self) -> &PluginInfo {
&self.info
}
fn method_count(&self) -> u32 {
self.py.method_count() as u32
}
fn call_raw(&self, method: usize, input: &[u8]) -> Result<Vec<u8>, CallError> {
self.py.call_raw(method, input).map_err(CallError::from)
}
}
impl ValueExecutor for Pyo3Executor {
fn call(&self, method: usize, args: Value) -> Result<Value, CallError> {
let json =
serde_json::to_vec(&args).map_err(|e| CallError::Serialization(e.to_string()))?;
let out = self
.py
.call_typed_json(method, &json)
.map_err(CallError::from)?;
serde_json::from_slice(&out).map_err(|e| CallError::Deserialization(e.to_string()))
}
}
#[cfg(feature = "streaming")]
const STREAM_CHANNEL_CAP: usize = 4;
#[cfg(feature = "streaming")]
#[async_trait::async_trait]
impl crate::stream::StreamExecutor for Pyo3Executor {
async fn call_streaming(
&self,
method: usize,
args: Value,
) -> Result<crate::stream::ChunkStream, CallError> {
let json =
serde_json::to_vec(&args).map_err(|e| CallError::Serialization(e.to_string()))?;
let stream = self
.py
.call_streaming_start(method, &json)
.map_err(CallError::from)?;
let (tx, rx) = tokio::sync::mpsc::channel::<Result<Value, CallError>>(STREAM_CHANNEL_CAP);
std::thread::spawn(move || {
use fidius_python::PyStreamStep;
loop {
match stream.next() {
PyStreamStep::End => break,
PyStreamStep::Error(pe) => {
let _ = tx.blocking_send(Err(CallError::Plugin(pe)));
break;
}
PyStreamStep::Item(jv) => {
let item = match serde_json::from_value::<Value>(jv) {
Ok(v) => Ok(v),
Err(e) => Err(CallError::Deserialization(e.to_string())),
};
let is_err = item.is_err();
if tx.blocking_send(item).is_err() {
stream.cancel();
break;
}
if is_err {
break;
}
}
}
}
});
let body = futures::stream::unfold(rx, |mut rx| async move {
rx.recv().await.map(|item| (item, rx))
});
Ok(crate::stream::ChunkStream::new(body))
}
}