#![cfg(all(feature = "python", feature = "streaming"))]
use std::path::PathBuf;
use fidius_core::python_descriptor::PythonInterfaceDescriptor;
use fidius_host::{PluginHost, PluginRuntimeKind};
use futures::StreamExt;
/// Returns the Python interface descriptor exported for the `Ticker`
/// interface by the `test_plugin_smoke` crate.
fn ticker_descriptor() -> &'static PythonInterfaceDescriptor {
    let descriptor: &'static PythonInterfaceDescriptor =
        &test_plugin_smoke::__fidius_Ticker::Ticker_PYTHON_DESCRIPTOR;
    descriptor
}
/// Stages the `py-ticker` plugin inside `tmp` and returns the plugins root
/// (the temp directory itself).
///
/// Steps:
/// 1. copy `tests/test-plugin-py-ticker` from the repo root to `<tmp>/py-ticker`;
/// 2. copy `python/fidius` from the repo root to `<tmp>/py-ticker/vendor/fidius`;
/// 3. rewrite `ticker.py`, replacing every `__HASH_PLACEHOLDER__` with the
///    `Ticker` interface hash formatted as `0x` + 16 upper-case hex digits.
///
/// # Panics
/// Panics if any filesystem operation fails.
fn stage(tmp: &tempfile::TempDir) -> PathBuf {
    let root = repo_root();
    let plugin_dir = tmp.path().join("py-ticker");

    // Plugin sources first, then the vendored `fidius` Python package.
    copy_dir(&root.join("tests/test-plugin-py-ticker"), &plugin_dir);
    copy_dir(
        &root.join("python/fidius"),
        &plugin_dir.join("vendor").join("fidius"),
    );

    // Patch the placeholder in place with the descriptor's interface hash.
    let hash_literal = format!("0x{:016X}", ticker_descriptor().interface_hash);
    let script = plugin_dir.join("ticker.py");
    let patched = std::fs::read_to_string(&script)
        .unwrap()
        .replace("__HASH_PLACEHOLDER__", &hash_literal);
    std::fs::write(&script, patched).unwrap();

    tmp.path().to_path_buf()
}
/// Returns the directory two levels above this crate's `CARGO_MANIFEST_DIR`
/// (presumably the repository root; confirm against the workspace layout).
///
/// # Panics
/// Panics if the manifest directory has fewer than two ancestors.
fn repo_root() -> PathBuf {
    let manifest_dir = std::path::Path::new(env!("CARGO_MANIFEST_DIR"));
    // `ancestors()` yields the path itself first, so index 2 is the grandparent.
    manifest_dir.ancestors().nth(2).unwrap().to_path_buf()
}
/// Recursively copies the contents of directory `src` into `dst`.
///
/// `dst` (and any missing parents) is created first. Sub-directories are
/// descended into; every other entry is copied with `std::fs::copy`.
///
/// # Panics
/// Panics on any I/O error (creating `dst`, reading `src`, copying a file).
fn copy_dir(src: &std::path::Path, dst: &std::path::Path) {
    std::fs::create_dir_all(dst).unwrap();

    std::fs::read_dir(src).unwrap().for_each(|item| {
        let item = item.unwrap();
        let source = item.path();
        let target = dst.join(item.file_name());

        if !source.is_dir() {
            std::fs::copy(&source, &target).unwrap();
            return;
        }
        copy_dir(&source, &target);
    });
}
/// Returns the `METHOD_TICK` constant exported for the `Ticker` interface,
/// used below as the method index passed to `call_streaming`.
fn tick_index() -> usize {
    let index: usize = test_plugin_smoke::__fidius_Ticker::METHOD_TICK;
    index
}
/// Discovery over the staged plugins root must report `py-ticker` as a
/// Python-runtime plugin implementing the `Ticker` interface.
#[test]
fn discover_lists_streaming_python_plugin() {
    let scratch = tempfile::TempDir::new().unwrap();
    let search_root = stage(&scratch);
    let host = PluginHost::builder()
        .search_path(&search_root)
        .build()
        .unwrap();

    let infos = host.discover().unwrap();
    let mut listing = infos.iter();
    let info = listing
        .find(|plugin| plugin.name == "py-ticker")
        .expect("py-ticker in discovery");

    assert!(matches!(info.runtime, PluginRuntimeKind::Python));
    assert_eq!(info.interface_name, "Ticker");
}
/// Calls the tick method with an argument of 5 and drains the stream to
/// completion, expecting exactly the values `0..=4` in order.
#[tokio::test]
async fn server_stream_yields_all_items() {
    let scratch = tempfile::TempDir::new().unwrap();
    let search_root = stage(&scratch);
    let host = PluginHost::builder()
        .search_path(&search_root)
        .build()
        .unwrap();
    let plugin = host
        .load_python("py-ticker", ticker_descriptor())
        .expect("load_python");

    let method = tick_index();
    let mut ticks = plugin
        .call_streaming::<_, u64>(method, &(5u32,))
        .await
        .expect("call_streaming");

    // Drain until the stream reports end-of-stream (`None`).
    let mut seen = Vec::new();
    while let Some(item) = ticks.next().await {
        let payload = item.expect("item ok");
        let value: u64 = fidius_core::from_value(payload).expect("u64");
        seen.push(value);
    }

    let expected: Vec<u64> = vec![0, 1, 2, 3, 4];
    assert_eq!(seen, expected);
}
/// Requests a stream with an argument of 10,000,000, reads only the first
/// three items, then drops the stream.
///
/// NOTE(review): going by the test name, the early `drop` is meant to show
/// that the remaining items are never buffered and that the call is
/// cancelled; the test itself only asserts the first three values.
#[tokio::test]
async fn huge_stream_is_bounded_and_cancellable() {
    let scratch = tempfile::TempDir::new().unwrap();
    let search_root = stage(&scratch);
    let host = PluginHost::builder()
        .search_path(&search_root)
        .build()
        .unwrap();
    let plugin = host
        .load_python("py-ticker", ticker_descriptor())
        .expect("load_python");

    let method = tick_index();
    let mut ticks = plugin
        .call_streaming::<_, u64>(method, &(10_000_000u32,))
        .await
        .expect("call_streaming");

    // Pull exactly three items; the stream must not end before that.
    let mut seen = Vec::new();
    while seen.len() < 3 {
        let item = ticks.next().await.expect("item");
        let value: u64 = fidius_core::from_value(item.expect("ok")).unwrap();
        seen.push(value);
    }
    assert_eq!(seen, vec![0, 1, 2]);

    // Explicitly abandon the rest of the stream.
    drop(ticks);
}
/// Pumps a stream requested with an argument of 4 into a
/// `fidius_test::CollectSink` via `fidius_test::pump`, then checks the sink
/// holds the values `0..=3` in order.
#[tokio::test]
async fn composition_pump_into_sink() {
    let scratch = tempfile::TempDir::new().unwrap();
    let search_root = stage(&scratch);
    let host = PluginHost::builder()
        .search_path(&search_root)
        .build()
        .unwrap();
    let plugin = host
        .load_python("py-ticker", ticker_descriptor())
        .expect("load_python");

    let method = tick_index();
    let ticks = plugin
        .call_streaming::<_, u64>(method, &(4u32,))
        .await
        .expect("call_streaming");

    let sink = fidius_test::CollectSink::new();
    fidius_test::pump(ticks, &sink).await.expect("pump");

    // Decode everything the sink collected back into plain integers.
    let collected = sink.take();
    let decoded = collected
        .into_iter()
        .map(|raw| fidius_core::from_value(raw).unwrap())
        .collect::<Vec<u64>>();
    assert_eq!(decoded, vec![0, 1, 2, 3]);
}