use std::{
collections::HashMap,
sync::{Mutex, OnceLock},
};
use anyhow::{Result, anyhow, bail};
use crate::{
Graph,
traversal::{
Kernel, RunOptions, SearchResult,
typed::{self, OwnedSearchResult, ParquetPaths, TypedKernel, TypedPayloadCache},
},
};
pub use inventory;
pub trait RunKernel: Send + Sync {
fn run<'g>(&self, graph: &'g Graph, run: RunOptions) -> Result<SearchResult<'g>>;
}
impl<F> RunKernel for F
where
F: for<'g> Fn(&'g Graph, RunOptions) -> Result<SearchResult<'g>> + Send + Sync,
{
fn run<'g>(&self, graph: &'g Graph, run: RunOptions) -> Result<SearchResult<'g>> {
self(graph, run)
}
}
pub type BoxedRun = Box<dyn RunKernel>;
pub type MakeFn = fn(&serde_json::Value) -> Result<BoxedRun>;
pub struct KernelEntry {
pub name: &'static str,
pub make: MakeFn,
}
inventory::collect!(KernelEntry);
pub trait RunTypedKernel: Send + Sync {
fn run_eager(&self, graph: &Graph, run: RunOptions) -> Result<OwnedSearchResult>;
fn run_eager_cached(
&self,
graph: &Graph,
cache: &TypedPayloadCache,
run: RunOptions,
) -> Result<OwnedSearchResult>;
fn run_parquet_lazy(
&self,
graph: &Graph,
paths: ParquetPaths,
run: RunOptions,
) -> Result<OwnedSearchResult>;
}
pub type BoxedTypedRun = Box<dyn RunTypedKernel>;
pub type MakeTypedFn = fn(&serde_json::Value) -> Result<BoxedTypedRun>;
pub struct TypedKernelEntry {
pub name: &'static str,
pub make: MakeTypedFn,
}
inventory::collect!(TypedKernelEntry);
pub fn boxed_run<K>(kernel: K) -> BoxedRun
where
K: Kernel + Clone + Send + Sync + 'static,
K::State: Send + Sync + Clone,
{
struct Runner<K>(K);
impl<K> RunKernel for Runner<K>
where
K: Kernel + Clone + Send + Sync + 'static,
K::State: Send + Sync + Clone,
{
fn run<'g>(&self, graph: &'g Graph, run: RunOptions) -> Result<SearchResult<'g>> {
graph.search_with(self.0.clone(), run)
}
}
Box::new(Runner(kernel))
}
pub fn boxed_typed_run<K>(kernel: K) -> BoxedTypedRun
where
K: TypedKernel + Clone + Send + Sync + 'static,
K::Node: Send + Sync + 'static,
K::Edge: Send + Sync + 'static,
K::State: Send + Sync + Clone,
{
struct Runner<K>(K);
impl<K> RunTypedKernel for Runner<K>
where
K: TypedKernel + Clone + Send + Sync + 'static,
K::Node: Send + Sync + 'static,
K::Edge: Send + Sync + 'static,
K::State: Send + Sync + Clone,
{
fn run_eager(&self, graph: &Graph, run: RunOptions) -> Result<OwnedSearchResult> {
typed::run_typed_eager(graph, self.0.clone(), run)
}
fn run_eager_cached(
&self,
graph: &Graph,
cache: &TypedPayloadCache,
run: RunOptions,
) -> Result<OwnedSearchResult> {
typed::run_typed_eager_cached(graph, cache, self.0.clone(), run)
}
fn run_parquet_lazy(
&self,
graph: &Graph,
paths: ParquetPaths,
run: RunOptions,
) -> Result<OwnedSearchResult> {
typed::run_typed_parquet_lazy(graph, paths, self.0.clone(), run)
}
}
Box::new(Runner(kernel))
}
type RuntimeMap = HashMap<String, MakeFn>;
fn runtime_registry() -> &'static Mutex<RuntimeMap> {
static REGISTRY: OnceLock<Mutex<RuntimeMap>> = OnceLock::new();
REGISTRY.get_or_init(|| Mutex::new(HashMap::new()))
}
pub fn register_kernel(name: impl Into<String>, make: MakeFn) -> Result<()> {
let name = name.into();
let mut registry = runtime_registry()
.lock()
.map_err(|_| anyhow!("kernel registry poisoned"))?;
if registry.contains_key(&name) {
bail!("kernel {name:?} is already registered");
}
registry.insert(name, make);
Ok(())
}
pub fn build_kernel(name: &str, params: &serde_json::Value) -> Result<BoxedRun> {
if let Some(kernel) = try_build_kernel(name, params)? {
return Ok(kernel);
}
bail!(
"unknown kernel {name:?}; available kernels: {}",
available_names().join(", ")
)
}
pub fn try_build_kernel(name: &str, params: &serde_json::Value) -> Result<Option<BoxedRun>> {
let runtime = {
let registry = runtime_registry()
.lock()
.map_err(|_| anyhow!("kernel registry poisoned"))?;
registry.get(name).copied()
};
if let Some(make) = runtime {
return make(params).map(Some);
}
for entry in inventory::iter::<KernelEntry> {
if entry.name == name {
return (entry.make)(params).map(Some);
}
}
Ok(None)
}
pub fn build_typed_kernel(name: &str, params: &serde_json::Value) -> Result<BoxedTypedRun> {
if let Some(kernel) = try_build_typed_kernel(name, params)? {
return Ok(kernel);
}
anyhow::bail!(
"unknown typed kernel {name:?}; available typed kernels: {}",
available_typed_names().join(", ")
)
}
pub fn try_build_typed_kernel(
name: &str,
params: &serde_json::Value,
) -> Result<Option<BoxedTypedRun>> {
for entry in inventory::iter::<TypedKernelEntry> {
if entry.name == name {
return (entry.make)(params).map(Some);
}
}
Ok(None)
}
fn available_names() -> Vec<String> {
let mut names = Vec::new();
if let Ok(registry) = runtime_registry().lock() {
names.extend(registry.keys().cloned());
}
names.extend(
inventory::iter::<KernelEntry>
.into_iter()
.map(|e| e.name.to_string()),
);
names.sort();
names.dedup();
names
}
fn available_typed_names() -> Vec<String> {
let mut names = inventory::iter::<TypedKernelEntry>
.into_iter()
.map(|e| e.name.to_string())
.collect::<Vec<_>>();
names.sort();
names.dedup();
names
}