1use super::{
9 context::PlexusContext,
10 method_enum::MethodEnumSchema,
11 schema::{ChildSummary, MethodSchema, PluginSchema, Schema},
12 streaming::PlexusStream,
13};
14use crate::types::Handle;
15use async_stream::stream;
16use async_trait::async_trait;
17use bitflags::bitflags;
18use futures::Stream;
19use futures_core::stream::BoxStream;
20use jsonrpsee::core::server::Methods;
21use jsonrpsee::RpcModule;
22
/// Notification method name handed to `register_subscription` (as the second
/// argument) for the hub subscriptions registered in this file.
pub const PLEXUS_NOTIF_METHOD: &str = "result";
29use schemars::JsonSchema;
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use std::collections::HashMap;
33use std::sync::Arc;
34
/// Errors returned while routing or executing a Plexus call.
///
/// The `Display` impl in this file renders each variant as a short
/// human-readable message.
#[derive(Debug, Clone)]
pub enum PlexusError {
    /// No activation is registered under the given name.
    ActivationNotFound(String),
    /// The activation exists but the method could not be resolved on it.
    MethodNotFound { activation: String, method: String },
    /// The request parameters (or the method path format) were invalid.
    InvalidParams(String),
    /// The call failed while executing; the payload is the message.
    ExecutionError(String),
    /// The named activation does not support handle resolution.
    HandleNotSupported(String),
    /// A transport-level failure; see [`TransportErrorKind`].
    TransportError(TransportErrorKind),
    /// The call requires authentication; the payload is the message.
    Unauthenticated(String),
}
49
/// Transport-level failure categories carried by `PlexusError::TransportError`.
///
/// Serialized with an internal `error_kind` tag whose values are the
/// snake_case variant names.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "error_kind", rename_all = "snake_case")]
pub enum TransportErrorKind {
    /// The connection to `host:port` was refused.
    ConnectionRefused { host: String, port: u16 },
    /// The connection attempt to `host:port` timed out.
    ConnectionTimeout { host: String, port: u16 },
    /// A protocol-level error, described by `message`.
    ProtocolError { message: String },
    /// A network-level error, described by `message`.
    NetworkError { message: String },
}
58
59impl std::fmt::Display for TransportErrorKind {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 match self {
62 TransportErrorKind::ConnectionRefused { host, port } => {
63 write!(f, "Connection refused to {}:{}", host, port)
64 }
65 TransportErrorKind::ConnectionTimeout { host, port } => {
66 write!(f, "Connection timeout to {}:{}", host, port)
67 }
68 TransportErrorKind::ProtocolError { message } => {
69 write!(f, "Protocol error: {}", message)
70 }
71 TransportErrorKind::NetworkError { message } => {
72 write!(f, "Network error: {}", message)
73 }
74 }
75 }
76}
77
78impl std::fmt::Display for PlexusError {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 match self {
81 PlexusError::ActivationNotFound(name) => write!(f, "Activation not found: {}", name),
82 PlexusError::MethodNotFound { activation, method } => {
83 write!(f, "Method not found: {}.{}", activation, method)
84 }
85 PlexusError::InvalidParams(msg) => write!(f, "Invalid params: {}", msg),
86 PlexusError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
87 PlexusError::HandleNotSupported(activation) => {
88 write!(f, "Handle resolution not supported by activation: {}", activation)
89 }
90 PlexusError::TransportError(kind) => match kind {
91 TransportErrorKind::ConnectionRefused { host, port } => {
92 write!(f, "Connection refused to {}:{}", host, port)
93 }
94 TransportErrorKind::ConnectionTimeout { host, port } => {
95 write!(f, "Connection timeout to {}:{}", host, port)
96 }
97 TransportErrorKind::ProtocolError { message } => {
98 write!(f, "Protocol error: {}", message)
99 }
100 TransportErrorKind::NetworkError { message } => {
101 write!(f, "Network error: {}", message)
102 }
103 }
104 PlexusError::Unauthenticated(msg) => write!(f, "Authentication required: {}", msg),
105 }
106 }
107}
108
// Marker impl: `PlexusError` relies on the default `std::error::Error` methods
// (no `source`), with `Debug` derived and `Display` implemented in this file.
impl std::error::Error for PlexusError {}
110
111fn plexus_error_code(e: &PlexusError) -> i32 {
120 match e {
121 PlexusError::Unauthenticated(_) => -32001,
122 PlexusError::InvalidParams(_) => -32602,
123 PlexusError::MethodNotFound { .. } | PlexusError::ActivationNotFound(_) => -32601,
124 _ => -32000,
125 }
126}
127
128fn plexus_error_to_jsonrpc(e: &PlexusError) -> jsonrpsee::types::ErrorObjectOwned {
130 jsonrpsee::types::ErrorObject::owned(plexus_error_code(e), e.to_string(), None::<()>)
131}
132
/// Summary of one activation, as produced by `DynamicHub::list_activations_info`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ActivationInfo {
    /// The activation's namespace.
    pub namespace: String,
    /// The activation's version string.
    pub version: String,
    /// The activation's short description.
    pub description: String,
    /// Names of the methods the activation reports.
    pub methods: Vec<String>,
}
144
145#[async_trait]
150pub trait Activation: Send + Sync + 'static {
151 type Methods: MethodEnumSchema;
152
153 fn namespace(&self) -> &str;
154 fn version(&self) -> &str;
155 fn description(&self) -> &str { "No description available" }
157 fn long_description(&self) -> Option<&str> { None }
159 fn methods(&self) -> Vec<&str>;
160 fn method_help(&self, _method: &str) -> Option<String> { None }
161 fn plugin_id(&self) -> uuid::Uuid {
165 let major_version = self.version().split('.').next().unwrap_or("0");
166 uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, format!("{}@{}", self.namespace(), major_version).as_bytes())
167 }
168
169 async fn call(
170 &self,
171 method: &str,
172 params: Value,
173 auth: Option<&super::auth::AuthContext>,
174 raw_ctx: Option<&crate::request::RawRequestContext>,
175 ) -> Result<PlexusStream, PlexusError>;
176 async fn resolve_handle(&self, _handle: &Handle) -> Result<PlexusStream, PlexusError> {
177 Err(PlexusError::HandleNotSupported(self.namespace().to_string()))
178 }
179
180 fn into_rpc_methods(self) -> Methods where Self: Sized;
181
182 fn plugin_schema(&self) -> PluginSchema {
184 use std::collections::hash_map::DefaultHasher;
185 use std::hash::{Hash, Hasher};
186
187 let methods: Vec<MethodSchema> = self.methods().iter().map(|name| {
188 let desc = self.method_help(name).unwrap_or_default();
189 let mut hasher = DefaultHasher::new();
191 name.hash(&mut hasher);
192 desc.hash(&mut hasher);
193 let hash = format!("{:016x}", hasher.finish());
194 MethodSchema::new(name.to_string(), desc, hash)
195 }).collect();
196
197 if let Some(long_desc) = self.long_description() {
198 PluginSchema::leaf_with_long_description(
199 self.namespace(),
200 self.version(),
201 self.description(),
202 long_desc,
203 methods,
204 )
205 } else {
206 PluginSchema::leaf(
207 self.namespace(),
208 self.version(),
209 self.description(),
210 methods,
211 )
212 }
213 }
214}
215
bitflags! {
    /// Flag set returned by `ChildRouter::capabilities`.
    ///
    /// Deprecated; see the deprecation note for the replacement.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
    #[deprecated(
        since = "0.5",
        note = "Use MethodRole::DynamicChild { list_method, search_method } instead. Removed in 0.7."
    )]
    pub struct ChildCapabilities: u32 {
        /// Presumably: the router supports `list_children` — TODO confirm.
        const LIST = 0b0000_0001;
        /// Presumably: the router supports `search_children` — TODO confirm.
        const SEARCH = 0b0000_0010;
    }
}
262
/// A node that can dispatch calls itself and hand out child routers by name.
///
/// `route_to_child` in this file uses this trait to forward a dotted method
/// path to the child named by its first segment.
#[async_trait]
pub trait ChildRouter: Send + Sync {
    /// Namespace of this router; used in `MethodNotFound` errors.
    fn router_namespace(&self) -> &str;

    /// Executes `method` on this router.
    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;

    /// Looks up the child router called `name`, if any.
    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>>;

    /// Capability flags; the default is the empty set.
    #[allow(deprecated)]
    fn capabilities(&self) -> ChildCapabilities {
        ChildCapabilities::empty()
    }

    /// Stream of child names; the default returns `None`.
    async fn list_children(&self) -> Option<BoxStream<'_, String>> {
        None
    }

    /// Stream of child names matching `_query`; the default returns `None`.
    async fn search_children(&self, _query: &str) -> Option<BoxStream<'_, String>> {
        None
    }

    /// Forward policy to apply when calling into `_callee_ns`.
    ///
    /// The default is `None`, in which case `route_to_child` falls back to
    /// `plexus_auth_core::IdentityOnly`.
    fn forward_policy_for(
        &self,
        _callee_ns: &str,
    ) -> Option<std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>> {
        None
    }

    /// Principal that `route_to_child` records as the caller of the call
    /// site; the default is `Principal::Anonymous`.
    fn framework_stamped_principal(&self) -> plexus_auth_core::Principal {
        plexus_auth_core::Principal::Anonymous
    }
}
354
/// Forwards a dotted method path (`child.rest`) from `parent` to the child
/// named by the first segment.
///
/// # Errors
/// * `MethodNotFound` — `method` contains no `.`.
/// * `ActivationNotFound` — `parent.get_child` returned `None`.
/// * `ExecutionError` — the rebuilt method path was rejected by `MethodPath::try_new`.
/// * Whatever the child's `router_call` returns.
pub async fn route_to_child<T: ChildRouter + ?Sized>(
    parent: &T,
    method: &str,
    params: Value,
    auth: Option<&super::auth::AuthContext>,
    raw_ctx: Option<&crate::request::RawRequestContext>,
) -> Result<PlexusStream, PlexusError> {
    // Split only at the first dot: `rest` may itself contain further segments.
    if let Some((child_name, rest)) = method.split_once('.') {
        if let Some(child) = parent.get_child(child_name).await {
            // Use the parent's policy for this child, else fall back to IdentityOnly.
            let policy: std::sync::Arc<dyn plexus_auth_core::ForwardPolicy> = parent
                .forward_policy_for(child_name)
                .unwrap_or_else(|| {
                    std::sync::Arc::new(plexus_auth_core::IdentityOnly)
                        as std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>
                });

            // Describe the call site: who is calling (the parent's stamped
            // principal) and which method path is being called.
            let callee_method_str = format!("{}.{}", child_name, rest);
            let callee_method = plexus_auth_core::MethodPath::try_new(callee_method_str.as_str())
                .map_err(|e| PlexusError::ExecutionError(format!(
                    "framework-built MethodPath rejected: {} ({:?})",
                    callee_method_str, e
                )))?;
            let site = plexus_auth_core::CallSite::new(
                parent.framework_stamped_principal(),
                callee_method,
            );

            // The policy always needs a context to evaluate; with no caller
            // auth an anonymous one is created (declared first so the borrow
            // outlives the match).
            let anonymous_owned;
            let caller_ctx: &super::auth::AuthContext = match auth {
                Some(ctx) => ctx,
                None => {
                    anonymous_owned = super::auth::AuthContext::anonymous();
                    &anonymous_owned
                }
            };
            let derivation = policy.forward(caller_ctx, &site);

            tracing::trace!(
                target: "plexus::audit",
                policy = policy.name().as_str(),
                callee_method = %site.callee_method.as_str(),
                derivation_keep_verified_user = derivation.keep_verified_user,
                derivation_keep_roles = derivation.keep_roles,
                derivation_keep_capabilities = derivation.keep_capabilities,
                derivation_keep_metadata = derivation.keep_metadata,
                "forward_policy_applied (audit-record emission stubbed pending PRIVACY-1)"
            );

            // Only a real caller context is turned into a callee context; an
            // unauthenticated call stays unauthenticated (`None`) for the child.
            return match auth {
                Some(caller_ctx) => {
                    caller_ctx
                        .with_callee_context(&derivation, &site.caller, |callee_ctx| async move {
                            child
                                .router_call(rest, params, Some(&callee_ctx), raw_ctx)
                                .await
                        })
                        .await
                }
                None => child.router_call(rest, params, None, raw_ctx).await,
            };
        }
        return Err(PlexusError::ActivationNotFound(child_name.to_string()));
    }

    // No dot: there is no child segment to route to.
    Err(PlexusError::MethodNotFound {
        activation: parent.router_namespace().to_string(),
        method: method.to_string(),
    })
}
499
/// Newtype around `Arc<dyn ChildRouter>` that itself implements `ChildRouter`
/// by forwarding every method to the wrapped router.
struct ArcChildRouter(Arc<dyn ChildRouter>);

// Every method, including those with trait defaults, is forwarded explicitly
// so overrides on the wrapped router are preserved.
#[async_trait]
impl ChildRouter for ArcChildRouter {
    fn router_namespace(&self) -> &str {
        self.0.router_namespace()
    }

    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
        self.0.router_call(method, params, auth, raw_ctx).await
    }

    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
        self.0.get_child(name).await
    }

    #[allow(deprecated)]
    fn capabilities(&self) -> ChildCapabilities {
        self.0.capabilities()
    }

    async fn list_children(&self) -> Option<BoxStream<'_, String>> {
        self.0.list_children().await
    }

    async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
        self.0.search_children(query).await
    }

    fn forward_policy_for(
        &self,
        callee_ns: &str,
    ) -> Option<std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>> {
        self.0.forward_policy_for(callee_ns)
    }

    fn framework_stamped_principal(&self) -> plexus_auth_core::Principal {
        self.0.framework_stamped_principal()
    }
}
546
/// Internal, type-erased counterpart of `Activation`.
///
/// It omits the associated `Methods` type and the consuming
/// `into_rpc_methods`, and adds `schema()`, so the hub can store activations
/// as `Arc<dyn ActivationObject>`. Implemented by `ActivationWrapper`.
#[async_trait]
#[allow(dead_code)] trait ActivationObject: Send + Sync + 'static {
    fn namespace(&self) -> &str;
    fn version(&self) -> &str;
    fn description(&self) -> &str;
    fn long_description(&self) -> Option<&str>;
    fn methods(&self) -> Vec<&str>;
    fn method_help(&self, method: &str) -> Option<String>;
    fn plugin_id(&self) -> uuid::Uuid;
    async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;
    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError>;
    fn plugin_schema(&self) -> PluginSchema;
    /// JSON schema of the activation's method enum.
    fn schema(&self) -> Schema;
}
566
/// Adapter that holds a concrete `Activation` so it can be used through the
/// `ActivationObject` trait.
struct ActivationWrapper<A: Activation> {
    /// The wrapped activation that all calls are forwarded to.
    inner: A,
}
570
571#[async_trait]
572impl<A: Activation> ActivationObject for ActivationWrapper<A> {
573 fn namespace(&self) -> &str { self.inner.namespace() }
574 fn version(&self) -> &str { self.inner.version() }
575 fn description(&self) -> &str { self.inner.description() }
576 fn long_description(&self) -> Option<&str> { self.inner.long_description() }
577 fn methods(&self) -> Vec<&str> { self.inner.methods() }
578 fn method_help(&self, method: &str) -> Option<String> { self.inner.method_help(method) }
579 fn plugin_id(&self) -> uuid::Uuid { self.inner.plugin_id() }
580
581 async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
582 self.inner.call(method, params, auth, raw_ctx).await
583 }
584
585 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
586 self.inner.resolve_handle(handle).await
587 }
588
589 fn plugin_schema(&self) -> PluginSchema { self.inner.plugin_schema() }
590
591 fn schema(&self) -> Schema {
592 let schema = schemars::schema_for!(A::Methods);
593 serde_json::from_value(serde_json::to_value(schema).expect("serialize"))
594 .expect("parse schema")
595 }
596}
597
/// Event emitted by the hub's hash subscription.
///
/// Serialized with an internal `event` tag in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum HashEvent {
    /// Carries a schema hash string.
    Hash { value: String },
}
607
/// Event wrapping a plugin schema.
///
/// Serialized with an internal `event` tag in snake_case.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum SchemaEvent {
    /// Carries a complete `PluginSchema`.
    Schema(PluginSchema),
}
615
/// Hash summary for one plugin namespace.
///
/// NOTE(review): the exact meaning of each hash is not visible in this part of
/// the file; the field notes below are inferred from the names — confirm.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct PluginHashes {
    /// Namespace these hashes belong to.
    pub namespace: String,
    /// Presumably the hash of the plugin's own content.
    pub self_hash: String,
    /// Presumably the hash over the children; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub children_hash: Option<String>,
    /// Presumably the overall hash.
    pub hash: String,
    /// Per-child hashes; omitted from output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub children: Option<Vec<ChildHashes>>,
}
628
/// Namespace/hash pair for one child, as listed in `PluginHashes::children`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ChildHashes {
    /// The child's namespace.
    pub namespace: String,
    /// The child's hash.
    pub hash: String,
}
635
636
/// One record in the `PluginRegistry`.
#[derive(Debug, Clone)]
pub struct PluginEntry {
    /// The plugin's identifier; also the key in the registry's id map.
    pub id: uuid::Uuid,
    /// The path the plugin is registered under; also the key in the path map.
    pub path: String,
    /// Plugin type label (`DynamicHub::register` passes the namespace here).
    pub plugin_type: String,
}
651
/// Two-way index of registered plugins: id → entry and path → id.
#[derive(Default)]
pub struct PluginRegistry {
    /// Entries keyed by plugin id.
    by_id: HashMap<uuid::Uuid, PluginEntry>,
    /// Plugin ids keyed by path.
    by_path: HashMap<String, uuid::Uuid>,
}
663
/// Owned copy of the registry's two maps, produced by
/// `DynamicHub::registry_snapshot` so callers need not hold the registry lock.
#[derive(Clone)]
pub struct PluginRegistrySnapshot {
    /// Entries keyed by plugin id.
    by_id: HashMap<uuid::Uuid, PluginEntry>,
    /// Plugin ids keyed by path.
    by_path: HashMap<String, uuid::Uuid>,
}
672
673impl PluginRegistrySnapshot {
674 pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
676 self.by_id.get(&id).map(|e| e.path.as_str())
677 }
678
679 pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
681 self.by_path.get(path).copied()
682 }
683
684 pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
686 self.by_id.get(&id)
687 }
688
689 pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
691 self.by_id.values()
692 }
693
694 pub fn len(&self) -> usize {
696 self.by_id.len()
697 }
698
699 pub fn is_empty(&self) -> bool {
701 self.by_id.is_empty()
702 }
703}
704
705impl PluginRegistry {
706 pub fn new() -> Self {
708 Self::default()
709 }
710
711 pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
713 self.by_id.get(&id).map(|e| e.path.as_str())
714 }
715
716 pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
718 self.by_path.get(path).copied()
719 }
720
721 pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
723 self.by_id.get(&id)
724 }
725
726 pub fn register(&mut self, id: uuid::Uuid, path: String, plugin_type: String) {
728 let entry = PluginEntry { id, path: path.clone(), plugin_type };
729 self.by_id.insert(id, entry);
730 self.by_path.insert(path, id);
731 }
732
733 pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
735 self.by_id.values()
736 }
737
738 pub fn len(&self) -> usize {
740 self.by_id.len()
741 }
742
743 pub fn is_empty(&self) -> bool {
745 self.by_id.is_empty()
746 }
747}
748
749fn build_info_payload(
762 namespace: &str,
763 caps: Option<&plexus_auth_core::BackendAuthCapabilities>,
764) -> serde_json::Value {
765 let advertised = match caps {
766 Some(c) => c.clone(),
767 None => plexus_auth_core::BackendAuthCapabilities::anonymous_default(),
768 };
769 serde_json::json!({
770 "backend": namespace,
771 "auth_capabilities": advertised,
772 })
773}
774
/// Shared state behind a `DynamicHub` (held in an `Arc`).
struct DynamicHubInner {
    /// The hub's own namespace.
    namespace: String,
    /// Registered activations, keyed by namespace.
    activations: HashMap<String, Arc<dyn ActivationObject>>,
    /// The same activations as child routers, keyed by namespace.
    child_routers: HashMap<String, Arc<dyn ChildRouter>>,
    /// Plugin id/path index, behind a lock.
    registry: std::sync::RwLock<PluginRegistry>,
    /// Deferred factories that produce each activation's RPC methods; drained
    /// when the hub is converted into an RPC module.
    pending_rpc: std::sync::Mutex<Vec<Box<dyn FnOnce() -> Methods + Send>>>,
    /// Auth capabilities advertised in the `_info` payload; `None` means the
    /// anonymous default is advertised.
    auth_capabilities: Option<plexus_auth_core::BackendAuthCapabilities>,
    /// Forward policies registered through `DynamicHub::with_forward_policy`.
    forward_policies: super::forward_registry::ForwardPolicyRegistry,
}
805
/// Hub that registers activations at runtime and routes calls to them by
/// namespace.
///
/// Cloning is cheap (it clones the inner `Arc`). The builder-style methods
/// require the hub to be the only reference to its inner state and panic
/// otherwise.
#[derive(Clone)]
pub struct DynamicHub {
    /// Shared hub state.
    inner: Arc<DynamicHubInner>,
}
831
832impl DynamicHub {
837 pub fn new(namespace: impl Into<String>) -> Self {
847 Self {
848 inner: Arc::new(DynamicHubInner {
849 namespace: namespace.into(),
850 activations: HashMap::new(),
851 child_routers: HashMap::new(),
852 registry: std::sync::RwLock::new(PluginRegistry::new()),
853 pending_rpc: std::sync::Mutex::new(Vec::new()),
854 auth_capabilities: None,
855 forward_policies: super::forward_registry::ForwardPolicyRegistry::new(),
856 }),
857 }
858 }
859
860 pub fn with_forward_policy(
881 mut self,
882 callee_ns: impl Into<String>,
883 policy: std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>,
884 ) -> Self {
885 let inner = Arc::get_mut(&mut self.inner)
886 .expect("Cannot register forward policy: DynamicHub has multiple references");
887 inner.forward_policies.register(callee_ns, policy);
888 self
889 }
890
    /// Returns the hub's forward-policy registry.
    pub fn forward_policies(&self) -> &super::forward_registry::ForwardPolicyRegistry {
        &self.inner.forward_policies
    }
898
899 pub fn with_auth_capabilities(
935 mut self,
936 caps: plexus_auth_core::BackendAuthCapabilities,
937 ) -> Self {
938 let inner = Arc::get_mut(&mut self.inner)
939 .expect("Cannot set auth_capabilities: DynamicHub has multiple references");
940 inner.auth_capabilities = Some(caps);
941 self
942 }
943
    /// Returns the stored auth capabilities, or `None` if none are set.
    pub fn auth_capabilities(&self) -> Option<&plexus_auth_core::BackendAuthCapabilities> {
        self.inner.auth_capabilities.as_ref()
    }
953
    /// Deprecated alias for [`DynamicHub::new`]; behaves identically.
    #[deprecated(since = "0.3.0", note = "Use DynamicHub::new(namespace) instead")]
    pub fn with_namespace(namespace: impl Into<String>) -> Self {
        Self::new(namespace)
    }
959
    /// Returns the namespace stored in the hub's inner state.
    pub fn runtime_namespace(&self) -> &str {
        &self.inner.namespace
    }
964
    /// Acquires a read guard on the plugin registry.
    ///
    /// # Panics
    /// Panics if the registry lock is poisoned.
    pub fn registry(&self) -> std::sync::RwLockReadGuard<'_, PluginRegistry> {
        self.inner.registry.read().unwrap()
    }
969
970 pub fn register<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
972 let namespace = activation.namespace().to_string();
973 let plugin_id = activation.plugin_id();
974 let activation_for_rpc = activation.clone();
975 let activation_for_router = activation.clone();
976
977 let inner = Arc::get_mut(&mut self.inner)
978 .expect("Cannot register: DynamicHub has multiple references");
979
980 inner.registry.write().unwrap().register(
982 plugin_id,
983 namespace.clone(),
984 namespace.clone(), );
986
987 inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
988 inner.child_routers.insert(namespace.clone(), Arc::new(activation_for_router));
989 inner.pending_rpc.lock().unwrap()
990 .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
991 self
992 }
993
994 #[deprecated(since = "0.5.0", note = "Use register() — it now handles both leaf and hub activations")]
999 pub fn register_hub<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
1000 let namespace = activation.namespace().to_string();
1001 let plugin_id = activation.plugin_id();
1002 let activation_for_rpc = activation.clone();
1003 let activation_for_router = activation.clone();
1004
1005 let inner = Arc::get_mut(&mut self.inner)
1006 .expect("Cannot register: DynamicHub has multiple references");
1007
1008 inner.registry.write().unwrap().register(
1010 plugin_id,
1011 namespace.clone(),
1012 namespace.clone(), );
1014
1015 inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
1016 inner.child_routers.insert(namespace, Arc::new(activation_for_router));
1017 inner.pending_rpc.lock().unwrap()
1018 .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
1019 self
1020 }
1021
1022 pub fn list_methods(&self) -> Vec<String> {
1024 let mut methods = Vec::new();
1025
1026 for m in Activation::methods(self) {
1028 methods.push(format!("{}.{}", self.inner.namespace, m));
1029 }
1030
1031 for (ns, act) in &self.inner.activations {
1033 for m in act.methods() {
1034 methods.push(format!("{}.{}", ns, m));
1035 }
1036 }
1037 methods.sort();
1038 methods
1039 }
1040
1041 pub fn list_activations_info(&self) -> Vec<ActivationInfo> {
1043 let mut activations = Vec::new();
1044
1045 activations.push(ActivationInfo {
1047 namespace: Activation::namespace(self).to_string(),
1048 version: Activation::version(self).to_string(),
1049 description: Activation::description(self).to_string(),
1050 methods: Activation::methods(self).iter().map(|s| s.to_string()).collect(),
1051 });
1052
1053 for a in self.inner.activations.values() {
1055 activations.push(ActivationInfo {
1056 namespace: a.namespace().to_string(),
1057 version: a.version().to_string(),
1058 description: a.description().to_string(),
1059 methods: a.methods().iter().map(|s| s.to_string()).collect(),
1060 });
1061 }
1062
1063 activations
1064 }
1065
    /// Returns the `hash` field of the hub's own plugin schema.
    pub fn compute_hash(&self) -> String {
        Activation::plugin_schema(self).hash
    }
1073
    /// Routes a `namespace.method` call; shorthand for `route_with_ctx` with
    /// no raw request context.
    pub async fn route(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>) -> Result<PlexusStream, PlexusError> {
        self.route_with_ctx(method, params, auth, None).await
    }
1078
1079 pub async fn route_with_ctx(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
1081 let (namespace, method_name) = self.parse_method(method)?;
1082
1083 if namespace == self.inner.namespace {
1085 return Activation::call(self, method_name, params, auth, raw_ctx).await;
1086 }
1087
1088 let activation = self.inner.activations.get(namespace)
1089 .ok_or_else(|| PlexusError::ActivationNotFound(namespace.to_string()))?;
1090
1091 activation.call(method_name, params, auth, raw_ctx).await
1092 }
1093
1094 pub async fn do_resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1098 let path = self.inner.registry.read().unwrap()
1099 .lookup(handle.plugin_id)
1100 .map(|s| s.to_string())
1101 .ok_or_else(|| PlexusError::ActivationNotFound(handle.plugin_id.to_string()))?;
1102
1103 let activation = self.inner.activations.get(&path)
1104 .ok_or_else(|| PlexusError::ActivationNotFound(path.clone()))?;
1105 activation.resolve_handle(handle).await
1106 }
1107
    /// Returns the method schema of the activation registered under
    /// `namespace`, or `None` if there is no such activation.
    pub fn get_activation_schema(&self, namespace: &str) -> Option<Schema> {
        self.inner.activations.get(namespace).map(|a| a.schema())
    }
1112
1113 pub fn registry_snapshot(&self) -> PluginRegistrySnapshot {
1115 let guard = self.inner.registry.read().unwrap();
1116 PluginRegistrySnapshot {
1117 by_id: guard.by_id.clone(),
1118 by_path: guard.by_path.clone(),
1119 }
1120 }
1121
1122 pub fn lookup_plugin(&self, id: uuid::Uuid) -> Option<String> {
1124 self.inner.registry.read().unwrap().lookup(id).map(|s| s.to_string())
1125 }
1126
    /// Returns the plugin id registered under `path`, if any.
    ///
    /// # Panics
    /// Panics if the registry lock is poisoned.
    pub fn lookup_plugin_by_path(&self, path: &str) -> Option<uuid::Uuid> {
        self.inner.registry.read().unwrap().lookup_by_path(path)
    }
1131
1132 pub fn list_plugin_schemas(&self) -> Vec<PluginSchema> {
1134 let mut schemas = Vec::new();
1135
1136 schemas.push(Activation::plugin_schema(self));
1138
1139 for a in self.inner.activations.values() {
1141 schemas.push(a.plugin_schema());
1142 }
1143
1144 schemas
1145 }
1146
    /// Deprecated alias for [`DynamicHub::list_plugin_schemas`].
    #[deprecated(note = "Use list_plugin_schemas instead")]
    pub fn list_full_schemas(&self) -> Vec<PluginSchema> {
        self.list_plugin_schemas()
    }
1152
1153 pub fn get_method_help(&self, method: &str) -> Option<String> {
1155 let (namespace, method_name) = self.parse_method(method).ok()?;
1156 let activation = self.inner.activations.get(namespace)?;
1157 activation.method_help(method_name)
1158 }
1159
1160 fn parse_method<'a>(&self, method: &'a str) -> Result<(&'a str, &'a str), PlexusError> {
1161 let parts: Vec<&str> = method.splitn(2, '.').collect();
1162 if parts.len() != 2 {
1163 return Err(PlexusError::InvalidParams(format!("Invalid method format: {}", method)));
1164 }
1165 Ok((parts[0], parts[1]))
1166 }
1167
1168 pub fn plugin_children(&self) -> Vec<ChildSummary> {
1171 self.inner.activations.values()
1172 .map(|a| {
1173 let schema = a.plugin_schema();
1174 ChildSummary {
1175 namespace: schema.namespace,
1176 description: schema.description,
1177 hash: schema.hash,
1178 }
1179 })
1180 .collect()
1181 }
1182
    /// Consumes the hub and builds the JSON-RPC module that exposes it.
    ///
    /// Registers four subscriptions — `{ns}.call`, `{ns}.hash`, `{ns}.schema`
    /// and `_info` — then merges the RPC methods of every registered
    /// activation and registers the child-capability methods for each child
    /// router.
    ///
    /// # Errors
    /// Returns `RegisterMethodError` if any registration or merge fails.
    pub fn into_rpc_module(self) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
        let mut module = RpcModule::new(());

        // Initialise the global context with the hub's current schema hash.
        PlexusContext::init(self.compute_hash());

        // The registration API is given `&'static str` names, so the
        // namespace-derived strings are leaked for the life of the process.
        let ns = self.runtime_namespace();
        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());

        // `{ns}.call`: routes an arbitrary method through the hub.
        let plexus_for_call = self.clone();
        module.register_subscription(
            call_method,
            PLEXUS_NOTIF_METHOD,
            call_unsub,
            move |params, pending, _ctx, _ext| {
                let plexus = plexus_for_call.clone();
                Box::pin(async move {
                    let p: CallParams = params.parse()?;
                    // Note: this path routes without an auth context (`None`).
                    match plexus.route(&p.method, p.params.unwrap_or_default(), None).await {
                        Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
                        Err(e) => {
                            // Routing failed: accept the subscription anyway and
                            // deliver the failure as a single error stream item.
                            let sink = pending.accept().await?;
                            let error_item = super::types::PlexusStreamItem::Error {
                                metadata: super::types::StreamMetadata::new(
                                    vec![ns_static.into()],
                                    PlexusContext::hash(),
                                ),
                                message: e.to_string(),
                                code: Some(plexus_error_code(&e).to_string()),
                                recoverable: false,
                            };
                            // Best effort: serialization or send failures are ignored.
                            if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
                                let _ = sink.send(raw).await;
                            }
                            Ok(())
                        }
                    }
                })
            }
        )?;

        // `{ns}.hash`: emits one `HashEvent::Hash` with the hub schema's hash.
        let plexus_for_hash = self.clone();
        module.register_subscription(
            hash_method,
            PLEXUS_NOTIF_METHOD,
            hash_unsub,
            move |_params, pending, _ctx, _ext| {
                let plexus = plexus_for_hash.clone();
                Box::pin(async move {
                    let schema = Activation::plugin_schema(&plexus);
                    let stream = async_stream::stream! {
                        yield HashEvent::Hash { value: schema.hash };
                    };
                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
                    pipe_stream_to_subscription(pending, wrapped).await
                })
            }
        )?;

        // `{ns}.schema`: emits the hub's plugin schema, or a single method's
        // schema when `method` is given.
        let plexus_for_schema = self.clone();
        module.register_subscription(
            schema_method,
            PLEXUS_NOTIF_METHOD,
            schema_unsub,
            move |params, pending, _ctx, _ext| {
                let plexus = plexus_for_schema.clone();
                Box::pin(async move {
                    // Unparseable params fall back to the defaults.
                    let p: SchemaParams = params.parse().unwrap_or_default();
                    let plugin_schema = Activation::plugin_schema(&plexus);

                    let result = if let Some(ref name) = p.method {
                        plugin_schema.methods.iter()
                            .find(|m| m.name == *name)
                            .map(|m| super::SchemaResult::Method(m.clone()))
                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
                                -32602,
                                format!("Method '{}' not found", name),
                                None::<()>,
                            ))?
                    } else {
                        super::SchemaResult::Plugin(plugin_schema)
                    };

                    let stream = async_stream::stream! { yield result; };
                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
                    pipe_stream_to_subscription(pending, wrapped).await
                })
            }
        )?;

        // `_info`: the payload is built once here and cloned per subscription.
        let info_payload = build_info_payload(
            self.runtime_namespace(),
            self.inner.auth_capabilities.as_ref(),
        );
        module.register_subscription(
            "_info",
            PLEXUS_NOTIF_METHOD,
            "_info_unsub",
            move |_params, pending, _ctx, _ext| {
                let payload = info_payload.clone();
                Box::pin(async move {
                    let info_stream = futures::stream::once(async move { payload });

                    let wrapped = super::streaming::wrap_stream(
                        info_stream,
                        "_info",
                        vec![]
                    );

                    pipe_stream_to_subscription(pending, wrapped).await
                })
            }
        )?;

        // Drain the queued factories and merge each activation's RPC methods.
        let pending = std::mem::take(&mut *self.inner.pending_rpc.lock().unwrap());
        for factory in pending {
            module.merge(factory())?;
        }

        // Register the child-capability methods for every child router.
        for (ns, router) in self.inner.child_routers.iter() {
            register_child_capability_methods(&mut module, ns, router.clone())?;
        }

        Ok(module)
    }
1333
1334 pub fn arc_into_rpc_module(hub: Arc<Self>) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
1340 let mut module = RpcModule::new(());
1341
1342 PlexusContext::init(hub.compute_hash());
1343
1344 let ns = hub.runtime_namespace();
1347 let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
1348 let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
1349 let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
1350 let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
1351 let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
1352 let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
1353 let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
1354 let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
1355 let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
1356
1357 let hub_for_call = hub.clone();
1359 module.register_subscription(
1360 call_method,
1361 call_method,
1362 call_unsub,
1363 move |params, pending, _ctx, ext| {
1364 let hub = hub_for_call.clone();
1365 Box::pin(async move {
1366 let p: CallParams = params.parse()?;
1367 let auth = ext.get::<std::sync::Arc<super::auth::AuthContext>>()
1369 .map(|arc| arc.as_ref());
1370 match hub.route(&p.method, p.params.unwrap_or_default(), auth).await {
1371 Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
1372 Err(e) => {
1373 let sink = pending.accept().await?;
1378 let error_item = super::types::PlexusStreamItem::Error {
1379 metadata: super::types::StreamMetadata::new(
1380 vec![ns_static.into()],
1381 PlexusContext::hash(),
1382 ),
1383 message: e.to_string(),
1384 code: Some(plexus_error_code(&e).to_string()),
1385 recoverable: false,
1386 };
1387 if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
1388 let _ = sink.send(raw).await;
1389 }
1390 Ok(())
1391 }
1392 }
1393 })
1394 }
1395 )?;
1396
1397 let hub_for_hash = hub.clone();
1399 module.register_subscription(
1400 hash_method,
1401 PLEXUS_NOTIF_METHOD,
1402 hash_unsub,
1403 move |_params, pending, _ctx, _ext| {
1404 let hub = hub_for_hash.clone();
1405 Box::pin(async move {
1406 let schema = Activation::plugin_schema(&*hub);
1407 let stream = async_stream::stream! {
1408 yield HashEvent::Hash { value: schema.hash };
1409 };
1410 let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
1411 pipe_stream_to_subscription(pending, wrapped).await
1412 })
1413 }
1414 )?;
1415
1416 let hub_for_schema = hub.clone();
1418 module.register_subscription(
1419 schema_method,
1420 PLEXUS_NOTIF_METHOD,
1421 schema_unsub,
1422 move |params, pending, _ctx, _ext| {
1423 let hub = hub_for_schema.clone();
1424 Box::pin(async move {
1425 let p: SchemaParams = params.parse().unwrap_or_default();
1426 let plugin_schema = Activation::plugin_schema(&*hub);
1427
1428 let result = if let Some(ref name) = p.method {
1429 plugin_schema.methods.iter()
1430 .find(|m| m.name == *name)
1431 .map(|m| super::SchemaResult::Method(m.clone()))
1432 .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
1433 -32602,
1434 format!("Method '{}' not found", name),
1435 None::<()>,
1436 ))?
1437 } else {
1438 super::SchemaResult::Plugin(plugin_schema)
1439 };
1440
1441 let stream = async_stream::stream! {
1442 yield result;
1443 };
1444 let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
1445 pipe_stream_to_subscription(pending, wrapped).await
1446 })
1447 }
1448 )?;
1449
1450 let info_payload = build_info_payload(
1455 hub.runtime_namespace(),
1456 hub.inner.auth_capabilities.as_ref(),
1457 );
1458 module.register_subscription(
1459 "_info",
1460 PLEXUS_NOTIF_METHOD,
1461 "_info_unsub",
1462 move |_params, pending, _ctx, _ext| {
1463 let payload = info_payload.clone();
1464 Box::pin(async move {
1465 let info_stream = futures::stream::once(async move { payload });
1467
1468 let wrapped = super::streaming::wrap_stream(
1470 info_stream,
1471 "_info",
1472 vec![]
1473 );
1474
1475 pipe_stream_to_subscription(pending, wrapped).await
1477 })
1478 }
1479 )?;
1480
1481 let respond_method: &'static str = Box::leak(format!("{}.respond", ns).into_boxed_str());
1484 module.register_async_method(respond_method, |params, _ctx, _ext| async move {
1485 use super::bidirectional::{handle_pending_response, BidirError};
1486
1487 let p: RespondParams = params.parse()?;
1488
1489 tracing::debug!(
1490 request_id = %p.request_id,
1491 "Handling {}.respond via WebSocket",
1492 "plexus"
1493 );
1494
1495 match handle_pending_response(&p.request_id, p.response_data) {
1496 Ok(()) => Ok(serde_json::json!({"success": true})),
1497 Err(BidirError::UnknownRequest) => {
1498 tracing::warn!(request_id = %p.request_id, "Unknown request ID in respond");
1499 Err(jsonrpsee::types::ErrorObject::owned(
1500 -32602,
1501 format!("Unknown request ID: {}. The request may have timed out or been cancelled.", p.request_id),
1502 None::<()>,
1503 ))
1504 }
1505 Err(BidirError::ChannelClosed) => {
1506 tracing::warn!(request_id = %p.request_id, "Channel closed in respond");
1507 Err(jsonrpsee::types::ErrorObject::owned(
1508 -32000,
1509 "Response channel was closed (request may have timed out)",
1510 None::<()>,
1511 ))
1512 }
1513 Err(e) => {
1514 tracing::error!(request_id = %p.request_id, error = ?e, "Error in respond");
1515 Err(jsonrpsee::types::ErrorObject::owned(
1516 -32000,
1517 format!("Failed to deliver response: {}", e),
1518 None::<()>,
1519 ))
1520 }
1521 }
1522 })?;
1523
1524 let pending = std::mem::take(&mut *hub.inner.pending_rpc.lock().unwrap());
1526 tracing::trace!(factories = pending.len(), "merging activation RPC factories");
1527 for (idx, factory) in pending.into_iter().enumerate() {
1528 tracing::trace!(factory_idx = idx, "calling factory to get Methods");
1529 let methods = factory();
1530 let method_count = methods.method_names().count();
1531 tracing::trace!(factory_idx = idx, methods = method_count, "factory returned Methods; merging into module");
1532 module.merge(methods)?;
1533 tracing::trace!(factory_idx = idx, "successfully merged factory methods");
1534 }
1535 tracing::trace!("all activations merged successfully");
1536
1537 for (ns, router) in hub.inner.child_routers.iter() {
1540 register_child_capability_methods(&mut module, ns, router.clone())?;
1541 }
1542
1543 Ok(module)
1544 }
1545}
1546
1547#[allow(deprecated)] fn register_child_capability_methods(
1562 module: &mut RpcModule<()>,
1563 namespace: &str,
1564 router: Arc<dyn ChildRouter>,
1565) -> Result<(), jsonrpsee::core::RegisterMethodError> {
1566 let caps = router.capabilities();
1567 if caps.is_empty() {
1568 return Ok(());
1569 }
1570
1571 let ns_static: &'static str = Box::leak(namespace.to_string().into_boxed_str());
1572
1573 if caps.contains(ChildCapabilities::LIST) {
1574 let list_method: &'static str =
1575 Box::leak(format!("{}.list_children", namespace).into_boxed_str());
1576 let list_unsub: &'static str =
1577 Box::leak(format!("{}.list_children_unsub", namespace).into_boxed_str());
1578 let router_for_list = router.clone();
1579 module.register_subscription(
1580 list_method,
1581 PLEXUS_NOTIF_METHOD,
1582 list_unsub,
1583 move |_params, pending, _ctx, _ext| {
1584 let router = router_for_list.clone();
1585 Box::pin(async move {
1586 let collected: Vec<String> = match router.list_children().await {
1593 Some(mut s) => {
1594 use futures::StreamExt;
1595 let mut acc = Vec::new();
1596 while let Some(name) = s.next().await {
1597 acc.push(name);
1598 }
1599 acc
1600 }
1601 None => Vec::new(),
1602 };
1603 let stream = async_stream::stream! {
1604 for name in collected {
1605 yield name;
1606 }
1607 };
1608 let wrapped = super::streaming::wrap_stream(
1609 stream,
1610 "list_children",
1611 vec![ns_static.into()],
1612 );
1613 pipe_stream_to_subscription(pending, wrapped).await
1614 })
1615 },
1616 )?;
1617 }
1618
1619 if caps.contains(ChildCapabilities::SEARCH) {
1620 let search_method: &'static str =
1621 Box::leak(format!("{}.search_children", namespace).into_boxed_str());
1622 let search_unsub: &'static str =
1623 Box::leak(format!("{}.search_children_unsub", namespace).into_boxed_str());
1624 let router_for_search = router.clone();
1625 module.register_subscription(
1626 search_method,
1627 PLEXUS_NOTIF_METHOD,
1628 search_unsub,
1629 move |params, pending, _ctx, _ext| {
1630 let router = router_for_search.clone();
1631 Box::pin(async move {
1632 let p: SearchChildrenParams = params.parse()?;
1633 let collected: Vec<String> = match router.search_children(&p.query).await {
1634 Some(mut s) => {
1635 use futures::StreamExt;
1636 let mut acc = Vec::new();
1637 while let Some(name) = s.next().await {
1638 acc.push(name);
1639 }
1640 acc
1641 }
1642 None => Vec::new(),
1643 };
1644 let stream = async_stream::stream! {
1645 for name in collected {
1646 yield name;
1647 }
1648 };
1649 let wrapped = super::streaming::wrap_stream(
1650 stream,
1651 "search_children",
1652 vec![ns_static.into()],
1653 );
1654 pipe_stream_to_subscription(pending, wrapped).await
1655 })
1656 },
1657 )?;
1658 }
1659
1660 Ok(())
1661}
1662
1663#[derive(Debug, serde::Deserialize)]
1665struct SearchChildrenParams {
1666 query: String,
1667}
1668
1669#[derive(Debug, serde::Deserialize)]
1671struct CallParams {
1672 method: String,
1673 #[serde(default)]
1674 params: Option<Value>,
1675}
1676
1677#[derive(Debug, Default, serde::Deserialize)]
1679struct SchemaParams {
1680 method: Option<String>,
1681}
1682
1683#[derive(Debug, serde::Deserialize)]
1685struct RespondParams {
1686 request_id: String,
1687 response_data: Value,
1688}
1689
1690async fn pipe_stream_to_subscription(
1695 pending: jsonrpsee::PendingSubscriptionSink,
1696 mut stream: PlexusStream,
1697) -> jsonrpsee::core::SubscriptionResult {
1698 use futures::StreamExt;
1699
1700 let sink = pending.accept().await?;
1701 while let Some(item) = stream.next().await {
1702 let json = serde_json::value::to_raw_value(&item)?;
1703 sink.send(json).await?;
1704 }
1705 Ok(())
1706}
1707
1708#[plexus_macros::activation(
1713 namespace = "plexus",
1714 version = "1.0.0",
1715 description = "Central routing and introspection",
1716 hub,
1717 namespace_fn = "runtime_namespace"
1718)]
1719#[allow(deprecated)]
1720impl DynamicHub {
1721 #[plexus_macros::method(
1723 streaming,
1724 description = "Route a call to a registered activation",
1725 params(
1726 method = "The method to call (format: namespace.method)",
1727 params = "Parameters to pass to the method (optional, defaults to {})"
1728 )
1729 )]
1730 async fn call(
1731 &self,
1732 method: String,
1733 params: Option<Value>,
1734 ) -> impl Stream<Item = super::types::PlexusStreamItem> + Send + 'static {
1735 use super::context::PlexusContext;
1736 use super::types::{PlexusStreamItem, StreamMetadata};
1737
1738 let result = self.route(&method, params.unwrap_or_default(), None).await;
1739
1740 match result {
1741 Ok(plexus_stream) => {
1742 plexus_stream
1744 }
1745 Err(e) => {
1746 let metadata = StreamMetadata::new(
1748 vec![self.inner.namespace.clone()],
1749 PlexusContext::hash(),
1750 );
1751 Box::pin(futures::stream::once(async move {
1752 PlexusStreamItem::Error {
1753 metadata,
1754 message: e.to_string(),
1755 code: None,
1756 recoverable: false,
1757 }
1758 }))
1759 }
1760 }
1761 }
1762
1763 #[plexus_macros::method(description = "Get plexus configuration hash (from the recursive schema)\n\n This hash changes whenever any method or child plugin changes.\n It's computed from the method hashes rolled up through the schema tree.")]
1768 async fn hash(&self) -> impl Stream<Item = HashEvent> + Send + 'static {
1769 let schema = Activation::plugin_schema(self);
1770 stream! { yield HashEvent::Hash { value: schema.hash }; }
1771 }
1772
1773 #[plexus_macros::method(description = "Get plugin hashes for cache validation")]
1775 #[allow(deprecated)]
1776 async fn hashes(&self) -> impl Stream<Item = PluginHashes> + Send + 'static {
1777 let schema = Activation::plugin_schema(self);
1778
1779 stream! {
1780 yield PluginHashes {
1781 namespace: schema.namespace.clone(),
1782 self_hash: schema.self_hash.clone(),
1783 children_hash: schema.children_hash.clone(),
1784 hash: schema.hash.clone(),
1785 children: schema.children.as_ref().map(|kids| {
1786 kids.iter()
1787 .map(|c| ChildHashes {
1788 namespace: c.namespace.clone(),
1789 hash: c.hash.clone(),
1790 })
1791 .collect()
1792 }),
1793 };
1794 }
1795 }
1796
1797 }
1799
1800use super::hub_context::HubContext;
1805use std::sync::Weak;
1806
1807#[async_trait]
1813impl HubContext for Weak<DynamicHub> {
1814 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1815 let hub = self.upgrade().ok_or_else(|| {
1816 PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1817 })?;
1818 hub.do_resolve_handle(handle).await
1819 }
1820
1821 async fn call(&self, method: &str, params: serde_json::Value) -> Result<PlexusStream, PlexusError> {
1822 let hub = self.upgrade().ok_or_else(|| {
1823 PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1824 })?;
1825 hub.route(method, params, None).await
1826 }
1827
1828 fn is_valid(&self) -> bool {
1829 self.upgrade().is_some()
1830 }
1831}
1832
1833#[async_trait]
1838impl ChildRouter for DynamicHub {
1839 fn router_namespace(&self) -> &str {
1840 &self.inner.namespace
1841 }
1842
1843 async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
1844 self.route_with_ctx(method, params, auth, raw_ctx).await
1847 }
1848
1849 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1850 self.inner.child_routers.get(name)
1852 .map(|router| {
1853 Box::new(ArcChildRouter(router.clone())) as Box<dyn ChildRouter>
1855 })
1856 }
1857
1858 fn forward_policy_for(
1861 &self,
1862 callee_ns: &str,
1863 ) -> Option<std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>> {
1864 self.inner.forward_policies.get(callee_ns)
1865 }
1866
1867 }
1874
#[cfg(test)]
#[allow(deprecated)]
mod tests {
    use super::*;

    /// Compile-time check that `DynamicHub` satisfies the `Activation` bound.
    #[test]
    fn dynamic_hub_implements_activation() {
        fn requires_activation<A: Activation>() {}
        requires_activation::<DynamicHub>();
    }

    /// The hub advertises its `call`, `hash`, and `schema` methods.
    #[test]
    fn dynamic_hub_methods() {
        let hub = DynamicHub::new("test");
        let advertised = hub.methods();
        assert!(advertised.contains(&"call"));
        assert!(advertised.contains(&"hash"));
        assert!(advertised.contains(&"schema"));
    }

    /// Two identically constructed hubs compute the same hash.
    #[test]
    fn dynamic_hub_hash_stable() {
        let (first, second) = (DynamicHub::new("test"), DynamicHub::new("test"));
        assert_eq!(first.compute_hash(), second.compute_hash());
    }

    /// A hub with a registered activation reports itself as a hub with children.
    #[test]
    fn dynamic_hub_is_hub() {
        use crate::activations::health::Health;

        let hub = DynamicHub::new("test").register(Health::new());
        let schema = hub.plugin_schema();

        assert!(schema.is_hub(), "dynamic hub should be a hub");
        assert!(!schema.is_leaf(), "dynamic hub should not be a leaf");

        let kids = schema.children.expect("dynamic hub should have children");
        assert!(!kids.is_empty(), "dynamic hub should have at least one child");

        let health_child = kids
            .iter()
            .find(|kid| kid.namespace == "health")
            .expect("should have health child");
        assert!(!health_child.hash.is_empty(), "health should have a hash");
    }

    /// The schema serializes and carries the namespace, `call`, and children.
    #[test]
    fn dynamic_hub_schema_structure() {
        use crate::activations::health::Health;

        let hub = DynamicHub::new("test").register(Health::new());
        let schema = hub.plugin_schema();

        let pretty = serde_json::to_string_pretty(&schema).unwrap();
        println!("DynamicHub schema:\n{}", pretty);

        assert_eq!(schema.namespace, "test");
        assert!(schema.methods.iter().any(|method| method.name == "call"));
        assert!(schema.children.is_some());
    }

    /// A handle whose plugin id is not registered yields `ActivationNotFound`.
    #[tokio::test]
    async fn invariant_resolve_handle_unknown_activation() {
        use crate::activations::health::Health;
        use crate::types::Handle;
        use uuid::Uuid;

        let hub = DynamicHub::new("test").register(Health::new());
        let handle = Handle::new(Uuid::new_v4(), "1.0.0", "some_method");

        match hub.do_resolve_handle(&handle).await {
            Err(PlexusError::ActivationNotFound(_)) => {}
            Err(other) => panic!("Expected ActivationNotFound, got {:?}", other),
            Ok(_) => panic!("Expected error for unknown activation"),
        }
    }

    /// A handle for `Health` is routed there and rejected as unsupported.
    #[tokio::test]
    async fn invariant_resolve_handle_unsupported() {
        use crate::activations::health::Health;
        use crate::types::Handle;

        let hub = DynamicHub::new("test").register(Health::new());
        let handle = Handle::new(Health::PLUGIN_ID, "1.0.0", "check");

        match hub.do_resolve_handle(&handle).await {
            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
            Err(other) => panic!("Expected HandleNotSupported, got {:?}", other),
            Ok(_) => panic!("Expected error for unsupported handle"),
        }
    }

    /// Handles are dispatched to the activation matching their plugin id.
    #[tokio::test]
    async fn invariant_resolve_handle_routes_by_plugin_id() {
        use crate::activations::echo::Echo;
        use crate::activations::health::Health;
        use crate::types::Handle;
        use uuid::Uuid;

        let health = Health::new();
        let echo = Echo::new();
        let health_id = health.plugin_id();
        let echo_id = echo.plugin_id();

        let hub = DynamicHub::new("test").register(health).register(echo);

        let to_health = Handle::new(health_id, "1.0.0", "check");
        match hub.do_resolve_handle(&to_health).await {
            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
            Err(other) => panic!(
                "health handle should route to health activation, got {:?}",
                other
            ),
            Ok(_) => panic!("health handle should return HandleNotSupported"),
        }

        let to_echo = Handle::new(echo_id, "1.0.0", "echo");
        match hub.do_resolve_handle(&to_echo).await {
            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "echo"),
            Err(other) => panic!(
                "echo handle should route to echo activation, got {:?}",
                other
            ),
            Ok(_) => panic!("echo handle should return HandleNotSupported"),
        }

        let to_nobody = Handle::new(Uuid::new_v4(), "1.0.0", "method");
        match hub.do_resolve_handle(&to_nobody).await {
            Err(PlexusError::ActivationNotFound(_)) => {}
            Err(other) => panic!(
                "unknown handle should return ActivationNotFound, got {:?}",
                other
            ),
            Ok(_) => panic!("unknown handle should return ActivationNotFound"),
        }
    }

    /// Handles with identical metadata still differ by plugin id.
    #[test]
    fn invariant_handle_plugin_id_determines_routing() {
        use crate::activations::echo::Echo;
        use crate::activations::health::Health;
        use crate::types::Handle;

        let health = Health::new();
        let echo = Echo::new();

        let for_health = Handle::new(health.plugin_id(), "1.0.0", "check")
            .with_meta(vec!["msg-123".into(), "user".into()]);
        let for_echo = Handle::new(echo.plugin_id(), "1.0.0", "echo")
            .with_meta(vec!["msg-123".into(), "user".into()]);

        assert_ne!(for_health.plugin_id, for_echo.plugin_id);
    }

    /// Registering an entry makes it reachable by id, by path, and via `get`.
    #[test]
    fn plugin_registry_basic_operations() {
        let plugin_id = uuid::Uuid::new_v4();
        let mut registry = PluginRegistry::new();

        registry.register(plugin_id, "test_plugin".to_string(), "test".to_string());

        assert_eq!(registry.lookup(plugin_id), Some("test_plugin"));
        assert_eq!(registry.lookup_by_path("test_plugin"), Some(plugin_id));

        let entry = registry.get(plugin_id).expect("should have entry");
        assert_eq!(entry.path, "test_plugin");
        assert_eq!(entry.plugin_type, "test");
    }

    /// `DynamicHub::register` records the activation in the hub's registry.
    #[test]
    fn plugin_registry_populated_on_register() {
        use crate::activations::health::Health;

        let hub = DynamicHub::new("test").register(Health::new());

        let registry = hub.registry();
        assert!(
            !registry.is_empty(),
            "registry should not be empty after registration"
        );

        let health_uuid = registry
            .lookup_by_path("health")
            .expect("health should be registered by path");
        assert_eq!(registry.lookup(health_uuid), Some("health"));
    }

    /// Plugin ids are UUIDv5 values derived from `namespace@major_version`.
    #[test]
    fn plugin_registry_deterministic_uuid() {
        use crate::activations::health::Health;

        let first = Health::new();
        let second = Health::new();

        assert_eq!(
            first.plugin_id(),
            second.plugin_id(),
            "same activation type should have deterministic UUID"
        );

        let expected = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, b"health@1");
        assert_eq!(
            first.plugin_id(),
            expected,
            "plugin_id should be deterministic from namespace@major_version"
        );
    }

    /// Router that implements only the required `ChildRouter` methods.
    struct MinimalRouter;

    #[async_trait]
    impl ChildRouter for MinimalRouter {
        fn router_namespace(&self) -> &str {
            "minimal"
        }

        async fn router_call(
            &self,
            _method: &str,
            _params: Value,
            _auth: Option<&super::super::auth::AuthContext>,
            _raw_ctx: Option<&crate::request::RawRequestContext>,
        ) -> Result<PlexusStream, PlexusError> {
            Err(PlexusError::MethodNotFound {
                activation: "minimal".into(),
                method: "none".into(),
            })
        }

        async fn get_child(&self, _name: &str) -> Option<Box<dyn ChildRouter>> {
            None
        }
    }

    /// The trait defaults report no capabilities and no child streams.
    #[tokio::test]
    async fn child_router_defaults_report_no_capabilities_and_none_streams() {
        let router = MinimalRouter;

        assert_eq!(
            router.capabilities(),
            ChildCapabilities::empty(),
            "default capabilities should be empty"
        );
        assert!(
            router.list_children().await.is_none(),
            "default list_children should be None"
        );
        assert!(
            router.search_children("anything").await.is_none(),
            "default search_children should be None"
        );
    }

    /// Router that overrides listing and searching over a fixed name list.
    struct ListingRouter {
        names: Vec<String>,
    }

    #[async_trait]
    impl ChildRouter for ListingRouter {
        fn router_namespace(&self) -> &str {
            "listing"
        }

        async fn router_call(
            &self,
            _method: &str,
            _params: Value,
            _auth: Option<&super::super::auth::AuthContext>,
            _raw_ctx: Option<&crate::request::RawRequestContext>,
        ) -> Result<PlexusStream, PlexusError> {
            Err(PlexusError::MethodNotFound {
                activation: "listing".into(),
                method: "none".into(),
            })
        }

        async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
            let known = self.names.iter().any(|candidate| candidate == name);
            if known {
                Some(Box::new(ListingRouter { names: vec![] }))
            } else {
                None
            }
        }

        fn capabilities(&self) -> ChildCapabilities {
            ChildCapabilities::LIST | ChildCapabilities::SEARCH
        }

        async fn list_children(&self) -> Option<BoxStream<'_, String>> {
            Some(Box::pin(futures::stream::iter(self.names.iter().cloned())))
        }

        async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
            // Substring match, case-sensitive.
            let matching: Vec<String> = self
                .names
                .iter()
                .filter(|name| name.contains(query))
                .cloned()
                .collect();
            Some(Box::pin(futures::stream::iter(matching)))
        }
    }

    /// Overridden capabilities are reported and both streams yield names.
    #[tokio::test]
    async fn child_router_overrides_report_capabilities_and_yield_streams() {
        use futures::StreamExt;

        let router = ListingRouter {
            names: vec!["alpha".into(), "beta".into(), "alphabet".into()],
        };

        let caps = router.capabilities();
        assert!(caps.contains(ChildCapabilities::LIST));
        assert!(caps.contains(ChildCapabilities::SEARCH));
        assert_eq!(caps, ChildCapabilities::LIST | ChildCapabilities::SEARCH);

        let listed: Vec<String> = router
            .list_children()
            .await
            .expect("LIST capability set — expected Some(stream)")
            .collect()
            .await;
        assert_eq!(listed, ["alpha", "beta", "alphabet"]);

        let matched: Vec<String> = router
            .search_children("alpha")
            .await
            .expect("SEARCH capability set — expected Some(stream)")
            .collect()
            .await;
        assert_eq!(matched, ["alpha", "alphabet"]);
    }

    /// Router with configurable capability bits, used to exercise
    /// `register_child_capability_methods`.
    struct WireFixture {
        names: Vec<String>,
        caps: ChildCapabilities,
    }

    #[async_trait]
    impl ChildRouter for WireFixture {
        fn router_namespace(&self) -> &str {
            "wirefixture"
        }

        async fn router_call(
            &self,
            _method: &str,
            _params: Value,
            _auth: Option<&super::super::auth::AuthContext>,
            _raw_ctx: Option<&crate::request::RawRequestContext>,
        ) -> Result<PlexusStream, PlexusError> {
            Err(PlexusError::MethodNotFound {
                activation: "wirefixture".into(),
                method: "none".into(),
            })
        }

        async fn get_child(&self, _name: &str) -> Option<Box<dyn ChildRouter>> {
            None
        }

        fn capabilities(&self) -> ChildCapabilities {
            self.caps
        }

        async fn list_children(&self) -> Option<BoxStream<'_, String>> {
            if self.caps.contains(ChildCapabilities::LIST) {
                Some(Box::pin(futures::stream::iter(self.names.clone())))
            } else {
                None
            }
        }

        async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
            if !self.caps.contains(ChildCapabilities::SEARCH) {
                return None;
            }
            // Case-insensitive substring match.
            let needle = query.to_lowercase();
            let hits: Vec<String> = self
                .names
                .iter()
                .filter(|name| name.to_lowercase().contains(needle.as_str()))
                .cloned()
                .collect();
            Some(Box::pin(futures::stream::iter(hits)))
        }
    }

    /// Builds a fresh module with `router`'s capability methods registered under `ns`.
    fn build_module_for(router: WireFixture, ns: &str) -> RpcModule<()> {
        let router: Arc<dyn ChildRouter> = Arc::new(router);
        let mut module = RpcModule::new(());
        register_child_capability_methods(&mut module, ns, router).expect("register");
        module
    }

    /// Owned copies of every method name registered on `module`.
    fn registered_method_names(module: &RpcModule<()>) -> Vec<String> {
        module.method_names().map(|name| name.to_string()).collect()
    }

    /// Both subscriptions are registered when both capability bits are set.
    #[tokio::test]
    async fn child_wire_registers_both_methods_when_both_bits_set() {
        let module = build_module_for(
            WireFixture {
                names: vec!["alpha".into(), "beta".into()],
                caps: ChildCapabilities::LIST | ChildCapabilities::SEARCH,
            },
            "fixture",
        );
        let names = registered_method_names(&module);
        assert!(
            names.iter().any(|name| name == "fixture.list_children"),
            "expected fixture.list_children, got: {:?}",
            names
        );
        assert!(
            names.iter().any(|name| name == "fixture.search_children"),
            "expected fixture.search_children, got: {:?}",
            names
        );
    }

    /// Nothing is registered when the capability set is empty.
    #[tokio::test]
    async fn child_wire_registers_nothing_when_no_bits_set() {
        let module = build_module_for(
            WireFixture {
                names: vec!["alpha".into()],
                caps: ChildCapabilities::empty(),
            },
            "fixture",
        );
        let names = registered_method_names(&module);
        assert!(
            !names.iter().any(|name| name == "fixture.list_children"),
            "fixture.list_children should NOT be registered when cap absent"
        );
        assert!(
            !names.iter().any(|name| name == "fixture.search_children"),
            "fixture.search_children should NOT be registered when cap absent"
        );
    }

    /// Only the list subscription is registered when only `LIST` is set.
    #[tokio::test]
    async fn child_wire_registers_only_list_when_only_list_bit() {
        let module = build_module_for(
            WireFixture {
                names: vec!["alpha".into()],
                caps: ChildCapabilities::LIST,
            },
            "fixture",
        );
        let names = registered_method_names(&module);
        assert!(names.iter().any(|name| name == "fixture.list_children"));
        assert!(!names.iter().any(|name| name == "fixture.search_children"));
    }
}
2376 }