use super::{
context::PlexusContext,
method_enum::MethodEnumSchema,
schema::{ChildSummary, MethodSchema, PluginSchema, Schema},
streaming::PlexusStream,
};
use crate::types::Handle;
use async_stream::stream;
use async_trait::async_trait;
use bitflags::bitflags;
use futures::Stream;
use futures_core::stream::BoxStream;
use jsonrpsee::core::server::Methods;
use jsonrpsee::RpcModule;
/// Notification method name passed as the second argument to most
/// `register_subscription` calls in this file.
pub const PLEXUS_NOTIF_METHOD: &str = "result";
use schemars::JsonSchema;
use serde::{Deserialize, Serialize};
use serde_json::Value;
use std::collections::HashMap;
use std::sync::Arc;
/// Errors returned by routing and activation calls in this module.
///
/// `plexus_error_code` maps each variant to the numeric code reported to
/// JSON-RPC clients.
#[derive(Debug, Clone)]
pub enum PlexusError {
/// No activation (or child) is registered under the given name.
ActivationNotFound(String),
/// The method path could not be routed from the named activation.
MethodNotFound { activation: String, method: String },
/// The request was malformed; the payload is a human-readable message.
InvalidParams(String),
/// The call failed while executing; the payload is a human-readable message.
ExecutionError(String),
/// The named activation does not implement handle resolution.
HandleNotSupported(String),
/// A transport-level failure, described by the wrapped kind.
TransportError(TransportErrorKind),
/// The caller must authenticate; the payload is a human-readable message.
Unauthenticated(String),
}
/// Transport-level failure categories carried by `PlexusError::TransportError`.
///
/// Serialized with an `error_kind` tag holding the snake_case variant name.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "error_kind", rename_all = "snake_case")]
pub enum TransportErrorKind {
/// The connection to `host:port` was refused.
ConnectionRefused { host: String, port: u16 },
/// The connection to `host:port` timed out.
ConnectionTimeout { host: String, port: u16 },
/// A protocol-level error with a descriptive message.
ProtocolError { message: String },
/// A network-level error with a descriptive message.
NetworkError { message: String },
}
impl std::fmt::Display for TransportErrorKind {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
match self {
TransportErrorKind::ConnectionRefused { host, port } => {
write!(f, "Connection refused to {}:{}", host, port)
}
TransportErrorKind::ConnectionTimeout { host, port } => {
write!(f, "Connection timeout to {}:{}", host, port)
}
TransportErrorKind::ProtocolError { message } => {
write!(f, "Protocol error: {}", message)
}
TransportErrorKind::NetworkError { message } => {
write!(f, "Network error: {}", message)
}
}
}
}
/// Human-readable rendering of each error variant.
///
/// The resulting text is what `plexus_error_to_jsonrpc` sends to clients as
/// the JSON-RPC error message.
impl std::fmt::Display for PlexusError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            PlexusError::ActivationNotFound(name) => write!(f, "Activation not found: {}", name),
            PlexusError::MethodNotFound { activation, method } => {
                write!(f, "Method not found: {}.{}", activation, method)
            }
            PlexusError::InvalidParams(msg) => write!(f, "Invalid params: {}", msg),
            PlexusError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
            PlexusError::HandleNotSupported(activation) => {
                write!(f, "Handle resolution not supported by activation: {}", activation)
            }
            // Delegate to `TransportErrorKind`'s own `Display` so the transport
            // messages are defined in exactly one place.
            PlexusError::TransportError(kind) => std::fmt::Display::fmt(kind, f),
            PlexusError::Unauthenticated(msg) => write!(f, "Authentication required: {}", msg),
        }
    }
}
// Marker impl: uses the default `Error` methods (no `source` override).
impl std::error::Error for PlexusError {}
fn plexus_error_code(e: &PlexusError) -> i32 {
match e {
PlexusError::Unauthenticated(_) => -32001,
PlexusError::InvalidParams(_) => -32602,
PlexusError::MethodNotFound { .. } | PlexusError::ActivationNotFound(_) => -32601,
_ => -32000,
}
}
/// Converts a [`PlexusError`] into an owned JSON-RPC error object.
///
/// The code comes from `plexus_error_code`, the message is the error's
/// `Display` text, and no `data` payload is attached.
fn plexus_error_to_jsonrpc(e: &PlexusError) -> jsonrpsee::types::ErrorObjectOwned {
    let code = plexus_error_code(e);
    let message = e.to_string();
    let no_data: Option<()> = None;
    jsonrpsee::types::ErrorObject::owned(code, message, no_data)
}
// Summary of one activation as produced by `DynamicHub::list_activations_info`.
// (Plain `//` comments are used so the derived JSON schema is not altered.)
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ActivationInfo {
// Namespace the activation is registered under.
pub namespace: String,
// Version string reported by the activation.
pub version: String,
// Short description reported by the activation.
pub description: String,
// Method names exposed by the activation (not namespace-qualified).
pub methods: Vec<String>,
}
/// A callable plugin unit: a namespace, a version, a set of named methods and
/// an async entry point that turns a method call into a [`PlexusStream`].
#[async_trait]
pub trait Activation: Send + Sync + 'static {
    /// Method enum whose schema describes this activation's methods.
    type Methods: MethodEnumSchema;

    /// Namespace this activation is addressed by.
    fn namespace(&self) -> &str;

    /// Version string; its first dot-separated component feeds [`Self::plugin_id`].
    fn version(&self) -> &str;

    /// Short description; defaults to a placeholder.
    fn description(&self) -> &str {
        "No description available"
    }

    /// Optional longer description; defaults to `None`.
    fn long_description(&self) -> Option<&str> {
        None
    }

    /// Names of the methods this activation exposes.
    fn methods(&self) -> Vec<&str>;

    /// Help text for a single method; defaults to `None`.
    fn method_help(&self, _method: &str) -> Option<String> {
        None
    }

    /// Deterministic v5 UUID derived from `"{namespace}@{major_version}"`.
    ///
    /// The major version is the text before the first `.` in [`Self::version`].
    fn plugin_id(&self) -> uuid::Uuid {
        let major = self.version().split('.').next().unwrap_or("0");
        let seed = format!("{}@{}", self.namespace(), major);
        uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, seed.as_bytes())
    }

    /// Invokes `method` with `params`, optionally carrying the caller's auth
    /// context and the raw request context.
    async fn call(
        &self,
        method: &str,
        params: Value,
        auth: Option<&super::auth::AuthContext>,
        raw_ctx: Option<&crate::request::RawRequestContext>,
    ) -> Result<PlexusStream, PlexusError>;

    /// Resolves a handle into a stream; the default reports
    /// [`PlexusError::HandleNotSupported`] with this activation's namespace.
    async fn resolve_handle(&self, _handle: &Handle) -> Result<PlexusStream, PlexusError> {
        let ns = self.namespace().to_string();
        Err(PlexusError::HandleNotSupported(ns))
    }

    /// Consumes the activation and yields its jsonrpsee methods.
    fn into_rpc_methods(self) -> Methods
    where
        Self: Sized;

    /// Builds a leaf [`PluginSchema`] from the method names and help texts.
    ///
    /// Each method's hash is the 16-hex-digit `DefaultHasher` digest of its
    /// name followed by its help text (empty when no help is available).
    fn plugin_schema(&self) -> PluginSchema {
        use std::collections::hash_map::DefaultHasher;
        use std::hash::{Hash, Hasher};

        let mut methods: Vec<MethodSchema> = Vec::new();
        for name in self.methods() {
            let desc = self.method_help(name).unwrap_or_default();
            let mut hasher = DefaultHasher::new();
            name.hash(&mut hasher);
            desc.hash(&mut hasher);
            let hash = format!("{:016x}", hasher.finish());
            methods.push(MethodSchema::new(name.to_string(), desc, hash));
        }

        match self.long_description() {
            Some(long_desc) => PluginSchema::leaf_with_long_description(
                self.namespace(),
                self.version(),
                self.description(),
                long_desc,
                methods,
            ),
            None => PluginSchema::leaf(
                self.namespace(),
                self.version(),
                self.description(),
                methods,
            ),
        }
    }
}
// Flag set returned by `ChildRouter::capabilities`.
// `register_child_capability_methods` reads it to decide whether to register
// the `{ns}.list_children` and `{ns}.search_children` subscriptions.
bitflags! {
#[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
#[deprecated(
since = "0.5",
note = "Use MethodRole::DynamicChild { list_method, search_method } instead. Removed in 0.7."
)]
pub struct ChildCapabilities: u32 {
// Router supports `list_children`.
const LIST = 0b0000_0001;
// Router supports `search_children`.
const SEARCH = 0b0000_0010;
}
}
/// A node that can route dotted method paths to named children.
///
/// `route_to_child` drives this trait: it looks up the child, applies the
/// parent's forward policy, and calls the child's `router_call`.
#[async_trait]
pub trait ChildRouter: Send + Sync {
/// Namespace of this router; used in `MethodNotFound` errors by `route_to_child`.
fn router_namespace(&self) -> &str;
/// Executes `method` on this router with the given params and contexts.
async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;
/// Returns the child registered under `name`, if any.
async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>>;
/// Capability flags for this router; defaults to none.
#[allow(deprecated)]
fn capabilities(&self) -> ChildCapabilities {
ChildCapabilities::empty()
}
/// Stream of child names; defaults to `None`.
async fn list_children(&self) -> Option<BoxStream<'_, String>> {
None
}
/// Stream of child names matching `_query`; defaults to `None`.
async fn search_children(&self, _query: &str) -> Option<BoxStream<'_, String>> {
None
}
/// Forward policy to apply when calling into `_callee_ns`; defaults to `None`,
/// in which case `route_to_child` falls back to `IdentityOnly`.
fn forward_policy_for(
&self,
_callee_ns: &str,
) -> Option<std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>> {
None
}
/// Principal used as the first argument of the `CallSite` built by
/// `route_to_child`; defaults to `Principal::Anonymous`.
fn framework_stamped_principal(&self) -> plexus_auth_core::Principal {
plexus_auth_core::Principal::Anonymous
}
}
/// Routes `method` (of the form `child.rest`) from `parent` to the named child.
///
/// The parent's forward policy for the child (or `IdentityOnly` when none is
/// registered) derives the context passed to the callee. When `auth` is
/// `None`, the policy is still evaluated against an anonymous context for the
/// trace record, but the child is called with `None`.
///
/// # Errors
/// * [`PlexusError::MethodNotFound`] when `method` contains no `.`.
/// * [`PlexusError::ActivationNotFound`] when the parent has no such child.
/// * [`PlexusError::ExecutionError`] when the rebuilt method path is rejected.
/// * Whatever the child's `router_call` returns.
pub async fn route_to_child<T: ChildRouter + ?Sized>(
    parent: &T,
    method: &str,
    params: Value,
    auth: Option<&super::auth::AuthContext>,
    raw_ctx: Option<&crate::request::RawRequestContext>,
) -> Result<PlexusStream, PlexusError> {
    // Guard: the path must name a child before the first dot.
    let (child_name, rest) = match method.split_once('.') {
        Some(parts) => parts,
        None => {
            return Err(PlexusError::MethodNotFound {
                activation: parent.router_namespace().to_string(),
                method: method.to_string(),
            });
        }
    };

    // Guard: the child must exist.
    let child = match parent.get_child(child_name).await {
        Some(found) => found,
        None => return Err(PlexusError::ActivationNotFound(child_name.to_string())),
    };

    let policy: std::sync::Arc<dyn plexus_auth_core::ForwardPolicy> =
        match parent.forward_policy_for(child_name) {
            Some(registered) => registered,
            None => std::sync::Arc::new(plexus_auth_core::IdentityOnly)
                as std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>,
        };

    let callee_method_str = format!("{}.{}", child_name, rest);
    let callee_method =
        match plexus_auth_core::MethodPath::try_new(callee_method_str.as_str()) {
            Ok(path) => path,
            Err(e) => {
                return Err(PlexusError::ExecutionError(format!(
                    "framework-built MethodPath rejected: {} ({:?})",
                    callee_method_str, e
                )));
            }
        };
    let site = plexus_auth_core::CallSite::new(
        parent.framework_stamped_principal(),
        callee_method,
    );

    // The policy needs a context even for unauthenticated callers; the
    // anonymous fallback is only materialized when `auth` is absent.
    let anonymous_fallback;
    let policy_input: &super::auth::AuthContext = if let Some(ctx) = auth {
        ctx
    } else {
        anonymous_fallback = super::auth::AuthContext::anonymous();
        &anonymous_fallback
    };
    let derivation = policy.forward(policy_input, &site);

    tracing::trace!(
        target: "plexus::audit",
        policy = policy.name().as_str(),
        callee_method = %site.callee_method.as_str(),
        derivation_keep_verified_user = derivation.keep_verified_user,
        derivation_keep_roles = derivation.keep_roles,
        derivation_keep_capabilities = derivation.keep_capabilities,
        derivation_keep_metadata = derivation.keep_metadata,
        "forward_policy_applied (audit-record emission stubbed pending PRIVACY-1)"
    );

    let outcome = match auth {
        None => child.router_call(rest, params, None, raw_ctx).await,
        Some(caller_ctx) => {
            caller_ctx
                .with_callee_context(&derivation, &site.caller, |callee_ctx| async move {
                    child
                        .router_call(rest, params, Some(&callee_ctx), raw_ctx)
                        .await
                })
                .await
        }
    };
    outcome
}
/// Adapter that exposes an `Arc<dyn ChildRouter>` as a `ChildRouter` value.
struct ArcChildRouter(Arc<dyn ChildRouter>);
/// Every method forwards unchanged to the wrapped router, including the ones
/// that have defaults on the trait, so the wrapper never masks an override.
#[async_trait]
impl ChildRouter for ArcChildRouter {
fn router_namespace(&self) -> &str {
self.0.router_namespace()
}
async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
self.0.router_call(method, params, auth, raw_ctx).await
}
async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
self.0.get_child(name).await
}
#[allow(deprecated)]
fn capabilities(&self) -> ChildCapabilities {
self.0.capabilities()
}
async fn list_children(&self) -> Option<BoxStream<'_, String>> {
self.0.list_children().await
}
async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
self.0.search_children(query).await
}
fn forward_policy_for(
&self,
callee_ns: &str,
) -> Option<std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>> {
self.0.forward_policy_for(callee_ns)
}
fn framework_stamped_principal(&self) -> plexus_auth_core::Principal {
self.0.framework_stamped_principal()
}
}
/// Private counterpart of `Activation` without the associated type or the
/// `Self: Sized` method, so activations can be stored as
/// `Arc<dyn ActivationObject>` in `DynamicHubInner::activations`.
#[async_trait]
#[allow(dead_code)] trait ActivationObject: Send + Sync + 'static {
fn namespace(&self) -> &str;
fn version(&self) -> &str;
fn description(&self) -> &str;
fn long_description(&self) -> Option<&str>;
fn methods(&self) -> Vec<&str>;
fn method_help(&self, method: &str) -> Option<String>;
fn plugin_id(&self) -> uuid::Uuid;
async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;
async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError>;
fn plugin_schema(&self) -> PluginSchema;
/// JSON schema of the activation's method enum.
fn schema(&self) -> Schema;
}
/// Wraps a concrete `Activation` so it can be used as an `ActivationObject`.
struct ActivationWrapper<A: Activation> {
inner: A,
}
/// All methods forward to the wrapped activation except `schema`, which is
/// generated from the activation's `Methods` type.
#[async_trait]
impl<A: Activation> ActivationObject for ActivationWrapper<A> {
fn namespace(&self) -> &str { self.inner.namespace() }
fn version(&self) -> &str { self.inner.version() }
fn description(&self) -> &str { self.inner.description() }
fn long_description(&self) -> Option<&str> { self.inner.long_description() }
fn methods(&self) -> Vec<&str> { self.inner.methods() }
fn method_help(&self, method: &str) -> Option<String> { self.inner.method_help(method) }
fn plugin_id(&self) -> uuid::Uuid { self.inner.plugin_id() }
async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
self.inner.call(method, params, auth, raw_ctx).await
}
async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
self.inner.resolve_handle(handle).await
}
fn plugin_schema(&self) -> PluginSchema { self.inner.plugin_schema() }
fn schema(&self) -> Schema {
// Generate the schemars schema for `A::Methods`, then round-trip it through
// `serde_json::Value` to obtain the project's own `Schema` type.
// Panics if either conversion step fails.
let schema = schemars::schema_for!(A::Methods);
serde_json::from_value(serde_json::to_value(schema).expect("serialize"))
.expect("parse schema")
}
}
// Event emitted by the `{ns}.hash` subscription; serialized with an `event`
// tag holding the snake_case variant name.
// (Plain `//` comments are used so the derived JSON schema is not altered.)
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum HashEvent {
// Carries the hash string of the hub's plugin schema.
Hash { value: String },
}
// Schema event wrapper; serialized with an `event` tag holding the snake_case
// variant name.
// (Plain `//` comments are used so the derived JSON schema is not altered.)
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum SchemaEvent {
// Carries a full plugin schema.
Schema(PluginSchema),
}
// Hash summary for one plugin. The two optional fields are omitted from the
// serialized form when they are `None`.
// NOTE(review): how `self_hash`, `children_hash` and `hash` relate is not
// visible in this file; confirm against the code that fills this struct.
// (Plain `//` comments are used so the derived JSON schema is not altered.)
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct PluginHashes {
pub namespace: String,
pub self_hash: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub children_hash: Option<String>,
pub hash: String,
#[serde(skip_serializing_if = "Option::is_none")]
pub children: Option<Vec<ChildHashes>>,
}
// Namespace/hash pair for one child, as listed in `PluginHashes::children`.
// (Plain `//` comments are used so the derived JSON schema is not altered.)
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ChildHashes {
pub namespace: String,
pub hash: String,
}
/// One registered plugin as stored in `PluginRegistry`.
#[derive(Debug, Clone)]
pub struct PluginEntry {
/// Plugin id the entry is keyed by.
pub id: uuid::Uuid,
/// Path the plugin was registered under (`DynamicHub::register` passes the namespace).
pub path: String,
/// Plugin type label (`DynamicHub::register` passes the namespace here too).
pub plugin_type: String,
}
/// Two-way index of registered plugins: id to entry, and path to id.
#[derive(Default)]
pub struct PluginRegistry {
/// Entries keyed by plugin id.
by_id: HashMap<uuid::Uuid, PluginEntry>,
/// Reverse index from registration path to plugin id.
by_path: HashMap<String, uuid::Uuid>,
}
/// Owned copy of a `PluginRegistry`'s two maps, produced by
/// `DynamicHub::registry_snapshot` so readers need not hold the registry lock.
#[derive(Clone)]
pub struct PluginRegistrySnapshot {
/// Entries keyed by plugin id.
by_id: HashMap<uuid::Uuid, PluginEntry>,
/// Reverse index from registration path to plugin id.
by_path: HashMap<String, uuid::Uuid>,
}
/// Read-only queries over a registry snapshot.
impl PluginRegistrySnapshot {
    /// Path registered for `id`, if any.
    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
        let entry = self.by_id.get(&id)?;
        Some(entry.path.as_str())
    }

    /// Plugin id registered under `path`, if any.
    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
        let id = self.by_path.get(path)?;
        Some(*id)
    }

    /// Full entry for `id`, if any.
    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
        self.by_id.get(&id)
    }

    /// Iterates over all entries in unspecified (hash map) order.
    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
        self.by_id.values()
    }

    /// Number of entries.
    pub fn len(&self) -> usize {
        self.by_id.len()
    }

    /// `true` when the snapshot holds no entries.
    pub fn is_empty(&self) -> bool {
        self.len() == 0
    }
}
/// Construction, registration and lookup for the plugin registry.
impl PluginRegistry {
    /// Creates an empty registry.
    pub fn new() -> Self {
        Self::default()
    }

    /// Path registered for `id`, if any.
    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
        self.by_id.get(&id).map(|e| e.path.as_str())
    }

    /// Plugin id registered under `path`, if any.
    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
        self.by_path.get(path).copied()
    }

    /// Full entry for `id`, if any.
    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
        self.by_id.get(&id)
    }

    /// Registers (or re-registers) a plugin under `id` and `path`.
    ///
    /// Re-registering an id under a different path removes the old path from
    /// the reverse index, so `lookup_by_path(old_path)` no longer returns an id
    /// whose entry now reports another path.
    pub fn register(&mut self, id: uuid::Uuid, path: String, plugin_type: String) {
        let entry = PluginEntry { id, path: path.clone(), plugin_type };
        if let Some(previous) = self.by_id.insert(id, entry) {
            // Only drop the old path if it still points at this id; another
            // plugin may have claimed that path in the meantime.
            if previous.path != path && self.by_path.get(&previous.path) == Some(&id) {
                self.by_path.remove(&previous.path);
            }
        }
        self.by_path.insert(path, id);
    }

    /// Iterates over all entries in unspecified (hash map) order.
    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
        self.by_id.values()
    }

    /// Number of entries.
    pub fn len(&self) -> usize {
        self.by_id.len()
    }

    /// `true` when no plugin is registered.
    pub fn is_empty(&self) -> bool {
        self.by_id.is_empty()
    }
}
/// Builds the JSON payload served by the `_info` subscription.
///
/// The payload has two keys: `backend` (the hub namespace) and
/// `auth_capabilities` (the configured capabilities, or the anonymous default
/// when none were set).
fn build_info_payload(
    namespace: &str,
    caps: Option<&plexus_auth_core::BackendAuthCapabilities>,
) -> serde_json::Value {
    let advertised = caps
        .cloned()
        .unwrap_or_else(|| plexus_auth_core::BackendAuthCapabilities::anonymous_default());
    serde_json::json!({
        "backend": namespace,
        "auth_capabilities": advertised,
    })
}
/// Shared state behind a `DynamicHub`.
struct DynamicHubInner {
/// Namespace the hub's own methods are served under.
namespace: String,
/// Registered activations keyed by namespace.
activations: HashMap<String, Arc<dyn ActivationObject>>,
/// The same activations viewed as child routers, keyed by namespace.
child_routers: HashMap<String, Arc<dyn ChildRouter>>,
/// Plugin id/path index, filled by `DynamicHub::register`.
registry: std::sync::RwLock<PluginRegistry>,
/// Deferred factories producing each activation's RPC methods; drained when
/// the hub is turned into an `RpcModule`.
pending_rpc: std::sync::Mutex<Vec<Box<dyn FnOnce() -> Methods + Send>>>,
/// Capabilities advertised by `_info`; `None` means the anonymous default.
auth_capabilities: Option<plexus_auth_core::BackendAuthCapabilities>,
/// Forward policies registered through `DynamicHub::with_forward_policy`.
forward_policies: super::forward_registry::ForwardPolicyRegistry,
}
/// Cheaply clonable handle to a hub of registered activations.
///
/// Builder-style methods (`register`, `with_*`) need exclusive access to the
/// inner state and panic if the hub has already been cloned.
#[derive(Clone)]
pub struct DynamicHub {
inner: Arc<DynamicHubInner>,
}
impl DynamicHub {
/// Creates an empty hub serving its own methods under `namespace`.
pub fn new(namespace: impl Into<String>) -> Self {
    let state = DynamicHubInner {
        namespace: namespace.into(),
        activations: HashMap::new(),
        child_routers: HashMap::new(),
        registry: std::sync::RwLock::new(PluginRegistry::new()),
        pending_rpc: std::sync::Mutex::new(Vec::new()),
        auth_capabilities: None,
        forward_policies: super::forward_registry::ForwardPolicyRegistry::new(),
    };
    Self { inner: Arc::new(state) }
}
/// Registers `policy` as the forward policy for calls into `callee_ns`.
///
/// # Panics
/// Panics if the hub has been cloned (the inner state must be uniquely owned).
pub fn with_forward_policy(
    mut self,
    callee_ns: impl Into<String>,
    policy: std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>,
) -> Self {
    let exclusive = Arc::get_mut(&mut self.inner);
    let hub_state =
        exclusive.expect("Cannot register forward policy: DynamicHub has multiple references");
    hub_state.forward_policies.register(callee_ns, policy);
    self
}
pub fn forward_policies(&self) -> &super::forward_registry::ForwardPolicyRegistry {
&self.inner.forward_policies
}
/// Sets the auth capabilities advertised by the `_info` subscription.
///
/// # Panics
/// Panics if the hub has been cloned (the inner state must be uniquely owned).
pub fn with_auth_capabilities(
    mut self,
    caps: plexus_auth_core::BackendAuthCapabilities,
) -> Self {
    let exclusive = Arc::get_mut(&mut self.inner);
    let hub_state =
        exclusive.expect("Cannot set auth_capabilities: DynamicHub has multiple references");
    hub_state.auth_capabilities = Some(caps);
    self
}
/// Explicitly configured auth capabilities, or `None` when unset.
pub fn auth_capabilities(&self) -> Option<&plexus_auth_core::BackendAuthCapabilities> {
    let configured = &self.inner.auth_capabilities;
    configured.as_ref()
}
#[deprecated(since = "0.3.0", note = "Use DynamicHub::new(namespace) instead")]
pub fn with_namespace(namespace: impl Into<String>) -> Self {
Self::new(namespace)
}
/// Namespace this hub was constructed with.
pub fn runtime_namespace(&self) -> &str {
    self.inner.namespace.as_str()
}
/// Acquires a read guard on the plugin registry.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn registry(&self) -> std::sync::RwLockReadGuard<'_, PluginRegistry> {
    let lock = &self.inner.registry;
    lock.read().unwrap()
}
pub fn register<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
let namespace = activation.namespace().to_string();
let plugin_id = activation.plugin_id();
let activation_for_rpc = activation.clone();
let activation_for_router = activation.clone();
let inner = Arc::get_mut(&mut self.inner)
.expect("Cannot register: DynamicHub has multiple references");
inner.registry.write().unwrap().register(
plugin_id,
namespace.clone(),
namespace.clone(), );
inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
inner.child_routers.insert(namespace.clone(), Arc::new(activation_for_router));
inner.pending_rpc.lock().unwrap()
.push(Box::new(move || activation_for_rpc.into_rpc_methods()));
self
}
/// Deprecated alias of [`DynamicHub::register`].
///
/// Delegates to `register` so the two entry points cannot drift apart.
///
/// # Panics
/// Same as `register`: panics if the hub has been cloned.
#[deprecated(since = "0.5.0", note = "Use register() — it now handles both leaf and hub activations")]
pub fn register_hub<A: Activation + ChildRouter + Clone + 'static>(self, activation: A) -> Self {
    self.register(activation)
}
/// Lists every callable method as `namespace.method`, sorted.
///
/// Includes the hub's own methods followed by those of each registered
/// activation before sorting.
pub fn list_methods(&self) -> Vec<String> {
    let hub_ns = self.inner.namespace.as_str();
    let own_methods = Activation::methods(self)
        .into_iter()
        .map(|m| format!("{}.{}", hub_ns, m));
    let registered_methods = self.inner.activations.iter().flat_map(|(ns, act)| {
        act.methods()
            .into_iter()
            .map(move |m| format!("{}.{}", ns, m))
    });

    let mut methods: Vec<String> = own_methods.chain(registered_methods).collect();
    methods.sort();
    methods
}
/// Describes the hub itself (first) and then every registered activation.
pub fn list_activations_info(&self) -> Vec<ActivationInfo> {
    let hub_info = ActivationInfo {
        namespace: Activation::namespace(self).to_string(),
        version: Activation::version(self).to_string(),
        description: Activation::description(self).to_string(),
        methods: Activation::methods(self).iter().map(|s| s.to_string()).collect(),
    };
    let registered = self.inner.activations.values().map(|a| ActivationInfo {
        namespace: a.namespace().to_string(),
        version: a.version().to_string(),
        description: a.description().to_string(),
        methods: a.methods().iter().map(|s| s.to_string()).collect(),
    });
    std::iter::once(hub_info).chain(registered).collect()
}
/// Hash of the hub's own plugin schema.
pub fn compute_hash(&self) -> String {
    let schema = Activation::plugin_schema(self);
    schema.hash
}
pub async fn route(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>) -> Result<PlexusStream, PlexusError> {
self.route_with_ctx(method, params, auth, None).await
}
/// Routes a `namespace.method` call to the hub itself or to a registered
/// activation.
///
/// # Errors
/// * [`PlexusError::InvalidParams`] when `method` contains no `.`.
/// * [`PlexusError::ActivationNotFound`] when the namespace is unknown.
/// * Whatever the target's `call` returns.
pub async fn route_with_ctx(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
    let (namespace, method_name) = self.parse_method(method)?;

    // The hub's own namespace takes precedence over registered activations.
    if namespace == self.inner.namespace {
        return Activation::call(self, method_name, params, auth, raw_ctx).await;
    }

    match self.inner.activations.get(namespace) {
        Some(activation) => activation.call(method_name, params, auth, raw_ctx).await,
        None => Err(PlexusError::ActivationNotFound(namespace.to_string())),
    }
}
/// Resolves `handle` through the activation that owns its plugin id.
///
/// # Errors
/// [`PlexusError::ActivationNotFound`] when the plugin id is not in the
/// registry (payload: the id) or the registered path has no activation
/// (payload: the path); otherwise whatever the activation returns.
pub async fn do_resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
    // Copy the path out in a single statement so the read guard is released
    // before any `.await`.
    let registered_path: Option<String> = self
        .inner
        .registry
        .read()
        .unwrap()
        .lookup(handle.plugin_id)
        .map(str::to_owned);
    let path = match registered_path {
        Some(found) => found,
        None => return Err(PlexusError::ActivationNotFound(handle.plugin_id.to_string())),
    };

    let activation = match self.inner.activations.get(&path) {
        Some(found) => found,
        None => return Err(PlexusError::ActivationNotFound(path)),
    };
    activation.resolve_handle(handle).await
}
/// JSON schema of the activation registered under `namespace`, if any.
pub fn get_activation_schema(&self, namespace: &str) -> Option<Schema> {
    let activation = self.inner.activations.get(namespace)?;
    Some(activation.schema())
}
/// Clones the registry's maps into an owned snapshot.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn registry_snapshot(&self) -> PluginRegistrySnapshot {
    let registry = self.inner.registry.read().unwrap();
    let by_id = registry.by_id.clone();
    let by_path = registry.by_path.clone();
    PluginRegistrySnapshot { by_id, by_path }
}
/// Owned copy of the path registered for plugin `id`, if any.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn lookup_plugin(&self, id: uuid::Uuid) -> Option<String> {
    self.inner
        .registry
        .read()
        .unwrap()
        .lookup(id)
        .map(str::to_owned)
}
/// Plugin id registered under `path`, if any.
///
/// # Panics
/// Panics if the registry lock is poisoned.
pub fn lookup_plugin_by_path(&self, path: &str) -> Option<uuid::Uuid> {
    let registry = self.inner.registry.read().unwrap();
    registry.lookup_by_path(path)
}
/// Plugin schemas of the hub itself (first) and every registered activation.
pub fn list_plugin_schemas(&self) -> Vec<PluginSchema> {
    let hub_schema = Activation::plugin_schema(self);
    let registered = self.inner.activations.values().map(|a| a.plugin_schema());
    std::iter::once(hub_schema).chain(registered).collect()
}
#[deprecated(note = "Use list_plugin_schemas instead")]
pub fn list_full_schemas(&self) -> Vec<PluginSchema> {
self.list_plugin_schemas()
}
pub fn get_method_help(&self, method: &str) -> Option<String> {
let (namespace, method_name) = self.parse_method(method).ok()?;
let activation = self.inner.activations.get(namespace)?;
activation.method_help(method_name)
}
/// Splits `method` at its first `.` into `(namespace, rest)`.
///
/// # Errors
/// [`PlexusError::InvalidParams`] when `method` contains no `.`.
fn parse_method<'a>(&self, method: &'a str) -> Result<(&'a str, &'a str), PlexusError> {
    match method.split_once('.') {
        Some(namespace_and_rest) => Ok(namespace_and_rest),
        None => Err(PlexusError::InvalidParams(format!(
            "Invalid method format: {}",
            method
        ))),
    }
}
/// Summarizes every registered activation (namespace, description, hash).
pub fn plugin_children(&self) -> Vec<ChildSummary> {
    let mut children = Vec::new();
    for activation in self.inner.activations.values() {
        let schema = activation.plugin_schema();
        let summary = ChildSummary {
            namespace: schema.namespace,
            description: schema.description,
            hash: schema.hash,
        };
        children.push(summary);
    }
    children
}
/// Consumes the hub and builds a jsonrpsee `RpcModule` exposing it.
///
/// Registers, in order: the `{ns}.call`, `{ns}.hash`, `{ns}.schema` and `_info`
/// subscriptions, then the queued per-activation RPC methods, then the
/// list/search subscriptions of every child router that advertises them.
///
/// # Errors
/// Returns `RegisterMethodError` if any registration or merge fails.
///
/// # Panics
/// Panics if the `pending_rpc` lock is poisoned.
pub fn into_rpc_module(self) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
let mut module = RpcModule::new(());
PlexusContext::init(self.compute_hash());
let ns = self.runtime_namespace();
// jsonrpsee requires `&'static str` names; the namespace is only known at
// runtime, so each name is built and leaked once here.
let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
// `{ns}.call`: route the requested method and pipe its stream to the subscriber.
let plexus_for_call = self.clone();
module.register_subscription(
call_method,
PLEXUS_NOTIF_METHOD,
call_unsub,
move |params, pending, _ctx, _ext| {
let plexus = plexus_for_call.clone();
Box::pin(async move {
let p: CallParams = params.parse()?;
// NOTE(review): auth is always `None` on this path, whereas
// `arc_into_rpc_module` reads an `AuthContext` from the request
// extensions; confirm this difference is intended.
match plexus.route(&p.method, p.params.unwrap_or_default(), None).await {
Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
Err(e) => {
// Routing failed: accept the subscription and deliver the failure as
// a single stream error item rather than rejecting the request.
let sink = pending.accept().await?;
let error_item = super::types::PlexusStreamItem::Error {
metadata: super::types::StreamMetadata::new(
vec![ns_static.into()],
PlexusContext::hash(),
),
message: e.to_string(),
code: Some(plexus_error_code(&e).to_string()),
recoverable: false,
};
// Best effort: serialization or send failures are ignored.
if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
let _ = sink.send(raw).await;
}
Ok(())
}
}
})
}
)?;
// `{ns}.hash`: emit the hub schema hash as a single event.
let plexus_for_hash = self.clone();
module.register_subscription(
hash_method,
PLEXUS_NOTIF_METHOD,
hash_unsub,
move |_params, pending, _ctx, _ext| {
let plexus = plexus_for_hash.clone();
Box::pin(async move {
let schema = Activation::plugin_schema(&plexus);
let stream = async_stream::stream! {
yield HashEvent::Hash { value: schema.hash };
};
let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
pipe_stream_to_subscription(pending, wrapped).await
})
}
)?;
// `{ns}.schema`: emit the whole plugin schema, or one method's schema when
// a method name is supplied.
let plexus_for_schema = self.clone();
module.register_subscription(
schema_method,
PLEXUS_NOTIF_METHOD,
schema_unsub,
move |params, pending, _ctx, _ext| {
let plexus = plexus_for_schema.clone();
Box::pin(async move {
// Unparseable params fall back to the default (no method filter).
let p: SchemaParams = params.parse().unwrap_or_default();
let plugin_schema = Activation::plugin_schema(&plexus);
let result = if let Some(ref name) = p.method {
plugin_schema.methods.iter()
.find(|m| m.name == *name)
.map(|m| super::SchemaResult::Method(m.clone()))
.ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
-32602,
format!("Method '{}' not found", name),
None::<()>,
))?
} else {
super::SchemaResult::Plugin(plugin_schema)
};
let stream = async_stream::stream! { yield result; };
let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
pipe_stream_to_subscription(pending, wrapped).await
})
}
)?;
// `_info`: the payload is computed once here and cloned per subscriber.
let info_payload = build_info_payload(
self.runtime_namespace(),
self.inner.auth_capabilities.as_ref(),
);
module.register_subscription(
"_info",
PLEXUS_NOTIF_METHOD,
"_info_unsub",
move |_params, pending, _ctx, _ext| {
let payload = info_payload.clone();
Box::pin(async move {
let info_stream = futures::stream::once(async move { payload });
let wrapped = super::streaming::wrap_stream(
info_stream,
"_info",
vec![]
);
pipe_stream_to_subscription(pending, wrapped).await
})
}
)?;
// Drain the queued factories so each activation's methods are merged once.
let pending = std::mem::take(&mut *self.inner.pending_rpc.lock().unwrap());
for factory in pending {
module.merge(factory())?;
}
for (ns, router) in self.inner.child_routers.iter() {
register_child_capability_methods(&mut module, ns, router.clone())?;
}
Ok(module)
}
/// Builds a jsonrpsee `RpcModule` from a shared hub.
///
/// Registers, in order: the `{ns}.call`, `{ns}.hash`, `{ns}.schema` and `_info`
/// subscriptions, the `{ns}.respond` method, the queued per-activation RPC
/// methods, and the list/search subscriptions of every child router that
/// advertises them. `{ns}.call` forwards the `AuthContext` found in the request
/// extensions, if any.
///
/// Every subscription, including `{ns}.call`, delivers its notifications under
/// `PLEXUS_NOTIF_METHOD`, matching `into_rpc_module`.
///
/// # Errors
/// Returns `RegisterMethodError` if any registration or merge fails.
///
/// # Panics
/// Panics if the `pending_rpc` lock is poisoned.
pub fn arc_into_rpc_module(hub: Arc<Self>) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
    let mut module = RpcModule::new(());
    PlexusContext::init(hub.compute_hash());
    let ns = hub.runtime_namespace();

    // jsonrpsee requires `&'static str` names; the namespace is only known at
    // runtime, so each distinct name is built and leaked exactly once.
    let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
    let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
    let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
    let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
    let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
    let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
    // The content types are the same strings as the method names; reuse them
    // instead of leaking identical copies.
    let hash_content_type: &'static str = hash_method;
    let schema_content_type: &'static str = schema_method;
    let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());

    // `{ns}.call`: route the requested method and pipe its stream to the subscriber.
    let hub_for_call = hub.clone();
    module.register_subscription(
        call_method,
        PLEXUS_NOTIF_METHOD,
        call_unsub,
        move |params, pending, _ctx, ext| {
            let hub = hub_for_call.clone();
            Box::pin(async move {
                let p: CallParams = params.parse()?;
                let auth = ext
                    .get::<std::sync::Arc<super::auth::AuthContext>>()
                    .map(|arc| arc.as_ref());
                match hub.route(&p.method, p.params.unwrap_or_default(), auth).await {
                    Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
                    Err(e) => {
                        // Routing failed: accept the subscription and deliver the
                        // failure as a single stream error item.
                        let sink = pending.accept().await?;
                        let error_item = super::types::PlexusStreamItem::Error {
                            metadata: super::types::StreamMetadata::new(
                                vec![ns_static.into()],
                                PlexusContext::hash(),
                            ),
                            message: e.to_string(),
                            code: Some(plexus_error_code(&e).to_string()),
                            recoverable: false,
                        };
                        // Best effort: serialization or send failures are ignored.
                        if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
                            let _ = sink.send(raw).await;
                        }
                        Ok(())
                    }
                }
            })
        },
    )?;

    // `{ns}.hash`: emit the hub schema hash as a single event.
    let hub_for_hash = hub.clone();
    module.register_subscription(
        hash_method,
        PLEXUS_NOTIF_METHOD,
        hash_unsub,
        move |_params, pending, _ctx, _ext| {
            let hub = hub_for_hash.clone();
            Box::pin(async move {
                let schema = Activation::plugin_schema(&*hub);
                let stream = async_stream::stream! {
                    yield HashEvent::Hash { value: schema.hash };
                };
                let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
                pipe_stream_to_subscription(pending, wrapped).await
            })
        },
    )?;

    // `{ns}.schema`: emit the whole plugin schema, or one method's schema when
    // a method name is supplied.
    let hub_for_schema = hub.clone();
    module.register_subscription(
        schema_method,
        PLEXUS_NOTIF_METHOD,
        schema_unsub,
        move |params, pending, _ctx, _ext| {
            let hub = hub_for_schema.clone();
            Box::pin(async move {
                // Unparseable params fall back to the default (no method filter).
                let p: SchemaParams = params.parse().unwrap_or_default();
                let plugin_schema = Activation::plugin_schema(&*hub);
                let result = if let Some(ref name) = p.method {
                    plugin_schema.methods.iter()
                        .find(|m| m.name == *name)
                        .map(|m| super::SchemaResult::Method(m.clone()))
                        .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
                            -32602,
                            format!("Method '{}' not found", name),
                            None::<()>,
                        ))?
                } else {
                    super::SchemaResult::Plugin(plugin_schema)
                };
                let stream = async_stream::stream! {
                    yield result;
                };
                let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
                pipe_stream_to_subscription(pending, wrapped).await
            })
        },
    )?;

    // `_info`: the payload is computed once here and cloned per subscriber.
    let info_payload = build_info_payload(
        hub.runtime_namespace(),
        hub.inner.auth_capabilities.as_ref(),
    );
    module.register_subscription(
        "_info",
        PLEXUS_NOTIF_METHOD,
        "_info_unsub",
        move |_params, pending, _ctx, _ext| {
            let payload = info_payload.clone();
            Box::pin(async move {
                let info_stream = futures::stream::once(async move { payload });
                let wrapped = super::streaming::wrap_stream(
                    info_stream,
                    "_info",
                    vec![]
                );
                pipe_stream_to_subscription(pending, wrapped).await
            })
        },
    )?;

    // `{ns}.respond`: deliver a client's answer to a pending bidirectional request.
    let respond_method: &'static str = Box::leak(format!("{}.respond", ns).into_boxed_str());
    module.register_async_method(respond_method, move |params, _ctx, _ext| async move {
        use super::bidirectional::{handle_pending_response, BidirError};
        let p: RespondParams = params.parse()?;
        tracing::debug!(
            request_id = %p.request_id,
            "Handling {}.respond via WebSocket",
            ns_static
        );
        match handle_pending_response(&p.request_id, p.response_data) {
            Ok(()) => Ok(serde_json::json!({"success": true})),
            Err(BidirError::UnknownRequest) => {
                tracing::warn!(request_id = %p.request_id, "Unknown request ID in respond");
                Err(jsonrpsee::types::ErrorObject::owned(
                    -32602,
                    format!("Unknown request ID: {}. The request may have timed out or been cancelled.", p.request_id),
                    None::<()>,
                ))
            }
            Err(BidirError::ChannelClosed) => {
                tracing::warn!(request_id = %p.request_id, "Channel closed in respond");
                Err(jsonrpsee::types::ErrorObject::owned(
                    -32000,
                    "Response channel was closed (request may have timed out)",
                    None::<()>,
                ))
            }
            Err(e) => {
                tracing::error!(request_id = %p.request_id, error = ?e, "Error in respond");
                Err(jsonrpsee::types::ErrorObject::owned(
                    -32000,
                    format!("Failed to deliver response: {}", e),
                    None::<()>,
                ))
            }
        }
    })?;

    // Drain the queued factories so each activation's methods are merged once.
    let pending = std::mem::take(&mut *hub.inner.pending_rpc.lock().unwrap());
    tracing::trace!(factories = pending.len(), "merging activation RPC factories");
    for (idx, factory) in pending.into_iter().enumerate() {
        tracing::trace!(factory_idx = idx, "calling factory to get Methods");
        let methods = factory();
        let method_count = methods.method_names().count();
        tracing::trace!(factory_idx = idx, methods = method_count, "factory returned Methods; merging into module");
        module.merge(methods)?;
        tracing::trace!(factory_idx = idx, "successfully merged factory methods");
    }
    tracing::trace!("all activations merged successfully");

    for (ns, router) in hub.inner.child_routers.iter() {
        register_child_capability_methods(&mut module, ns, router.clone())?;
    }
    Ok(module)
}
}
#[allow(deprecated)] fn register_child_capability_methods(
module: &mut RpcModule<()>,
namespace: &str,
router: Arc<dyn ChildRouter>,
) -> Result<(), jsonrpsee::core::RegisterMethodError> {
let caps = router.capabilities();
if caps.is_empty() {
return Ok(());
}
let ns_static: &'static str = Box::leak(namespace.to_string().into_boxed_str());
if caps.contains(ChildCapabilities::LIST) {
let list_method: &'static str =
Box::leak(format!("{}.list_children", namespace).into_boxed_str());
let list_unsub: &'static str =
Box::leak(format!("{}.list_children_unsub", namespace).into_boxed_str());
let router_for_list = router.clone();
module.register_subscription(
list_method,
PLEXUS_NOTIF_METHOD,
list_unsub,
move |_params, pending, _ctx, _ext| {
let router = router_for_list.clone();
Box::pin(async move {
let collected: Vec<String> = match router.list_children().await {
Some(mut s) => {
use futures::StreamExt;
let mut acc = Vec::new();
while let Some(name) = s.next().await {
acc.push(name);
}
acc
}
None => Vec::new(),
};
let stream = async_stream::stream! {
for name in collected {
yield name;
}
};
let wrapped = super::streaming::wrap_stream(
stream,
"list_children",
vec![ns_static.into()],
);
pipe_stream_to_subscription(pending, wrapped).await
})
},
)?;
}
if caps.contains(ChildCapabilities::SEARCH) {
let search_method: &'static str =
Box::leak(format!("{}.search_children", namespace).into_boxed_str());
let search_unsub: &'static str =
Box::leak(format!("{}.search_children_unsub", namespace).into_boxed_str());
let router_for_search = router.clone();
module.register_subscription(
search_method,
PLEXUS_NOTIF_METHOD,
search_unsub,
move |params, pending, _ctx, _ext| {
let router = router_for_search.clone();
Box::pin(async move {
let p: SearchChildrenParams = params.parse()?;
let collected: Vec<String> = match router.search_children(&p.query).await {
Some(mut s) => {
use futures::StreamExt;
let mut acc = Vec::new();
while let Some(name) = s.next().await {
acc.push(name);
}
acc
}
None => Vec::new(),
};
let stream = async_stream::stream! {
for name in collected {
yield name;
}
};
let wrapped = super::streaming::wrap_stream(
stream,
"search_children",
vec![ns_static.into()],
);
pipe_stream_to_subscription(pending, wrapped).await
})
},
)?;
}
Ok(())
}
/// Parameters parsed by the `<namespace>.search_children` subscription handler.
#[derive(Debug, serde::Deserialize)]
struct SearchChildrenParams {
/// Query string handed to the router's `search_children`.
query: String,
}
/// Deserialized request parameters carrying a method name and optional params.
///
/// NOTE(review): presumably the wire shape of the hub's `call` RPC; the usage
/// site is not visible from this section — confirm.
#[derive(Debug, serde::Deserialize)]
struct CallParams {
/// Name of the method to invoke.
method: String,
/// Optional JSON parameters; absent input deserializes to `None`.
#[serde(default)]
params: Option<Value>,
}
/// Deserialized request parameters holding an optional method name.
///
/// NOTE(review): presumably used by a schema lookup RPC to narrow the result
/// to one method; the usage site is not visible here — confirm.
#[derive(Debug, Default, serde::Deserialize)]
struct SchemaParams {
/// Optional method name.
method: Option<String>,
}
/// Deserialized request parameters pairing a request id with response data.
///
/// NOTE(review): presumably the payload of the bidirectional `respond` RPC
/// whose handler logs `request_id`; confirm against the handler.
#[derive(Debug, serde::Deserialize)]
struct RespondParams {
/// Identifier of the request being answered.
request_id: String,
/// Arbitrary JSON response payload.
response_data: Value,
}
/// Accepts a pending subscription and forwards every item of `stream` to it.
///
/// Each item is serialized with `serde_json::value::to_raw_value` and sent
/// through the accepted sink. Returns `Ok(())` once the stream is exhausted.
///
/// # Errors
///
/// Returns early if accepting the subscription, serializing an item, or
/// sending through the sink fails.
async fn pipe_stream_to_subscription(
    pending: jsonrpsee::PendingSubscriptionSink,
    mut stream: PlexusStream,
) -> jsonrpsee::core::SubscriptionResult {
    use futures::StreamExt;

    let accepted = pending.accept().await?;
    while let Some(event) = stream.next().await {
        let payload = serde_json::value::to_raw_value(&event)?;
        accepted.send(payload).await?;
    }
    Ok(())
}
// Hub-level RPC surface of `DynamicHub`, declared through the
// `plexus_macros::activation` attribute: `call`, `hash` and `hashes`.
// Plain `//` comments are used throughout this block on purpose: they are not
// visible to the attribute macros, so they cannot affect generated output.
#[plexus_macros::activation(
namespace = "plexus",
version = "1.0.0",
description = "Central routing and introspection",
hub,
namespace_fn = "runtime_namespace"
)]
#[allow(deprecated)]
impl DynamicHub {
// Routes `method` through `self.route` and returns the resulting stream.
// A routing failure is not returned as an `Err`; it becomes a one-item
// stream holding a non-recoverable `PlexusStreamItem::Error`.
#[plexus_macros::method(
streaming,
description = "Route a call to a registered activation",
params(
method = "The method to call (format: namespace.method)",
params = "Parameters to pass to the method (optional, defaults to {})"
)
)]
async fn call(
&self,
method: String,
params: Option<Value>,
) -> impl Stream<Item = super::types::PlexusStreamItem> + Send + 'static {
use super::context::PlexusContext;
use super::types::{PlexusStreamItem, StreamMetadata};
// NOTE(review): `unwrap_or_default()` on `Option<Value>` yields
// `Value::Null`, while the description above says "defaults to {}" —
// confirm which one downstream activations expect.
let result = self.route(&method, params.unwrap_or_default(), None).await;
match result {
Ok(plexus_stream) => {
plexus_stream
}
Err(e) => {
// Error metadata is built from this hub's namespace and
// `PlexusContext::hash()`.
let metadata = StreamMetadata::new(
vec![self.inner.namespace.clone()],
PlexusContext::hash(),
);
Box::pin(futures::stream::once(async move {
PlexusStreamItem::Error {
metadata,
message: e.to_string(),
code: None,
recoverable: false,
}
}))
}
}
}
// Yields a single `HashEvent::Hash` carrying the `hash` field of this
// hub's plugin schema.
#[plexus_macros::method(description = "Get plexus configuration hash (from the recursive schema)\n\n This hash changes whenever any method or child plugin changes.\n It's computed from the method hashes rolled up through the schema tree.")]
async fn hash(&self) -> impl Stream<Item = HashEvent> + Send + 'static {
let schema = Activation::plugin_schema(self);
stream! { yield HashEvent::Hash { value: schema.hash }; }
}
// Yields a single `PluginHashes` value copied from this hub's plugin schema:
// namespace, self/children/combined hashes, and one `ChildHashes` entry per
// child when the schema has children.
#[plexus_macros::method(description = "Get plugin hashes for cache validation")]
#[allow(deprecated)]
async fn hashes(&self) -> impl Stream<Item = PluginHashes> + Send + 'static {
let schema = Activation::plugin_schema(self);
stream! {
yield PluginHashes {
namespace: schema.namespace.clone(),
self_hash: schema.self_hash.clone(),
children_hash: schema.children_hash.clone(),
hash: schema.hash.clone(),
children: schema.children.as_ref().map(|kids| {
kids.iter()
.map(|c| ChildHashes {
namespace: c.namespace.clone(),
hash: c.hash.clone(),
})
.collect()
}),
};
}
}
}
use super::hub_context::HubContext;
use std::sync::Weak;
/// `HubContext` backed by a weak reference to the parent hub.
///
/// Each fallible operation upgrades the `Weak` first and reports
/// `PlexusError::ExecutionError` when the hub no longer exists.
#[async_trait]
impl HubContext for Weak<DynamicHub> {
    /// Resolves `handle` through the parent hub's `do_resolve_handle`.
    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
        match self.upgrade() {
            Some(hub) => hub.do_resolve_handle(handle).await,
            None => Err(PlexusError::ExecutionError(
                "Parent hub has been dropped".to_string(),
            )),
        }
    }

    /// Routes `method` with `params` through the parent hub's `route`.
    async fn call(
        &self,
        method: &str,
        params: serde_json::Value,
    ) -> Result<PlexusStream, PlexusError> {
        match self.upgrade() {
            Some(hub) => hub.route(method, params, None).await,
            None => Err(PlexusError::ExecutionError(
                "Parent hub has been dropped".to_string(),
            )),
        }
    }

    /// Reports whether the parent hub can still be upgraded.
    fn is_valid(&self) -> bool {
        self.upgrade().is_some()
    }
}
/// Lets a `DynamicHub` act as a child router itself.
#[async_trait]
impl ChildRouter for DynamicHub {
    /// Namespace this hub was configured with.
    fn router_namespace(&self) -> &str {
        &self.inner.namespace
    }

    /// Forwards the call unchanged to `route_with_ctx`.
    async fn router_call(
        &self,
        method: &str,
        params: Value,
        auth: Option<&super::auth::AuthContext>,
        raw_ctx: Option<&crate::request::RawRequestContext>,
    ) -> Result<PlexusStream, PlexusError> {
        self.route_with_ctx(method, params, auth, raw_ctx).await
    }

    /// Looks up `name` among the registered child routers and, when present,
    /// returns it wrapped in an `ArcChildRouter`.
    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
        let router = self.inner.child_routers.get(name)?;
        let child: Box<dyn ChildRouter> = Box::new(ArcChildRouter(router.clone()));
        Some(child)
    }

    /// Returns the forward policy registered for `callee_ns`, if any.
    fn forward_policy_for(
        &self,
        callee_ns: &str,
    ) -> Option<std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>> {
        self.inner.forward_policies.get(callee_ns)
    }
}
/// Unit tests for `DynamicHub`, the plugin registry, `ChildRouter` defaults
/// and the capability-gated child subscription wiring.
#[cfg(test)]
#[allow(deprecated)]
mod tests {
    use super::*;

    /// Compile-time check that `DynamicHub` satisfies the `Activation` bound.
    #[test]
    fn dynamic_hub_implements_activation() {
        fn assert_activation<T: Activation>() {}
        assert_activation::<DynamicHub>();
    }

    /// The hub advertises its built-in methods.
    #[test]
    fn dynamic_hub_methods() {
        let hub = DynamicHub::new("test");
        let advertised = hub.methods();
        assert!(advertised.contains(&"call"));
        assert!(advertised.contains(&"hash"));
        assert!(advertised.contains(&"schema"));
    }

    /// Two identically constructed hubs compute the same hash.
    #[test]
    fn dynamic_hub_hash_stable() {
        let first = DynamicHub::new("test");
        let second = DynamicHub::new("test");
        assert_eq!(first.compute_hash(), second.compute_hash());
    }

    /// A hub with a registered activation reports itself as a hub with children.
    #[test]
    fn dynamic_hub_is_hub() {
        use crate::activations::health::Health;

        let hub = DynamicHub::new("test").register(Health::new());
        let schema = hub.plugin_schema();
        assert!(schema.is_hub(), "dynamic hub should be a hub");
        assert!(!schema.is_leaf(), "dynamic hub should not be a leaf");

        let children = schema.children.expect("dynamic hub should have children");
        assert!(
            !children.is_empty(),
            "dynamic hub should have at least one child"
        );

        let health = children
            .iter()
            .find(|c| c.namespace == "health")
            .expect("should have health child");
        assert!(!health.hash.is_empty(), "health should have a hash");
    }

    /// The schema carries the namespace, the `call` method and a children list.
    #[test]
    fn dynamic_hub_schema_structure() {
        use crate::activations::health::Health;

        let hub = DynamicHub::new("test").register(Health::new());
        let schema = hub.plugin_schema();
        let json = serde_json::to_string_pretty(&schema).unwrap();
        println!("DynamicHub schema:\n{}", json);

        assert_eq!(schema.namespace, "test");
        assert!(schema.methods.iter().any(|m| m.name == "call"));
        assert!(schema.children.is_some());
    }

    /// A handle whose plugin id is not registered yields `ActivationNotFound`.
    #[tokio::test]
    async fn invariant_resolve_handle_unknown_activation() {
        use crate::activations::health::Health;
        use crate::types::Handle;
        use uuid::Uuid;

        let hub = DynamicHub::new("test").register(Health::new());
        let unknown_plugin_id = Uuid::new_v4();
        let handle = Handle::new(unknown_plugin_id, "1.0.0", "some_method");

        match hub.do_resolve_handle(&handle).await {
            Err(PlexusError::ActivationNotFound(_)) => {}
            Err(other) => panic!("Expected ActivationNotFound, got {:?}", other),
            Ok(_) => panic!("Expected error for unknown activation"),
        }
    }

    /// A handle for `Health` resolves to `HandleNotSupported("health")`.
    #[tokio::test]
    async fn invariant_resolve_handle_unsupported() {
        use crate::activations::health::Health;
        use crate::types::Handle;

        let hub = DynamicHub::new("test").register(Health::new());
        let handle = Handle::new(Health::PLUGIN_ID, "1.0.0", "check");

        match hub.do_resolve_handle(&handle).await {
            Err(PlexusError::HandleNotSupported(name)) => {
                assert_eq!(name, "health");
            }
            Err(other) => panic!("Expected HandleNotSupported, got {:?}", other),
            Ok(_) => panic!("Expected error for unsupported handle"),
        }
    }

    /// Handles are dispatched to the activation that owns their plugin id.
    #[tokio::test]
    async fn invariant_resolve_handle_routes_by_plugin_id() {
        use crate::activations::echo::Echo;
        use crate::activations::health::Health;
        use crate::types::Handle;
        use uuid::Uuid;

        let health = Health::new();
        let echo = Echo::new();
        let health_plugin_id = health.plugin_id();
        let echo_plugin_id = echo.plugin_id();
        let hub = DynamicHub::new("test").register(health).register(echo);

        let health_handle = Handle::new(health_plugin_id, "1.0.0", "check");
        match hub.do_resolve_handle(&health_handle).await {
            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
            Err(other) => panic!(
                "health handle should route to health activation, got {:?}",
                other
            ),
            Ok(_) => panic!("health handle should return HandleNotSupported"),
        }

        let echo_handle = Handle::new(echo_plugin_id, "1.0.0", "echo");
        match hub.do_resolve_handle(&echo_handle).await {
            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "echo"),
            Err(other) => panic!(
                "echo handle should route to echo activation, got {:?}",
                other
            ),
            Ok(_) => panic!("echo handle should return HandleNotSupported"),
        }

        let unknown_handle = Handle::new(Uuid::new_v4(), "1.0.0", "method");
        match hub.do_resolve_handle(&unknown_handle).await {
            Err(PlexusError::ActivationNotFound(_)) => {}
            Err(other) => panic!(
                "unknown handle should return ActivationNotFound, got {:?}",
                other
            ),
            Ok(_) => panic!("unknown handle should return ActivationNotFound"),
        }
    }

    /// Handles with identical metadata still differ by plugin id.
    #[test]
    fn invariant_handle_plugin_id_determines_routing() {
        use crate::activations::echo::Echo;
        use crate::activations::health::Health;
        use crate::types::Handle;

        let health = Health::new();
        let echo = Echo::new();

        let health_handle = Handle::new(health.plugin_id(), "1.0.0", "check")
            .with_meta(vec!["msg-123".into(), "user".into()]);
        let echo_handle = Handle::new(echo.plugin_id(), "1.0.0", "echo")
            .with_meta(vec!["msg-123".into(), "user".into()]);

        assert_ne!(health_handle.plugin_id, echo_handle.plugin_id);
    }

    /// Register, lookup-by-id, lookup-by-path and entry access on the registry.
    #[test]
    fn plugin_registry_basic_operations() {
        let mut registry = PluginRegistry::new();
        let id = uuid::Uuid::new_v4();
        registry.register(id, "test_plugin".to_string(), "test".to_string());

        assert_eq!(registry.lookup(id), Some("test_plugin"));
        assert_eq!(registry.lookup_by_path("test_plugin"), Some(id));

        let entry = registry.get(id).expect("should have entry");
        assert_eq!(entry.path, "test_plugin");
        assert_eq!(entry.plugin_type, "test");
    }

    /// Registering an activation on the hub populates the registry.
    #[test]
    fn plugin_registry_populated_on_register() {
        use crate::activations::health::Health;

        let hub = DynamicHub::new("test").register(Health::new());
        let registry = hub.registry();
        assert!(
            !registry.is_empty(),
            "registry should not be empty after registration"
        );

        let health_id = registry.lookup_by_path("health");
        assert!(health_id.is_some(), "health should be registered by path");
        let health_uuid = health_id.unwrap();
        assert_eq!(registry.lookup(health_uuid), Some("health"));
    }

    /// Plugin ids are a v5 UUID derived from `namespace@major_version`.
    #[test]
    fn plugin_registry_deterministic_uuid() {
        use crate::activations::health::Health;

        let health1 = Health::new();
        let health2 = Health::new();
        assert_eq!(
            health1.plugin_id(),
            health2.plugin_id(),
            "same activation type should have deterministic UUID"
        );

        let expected = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, b"health@1");
        assert_eq!(
            health1.plugin_id(),
            expected,
            "plugin_id should be deterministic from namespace@major_version"
        );
    }

    /// Router implementing only the required `ChildRouter` methods.
    struct MinimalRouter;

    #[async_trait]
    impl ChildRouter for MinimalRouter {
        fn router_namespace(&self) -> &str {
            "minimal"
        }

        async fn router_call(
            &self,
            _method: &str,
            _params: Value,
            _auth: Option<&super::super::auth::AuthContext>,
            _raw_ctx: Option<&crate::request::RawRequestContext>,
        ) -> Result<PlexusStream, PlexusError> {
            Err(PlexusError::MethodNotFound {
                activation: "minimal".into(),
                method: "none".into(),
            })
        }

        async fn get_child(&self, _name: &str) -> Option<Box<dyn ChildRouter>> {
            None
        }
    }

    /// Default trait methods report no capabilities and return no streams.
    #[tokio::test]
    async fn child_router_defaults_report_no_capabilities_and_none_streams() {
        let router = MinimalRouter;
        assert_eq!(
            router.capabilities(),
            ChildCapabilities::empty(),
            "default capabilities should be empty"
        );
        assert!(
            router.list_children().await.is_none(),
            "default list_children should be None"
        );
        assert!(
            router.search_children("anything").await.is_none(),
            "default search_children should be None"
        );
    }

    /// Router that overrides both listing and searching over a fixed name list.
    struct ListingRouter {
        names: Vec<String>,
    }

    #[async_trait]
    impl ChildRouter for ListingRouter {
        fn router_namespace(&self) -> &str {
            "listing"
        }

        async fn router_call(
            &self,
            _method: &str,
            _params: Value,
            _auth: Option<&super::super::auth::AuthContext>,
            _raw_ctx: Option<&crate::request::RawRequestContext>,
        ) -> Result<PlexusStream, PlexusError> {
            Err(PlexusError::MethodNotFound {
                activation: "listing".into(),
                method: "none".into(),
            })
        }

        async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
            if self.names.iter().any(|n| n == name) {
                Some(Box::new(ListingRouter { names: vec![] }))
            } else {
                None
            }
        }

        fn capabilities(&self) -> ChildCapabilities {
            ChildCapabilities::LIST | ChildCapabilities::SEARCH
        }

        async fn list_children(&self) -> Option<BoxStream<'_, String>> {
            let all = futures::stream::iter(self.names.iter().cloned());
            Some(Box::pin(all))
        }

        async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
            // Case-sensitive substring match, collected eagerly.
            let hits: Vec<String> = self
                .names
                .iter()
                .filter(|n| n.contains(query))
                .cloned()
                .collect();
            Some(Box::pin(futures::stream::iter(hits)))
        }
    }

    /// Overridden capability methods report both bits and yield the expected names.
    #[tokio::test]
    async fn child_router_overrides_report_capabilities_and_yield_streams() {
        use futures::StreamExt;

        let router = ListingRouter {
            names: vec!["alpha".into(), "beta".into(), "alphabet".into()],
        };

        let caps = router.capabilities();
        assert!(caps.contains(ChildCapabilities::LIST));
        assert!(caps.contains(ChildCapabilities::SEARCH));
        assert_eq!(caps, ChildCapabilities::LIST | ChildCapabilities::SEARCH);

        let list_stream = router
            .list_children()
            .await
            .expect("LIST capability set — expected Some(stream)");
        let listed: Vec<String> = list_stream.collect().await;
        assert_eq!(
            listed,
            vec!["alpha".to_string(), "beta".into(), "alphabet".into()]
        );

        let search_stream = router
            .search_children("alpha")
            .await
            .expect("SEARCH capability set — expected Some(stream)");
        let matched: Vec<String> = search_stream.collect().await;
        assert_eq!(matched, vec!["alpha".to_string(), "alphabet".into()]);
    }

    /// Router with configurable capabilities, used to test RPC registration.
    struct WireFixture {
        names: Vec<String>,
        caps: ChildCapabilities,
    }

    #[async_trait]
    impl ChildRouter for WireFixture {
        fn router_namespace(&self) -> &str {
            "wirefixture"
        }

        async fn router_call(
            &self,
            _method: &str,
            _params: Value,
            _auth: Option<&super::super::auth::AuthContext>,
            _raw_ctx: Option<&crate::request::RawRequestContext>,
        ) -> Result<PlexusStream, PlexusError> {
            Err(PlexusError::MethodNotFound {
                activation: "wirefixture".into(),
                method: "none".into(),
            })
        }

        async fn get_child(&self, _name: &str) -> Option<Box<dyn ChildRouter>> {
            None
        }

        fn capabilities(&self) -> ChildCapabilities {
            self.caps
        }

        async fn list_children(&self) -> Option<BoxStream<'_, String>> {
            if !self.caps.contains(ChildCapabilities::LIST) {
                return None;
            }
            Some(Box::pin(futures::stream::iter(self.names.clone())))
        }

        async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
            if !self.caps.contains(ChildCapabilities::SEARCH) {
                return None;
            }
            // Case-insensitive substring match.
            let needle = query.to_lowercase();
            let filtered: Vec<String> = self
                .names
                .iter()
                .filter(|n| n.to_lowercase().contains(&needle))
                .cloned()
                .collect();
            Some(Box::pin(futures::stream::iter(filtered)))
        }
    }

    /// Builds a fresh module with only `router`'s capability methods under `ns`.
    fn build_module_for(router: WireFixture, ns: &str) -> RpcModule<()> {
        let mut module = RpcModule::new(());
        let shared: Arc<dyn ChildRouter> = Arc::new(router);
        register_child_capability_methods(&mut module, ns, shared).expect("register");
        module
    }

    /// Snapshot of the method names currently registered on `module`.
    fn registered_names(module: &RpcModule<()>) -> Vec<String> {
        module.method_names().map(|s| s.to_string()).collect()
    }

    /// Both subscriptions are registered when both capability bits are set.
    #[tokio::test]
    async fn child_wire_registers_both_methods_when_both_bits_set() {
        let module = build_module_for(
            WireFixture {
                names: vec!["alpha".into(), "beta".into()],
                caps: ChildCapabilities::LIST | ChildCapabilities::SEARCH,
            },
            "fixture",
        );
        let names = registered_names(&module);
        assert!(
            names.contains(&"fixture.list_children".to_string()),
            "expected fixture.list_children, got: {:?}",
            names
        );
        assert!(
            names.contains(&"fixture.search_children".to_string()),
            "expected fixture.search_children, got: {:?}",
            names
        );
    }

    /// Nothing is registered when the router reports no capabilities.
    #[tokio::test]
    async fn child_wire_registers_nothing_when_no_bits_set() {
        let module = build_module_for(
            WireFixture {
                names: vec!["alpha".into()],
                caps: ChildCapabilities::empty(),
            },
            "fixture",
        );
        let names = registered_names(&module);
        assert!(
            !names.contains(&"fixture.list_children".to_string()),
            "fixture.list_children should NOT be registered when cap absent"
        );
        assert!(
            !names.contains(&"fixture.search_children".to_string()),
            "fixture.search_children should NOT be registered when cap absent"
        );
    }

    /// Only the list subscription is registered when only `LIST` is set.
    #[tokio::test]
    async fn child_wire_registers_only_list_when_only_list_bit() {
        let module = build_module_for(
            WireFixture {
                names: vec!["alpha".into()],
                caps: ChildCapabilities::LIST,
            },
            "fixture",
        );
        let names = registered_names(&module);
        assert!(names.contains(&"fixture.list_children".to_string()));
        assert!(!names.contains(&"fixture.search_children".to_string()));
    }
}