1use super::{
9 context::PlexusContext,
10 method_enum::MethodEnumSchema,
11 schema::{ChildSummary, MethodSchema, PluginSchema, Schema},
12 streaming::PlexusStream,
13};
14use crate::types::Handle;
15use async_stream::stream;
16use async_trait::async_trait;
17use futures::Stream;
18use jsonrpsee::core::server::Methods;
19use jsonrpsee::SubscriptionMessage;
20use jsonrpsee::RpcModule;
21use schemars::JsonSchema;
22use serde::{Deserialize, Serialize};
23use serde_json::Value;
24use std::collections::HashMap;
25use std::sync::Arc;
26
27#[derive(Debug, Clone)]
32pub enum PlexusError {
33 ActivationNotFound(String),
34 MethodNotFound { activation: String, method: String },
35 InvalidParams(String),
36 ExecutionError(String),
37 HandleNotSupported(String),
38}
39
40impl std::fmt::Display for PlexusError {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 PlexusError::ActivationNotFound(name) => write!(f, "Activation not found: {}", name),
44 PlexusError::MethodNotFound { activation, method } => {
45 write!(f, "Method not found: {}.{}", activation, method)
46 }
47 PlexusError::InvalidParams(msg) => write!(f, "Invalid params: {}", msg),
48 PlexusError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
49 PlexusError::HandleNotSupported(activation) => {
50 write!(f, "Handle resolution not supported by activation: {}", activation)
51 }
52 }
53 }
54}
55
56impl std::error::Error for PlexusError {}
57
58#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
63pub struct ActivationInfo {
64 pub namespace: String,
65 pub version: String,
66 pub description: String,
67 pub methods: Vec<String>,
68}
69
70#[async_trait]
75pub trait Activation: Send + Sync + 'static {
76 type Methods: MethodEnumSchema;
77
78 fn namespace(&self) -> &str;
79 fn version(&self) -> &str;
80 fn description(&self) -> &str { "No description available" }
82 fn long_description(&self) -> Option<&str> { None }
84 fn methods(&self) -> Vec<&str>;
85 fn method_help(&self, _method: &str) -> Option<String> { None }
86 fn plugin_id(&self) -> uuid::Uuid {
90 let major_version = self.version().split('.').next().unwrap_or("0");
91 uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, format!("{}@{}", self.namespace(), major_version).as_bytes())
92 }
93
94 async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
95 async fn resolve_handle(&self, _handle: &Handle) -> Result<PlexusStream, PlexusError> {
96 Err(PlexusError::HandleNotSupported(self.namespace().to_string()))
97 }
98
99 fn into_rpc_methods(self) -> Methods where Self: Sized;
100
101 fn plugin_schema(&self) -> PluginSchema {
103 use std::collections::hash_map::DefaultHasher;
104 use std::hash::{Hash, Hasher};
105
106 let methods: Vec<MethodSchema> = self.methods().iter().map(|name| {
107 let desc = self.method_help(name).unwrap_or_default();
108 let mut hasher = DefaultHasher::new();
110 name.hash(&mut hasher);
111 desc.hash(&mut hasher);
112 let hash = format!("{:016x}", hasher.finish());
113 MethodSchema::new(name.to_string(), desc, hash)
114 }).collect();
115
116 if let Some(long_desc) = self.long_description() {
117 PluginSchema::leaf_with_long_description(
118 self.namespace(),
119 self.version(),
120 self.description(),
121 long_desc,
122 methods,
123 )
124 } else {
125 PluginSchema::leaf(
126 self.namespace(),
127 self.version(),
128 self.description(),
129 methods,
130 )
131 }
132 }
133}
134
135#[async_trait]
148pub trait ChildRouter: Send + Sync {
149 fn router_namespace(&self) -> &str;
151
152 async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
154
155 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>>;
157}
158
159pub async fn route_to_child<T: ChildRouter + ?Sized>(
165 parent: &T,
166 method: &str,
167 params: Value,
168) -> Result<PlexusStream, PlexusError> {
169 if let Some((child_name, rest)) = method.split_once('.') {
171 if let Some(child) = parent.get_child(child_name).await {
172 return child.router_call(rest, params).await;
173 }
174 return Err(PlexusError::ActivationNotFound(child_name.to_string()));
175 }
176
177 Err(PlexusError::MethodNotFound {
179 activation: parent.router_namespace().to_string(),
180 method: method.to_string(),
181 })
182}
183
184struct ArcChildRouter(Arc<dyn ChildRouter>);
188
189#[async_trait]
190impl ChildRouter for ArcChildRouter {
191 fn router_namespace(&self) -> &str {
192 self.0.router_namespace()
193 }
194
195 async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
196 self.0.router_call(method, params).await
197 }
198
199 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
200 self.0.get_child(name).await
201 }
202}
203
204#[async_trait]
209#[allow(dead_code)] trait ActivationObject: Send + Sync + 'static {
211 fn namespace(&self) -> &str;
212 fn version(&self) -> &str;
213 fn description(&self) -> &str;
214 fn long_description(&self) -> Option<&str>;
215 fn methods(&self) -> Vec<&str>;
216 fn method_help(&self, method: &str) -> Option<String>;
217 fn plugin_id(&self) -> uuid::Uuid;
218 async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
219 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError>;
220 fn plugin_schema(&self) -> PluginSchema;
221 fn schema(&self) -> Schema;
222}
223
224struct ActivationWrapper<A: Activation> {
225 inner: A,
226}
227
228#[async_trait]
229impl<A: Activation> ActivationObject for ActivationWrapper<A> {
230 fn namespace(&self) -> &str { self.inner.namespace() }
231 fn version(&self) -> &str { self.inner.version() }
232 fn description(&self) -> &str { self.inner.description() }
233 fn long_description(&self) -> Option<&str> { self.inner.long_description() }
234 fn methods(&self) -> Vec<&str> { self.inner.methods() }
235 fn method_help(&self, method: &str) -> Option<String> { self.inner.method_help(method) }
236 fn plugin_id(&self) -> uuid::Uuid { self.inner.plugin_id() }
237
238 async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
239 self.inner.call(method, params).await
240 }
241
242 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
243 self.inner.resolve_handle(handle).await
244 }
245
246 fn plugin_schema(&self) -> PluginSchema { self.inner.plugin_schema() }
247
248 fn schema(&self) -> Schema {
249 let schema = schemars::schema_for!(A::Methods);
250 serde_json::from_value(serde_json::to_value(schema).expect("serialize"))
251 .expect("parse schema")
252 }
253}
254
255#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
260#[serde(tag = "event", rename_all = "snake_case")]
261pub enum HashEvent {
262 Hash { value: String },
263}
264
265#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
267#[serde(tag = "event", rename_all = "snake_case")]
268pub enum SchemaEvent {
269 Schema(PluginSchema),
271}
272
273
274#[derive(Debug, Clone)]
280pub struct PluginEntry {
281 pub id: uuid::Uuid,
283 pub path: String,
285 pub plugin_type: String,
287}
288
289#[derive(Default)]
294pub struct PluginRegistry {
295 by_id: HashMap<uuid::Uuid, PluginEntry>,
297 by_path: HashMap<String, uuid::Uuid>,
299}
300
301#[derive(Clone)]
305pub struct PluginRegistrySnapshot {
306 by_id: HashMap<uuid::Uuid, PluginEntry>,
307 by_path: HashMap<String, uuid::Uuid>,
308}
309
310impl PluginRegistrySnapshot {
311 pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
313 self.by_id.get(&id).map(|e| e.path.as_str())
314 }
315
316 pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
318 self.by_path.get(path).copied()
319 }
320
321 pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
323 self.by_id.get(&id)
324 }
325
326 pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
328 self.by_id.values()
329 }
330
331 pub fn len(&self) -> usize {
333 self.by_id.len()
334 }
335
336 pub fn is_empty(&self) -> bool {
338 self.by_id.is_empty()
339 }
340}
341
342impl PluginRegistry {
343 pub fn new() -> Self {
345 Self::default()
346 }
347
348 pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
350 self.by_id.get(&id).map(|e| e.path.as_str())
351 }
352
353 pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
355 self.by_path.get(path).copied()
356 }
357
358 pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
360 self.by_id.get(&id)
361 }
362
363 pub fn register(&mut self, id: uuid::Uuid, path: String, plugin_type: String) {
365 let entry = PluginEntry { id, path: path.clone(), plugin_type };
366 self.by_id.insert(id, entry);
367 self.by_path.insert(path, id);
368 }
369
370 pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
372 self.by_id.values()
373 }
374
375 pub fn len(&self) -> usize {
377 self.by_id.len()
378 }
379
380 pub fn is_empty(&self) -> bool {
382 self.by_id.is_empty()
383 }
384}
385
386struct DynamicHubInner {
391 namespace: String,
393 activations: HashMap<String, Arc<dyn ActivationObject>>,
394 child_routers: HashMap<String, Arc<dyn ChildRouter>>,
396 registry: std::sync::RwLock<PluginRegistry>,
398 pending_rpc: std::sync::Mutex<Vec<Box<dyn FnOnce() -> Methods + Send>>>,
399}
400
401#[derive(Clone)]
423pub struct DynamicHub {
424 inner: Arc<DynamicHubInner>,
425}
426
427impl DynamicHub {
432 pub fn new(namespace: impl Into<String>) -> Self {
442 Self {
443 inner: Arc::new(DynamicHubInner {
444 namespace: namespace.into(),
445 activations: HashMap::new(),
446 child_routers: HashMap::new(),
447 registry: std::sync::RwLock::new(PluginRegistry::new()),
448 pending_rpc: std::sync::Mutex::new(Vec::new()),
449 }),
450 }
451 }
452
453 #[deprecated(since = "0.3.0", note = "Use DynamicHub::new(namespace) instead")]
455 pub fn with_namespace(namespace: impl Into<String>) -> Self {
456 Self::new(namespace)
457 }
458
459 pub fn runtime_namespace(&self) -> &str {
461 &self.inner.namespace
462 }
463
464 pub fn registry(&self) -> std::sync::RwLockReadGuard<'_, PluginRegistry> {
466 self.inner.registry.read().unwrap()
467 }
468
469 pub fn register<A: Activation + Clone>(mut self, activation: A) -> Self {
471 let namespace = activation.namespace().to_string();
472 let plugin_id = activation.plugin_id();
473 let activation_for_rpc = activation.clone();
474
475 let inner = Arc::get_mut(&mut self.inner)
476 .expect("Cannot register: DynamicHub has multiple references");
477
478 inner.registry.write().unwrap().register(
480 plugin_id,
481 namespace.clone(),
482 namespace.clone(), );
484
485 inner.activations.insert(namespace, Arc::new(ActivationWrapper { inner: activation }));
486 inner.pending_rpc.lock().unwrap()
487 .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
488 self
489 }
490
491 pub fn register_hub<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
496 let namespace = activation.namespace().to_string();
497 let plugin_id = activation.plugin_id();
498 let activation_for_rpc = activation.clone();
499 let activation_for_router = activation.clone();
500
501 let inner = Arc::get_mut(&mut self.inner)
502 .expect("Cannot register: DynamicHub has multiple references");
503
504 inner.registry.write().unwrap().register(
506 plugin_id,
507 namespace.clone(),
508 namespace.clone(), );
510
511 inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
512 inner.child_routers.insert(namespace, Arc::new(activation_for_router));
513 inner.pending_rpc.lock().unwrap()
514 .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
515 self
516 }
517
518 pub fn list_methods(&self) -> Vec<String> {
520 let mut methods = Vec::new();
521
522 for m in Activation::methods(self) {
524 methods.push(format!("{}.{}", self.inner.namespace, m));
525 }
526
527 for (ns, act) in &self.inner.activations {
529 for m in act.methods() {
530 methods.push(format!("{}.{}", ns, m));
531 }
532 }
533 methods.sort();
534 methods
535 }
536
537 pub fn list_activations_info(&self) -> Vec<ActivationInfo> {
539 let mut activations = Vec::new();
540
541 activations.push(ActivationInfo {
543 namespace: Activation::namespace(self).to_string(),
544 version: Activation::version(self).to_string(),
545 description: Activation::description(self).to_string(),
546 methods: Activation::methods(self).iter().map(|s| s.to_string()).collect(),
547 });
548
549 for a in self.inner.activations.values() {
551 activations.push(ActivationInfo {
552 namespace: a.namespace().to_string(),
553 version: a.version().to_string(),
554 description: a.description().to_string(),
555 methods: a.methods().iter().map(|s| s.to_string()).collect(),
556 });
557 }
558
559 activations
560 }
561
562 pub fn compute_hash(&self) -> String {
567 Activation::plugin_schema(self).hash
568 }
569
570 pub async fn route(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
572 let (namespace, method_name) = self.parse_method(method)?;
573
574 if namespace == self.inner.namespace {
576 return Activation::call(self, method_name, params).await;
577 }
578
579 let activation = self.inner.activations.get(namespace)
580 .ok_or_else(|| PlexusError::ActivationNotFound(namespace.to_string()))?;
581
582 activation.call(method_name, params).await
583 }
584
585 pub async fn do_resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
589 let path = self.inner.registry.read().unwrap()
590 .lookup(handle.plugin_id)
591 .map(|s| s.to_string())
592 .ok_or_else(|| PlexusError::ActivationNotFound(handle.plugin_id.to_string()))?;
593
594 let activation = self.inner.activations.get(&path)
595 .ok_or_else(|| PlexusError::ActivationNotFound(path.clone()))?;
596 activation.resolve_handle(handle).await
597 }
598
599 pub fn get_activation_schema(&self, namespace: &str) -> Option<Schema> {
601 self.inner.activations.get(namespace).map(|a| a.schema())
602 }
603
604 pub fn registry_snapshot(&self) -> PluginRegistrySnapshot {
606 let guard = self.inner.registry.read().unwrap();
607 PluginRegistrySnapshot {
608 by_id: guard.by_id.clone(),
609 by_path: guard.by_path.clone(),
610 }
611 }
612
613 pub fn lookup_plugin(&self, id: uuid::Uuid) -> Option<String> {
615 self.inner.registry.read().unwrap().lookup(id).map(|s| s.to_string())
616 }
617
618 pub fn lookup_plugin_by_path(&self, path: &str) -> Option<uuid::Uuid> {
620 self.inner.registry.read().unwrap().lookup_by_path(path)
621 }
622
623 pub fn list_plugin_schemas(&self) -> Vec<PluginSchema> {
625 let mut schemas = Vec::new();
626
627 schemas.push(Activation::plugin_schema(self));
629
630 for a in self.inner.activations.values() {
632 schemas.push(a.plugin_schema());
633 }
634
635 schemas
636 }
637
638 #[deprecated(note = "Use list_plugin_schemas instead")]
640 pub fn list_full_schemas(&self) -> Vec<PluginSchema> {
641 self.list_plugin_schemas()
642 }
643
644 pub fn get_method_help(&self, method: &str) -> Option<String> {
646 let (namespace, method_name) = self.parse_method(method).ok()?;
647 let activation = self.inner.activations.get(namespace)?;
648 activation.method_help(method_name)
649 }
650
651 fn parse_method<'a>(&self, method: &'a str) -> Result<(&'a str, &'a str), PlexusError> {
652 let parts: Vec<&str> = method.splitn(2, '.').collect();
653 if parts.len() != 2 {
654 return Err(PlexusError::InvalidParams(format!("Invalid method format: {}", method)));
655 }
656 Ok((parts[0], parts[1]))
657 }
658
659 pub fn plugin_children(&self) -> Vec<ChildSummary> {
662 self.inner.activations.values()
663 .map(|a| {
664 let schema = a.plugin_schema();
665 ChildSummary {
666 namespace: schema.namespace,
667 description: schema.description,
668 hash: schema.hash,
669 }
670 })
671 .collect()
672 }
673
674 pub fn into_rpc_module(self) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
676 let mut module = RpcModule::new(());
677
678 PlexusContext::init(self.compute_hash());
679
680 let ns = self.runtime_namespace();
683 let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
684 let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
685 let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
686 let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
687 let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
688 let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
689 let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
690 let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
691 let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
692
693 let plexus_for_call = self.clone();
695 module.register_subscription(
696 call_method,
697 call_method,
698 call_unsub,
699 move |params, pending, _ctx, _ext| {
700 let plexus = plexus_for_call.clone();
701 Box::pin(async move {
702 let p: CallParams = params.parse()?;
704 let stream = plexus.route(&p.method, p.params.unwrap_or_default()).await
705 .map_err(|e| jsonrpsee::types::ErrorObject::owned(-32000, e.to_string(), None::<()>))?;
706 pipe_stream_to_subscription(pending, stream).await
707 })
708 }
709 )?;
710
711 let plexus_for_hash = self.clone();
713 module.register_subscription(
714 hash_method,
715 hash_method,
716 hash_unsub,
717 move |_params, pending, _ctx, _ext| {
718 let plexus = plexus_for_hash.clone();
719 Box::pin(async move {
720 let schema = Activation::plugin_schema(&plexus);
721 let stream = async_stream::stream! {
722 yield HashEvent::Hash { value: schema.hash };
723 };
724 let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
725 pipe_stream_to_subscription(pending, wrapped).await
726 })
727 }
728 )?;
729
730 let plexus_for_schema = self.clone();
732 module.register_subscription(
733 schema_method,
734 schema_method,
735 schema_unsub,
736 move |params, pending, _ctx, _ext| {
737 let plexus = plexus_for_schema.clone();
738 Box::pin(async move {
739 let p: SchemaParams = params.parse().unwrap_or_default();
740 let plugin_schema = Activation::plugin_schema(&plexus);
741
742 let result = if let Some(ref name) = p.method {
743 plugin_schema.methods.iter()
744 .find(|m| m.name == *name)
745 .map(|m| super::SchemaResult::Method(m.clone()))
746 .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
747 -32602,
748 format!("Method '{}' not found", name),
749 None::<()>,
750 ))?
751 } else {
752 super::SchemaResult::Plugin(plugin_schema)
753 };
754
755 let stream = async_stream::stream! { yield result; };
756 let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
757 pipe_stream_to_subscription(pending, wrapped).await
758 })
759 }
760 )?;
761
762 let backend_name = self.runtime_namespace().to_string();
765 module.register_subscription(
766 "_info",
767 "_info",
768 "_info_unsub",
769 move |_params, pending, _ctx, _ext| {
770 let name = backend_name.clone();
771 Box::pin(async move {
772 let info_stream = futures::stream::once(async move {
774 serde_json::json!({"backend": name})
775 });
776
777 let wrapped = super::streaming::wrap_stream(
779 info_stream,
780 "_info",
781 vec![]
782 );
783
784 pipe_stream_to_subscription(pending, wrapped).await
786 })
787 }
788 )?;
789
790 let pending = std::mem::take(&mut *self.inner.pending_rpc.lock().unwrap());
792 for factory in pending {
793 module.merge(factory())?;
794 }
795
796 Ok(module)
797 }
798
799 pub fn arc_into_rpc_module(hub: Arc<Self>) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
805 let mut module = RpcModule::new(());
806
807 PlexusContext::init(hub.compute_hash());
808
809 let ns = hub.runtime_namespace();
812 let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
813 let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
814 let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
815 let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
816 let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
817 let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
818 let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
819 let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
820 let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
821
822 let hub_for_call = hub.clone();
824 module.register_subscription(
825 call_method,
826 call_method,
827 call_unsub,
828 move |params, pending, _ctx, _ext| {
829 let hub = hub_for_call.clone();
830 Box::pin(async move {
831 let p: CallParams = params.parse()?;
832 let stream = hub.route(&p.method, p.params.unwrap_or_default()).await
833 .map_err(|e| jsonrpsee::types::ErrorObject::owned(-32000, e.to_string(), None::<()>))?;
834 pipe_stream_to_subscription(pending, stream).await
835 })
836 }
837 )?;
838
839 let hub_for_hash = hub.clone();
841 module.register_subscription(
842 hash_method,
843 hash_method,
844 hash_unsub,
845 move |_params, pending, _ctx, _ext| {
846 let hub = hub_for_hash.clone();
847 Box::pin(async move {
848 let schema = Activation::plugin_schema(&*hub);
849 let stream = async_stream::stream! {
850 yield HashEvent::Hash { value: schema.hash };
851 };
852 let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
853 pipe_stream_to_subscription(pending, wrapped).await
854 })
855 }
856 )?;
857
858 let hub_for_schema = hub.clone();
860 module.register_subscription(
861 schema_method,
862 schema_method,
863 schema_unsub,
864 move |params, pending, _ctx, _ext| {
865 let hub = hub_for_schema.clone();
866 Box::pin(async move {
867 let p: SchemaParams = params.parse().unwrap_or_default();
868 let plugin_schema = Activation::plugin_schema(&*hub);
869
870 let result = if let Some(ref name) = p.method {
871 plugin_schema.methods.iter()
872 .find(|m| m.name == *name)
873 .map(|m| super::SchemaResult::Method(m.clone()))
874 .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
875 -32602,
876 format!("Method '{}' not found", name),
877 None::<()>,
878 ))?
879 } else {
880 super::SchemaResult::Plugin(plugin_schema)
881 };
882
883 let stream = async_stream::stream! {
884 yield result;
885 };
886 let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
887 pipe_stream_to_subscription(pending, wrapped).await
888 })
889 }
890 )?;
891
892 let backend_name = hub.runtime_namespace().to_string();
895 module.register_subscription(
896 "_info",
897 "_info",
898 "_info_unsub",
899 move |_params, pending, _ctx, _ext| {
900 let name = backend_name.clone();
901 Box::pin(async move {
902 let info_stream = futures::stream::once(async move {
904 serde_json::json!({"backend": name})
905 });
906
907 let wrapped = super::streaming::wrap_stream(
909 info_stream,
910 "_info",
911 vec![]
912 );
913
914 pipe_stream_to_subscription(pending, wrapped).await
916 })
917 }
918 )?;
919
920 let pending = std::mem::take(&mut *hub.inner.pending_rpc.lock().unwrap());
922 for factory in pending {
923 module.merge(factory())?;
924 }
925
926 Ok(module)
927 }
928}
929
930#[derive(Debug, serde::Deserialize)]
932struct CallParams {
933 method: String,
934 #[serde(default)]
935 params: Option<Value>,
936}
937
938#[derive(Debug, Default, serde::Deserialize)]
940struct SchemaParams {
941 method: Option<String>,
942}
943
944async fn pipe_stream_to_subscription(
946 pending: jsonrpsee::PendingSubscriptionSink,
947 mut stream: PlexusStream,
948) -> jsonrpsee::core::SubscriptionResult {
949 use futures::StreamExt;
950 use jsonrpsee::SubscriptionMessage;
951
952 let sink = pending.accept().await?;
953 while let Some(item) = stream.next().await {
954 let msg = SubscriptionMessage::new("result", sink.subscription_id(), &item)?;
955 sink.send(msg).await?;
956 }
957 Ok(())
958}
959
960#[plexus_macros::hub_methods(
965 namespace = "plexus",
966 version = "1.0.0",
967 description = "Central routing and introspection",
968 hub,
969 namespace_fn = "runtime_namespace"
970)]
971impl DynamicHub {
972 #[plexus_macros::hub_method(
974 streaming,
975 description = "Route a call to a registered activation",
976 params(
977 method = "The method to call (format: namespace.method)",
978 params = "Parameters to pass to the method (optional, defaults to {})"
979 )
980 )]
981 async fn call(
982 &self,
983 method: String,
984 params: Option<Value>,
985 ) -> impl Stream<Item = super::types::PlexusStreamItem> + Send + 'static {
986 use super::context::PlexusContext;
987 use super::types::{PlexusStreamItem, StreamMetadata};
988
989 let result = self.route(&method, params.unwrap_or_default()).await;
990
991 match result {
992 Ok(plexus_stream) => {
993 plexus_stream
995 }
996 Err(e) => {
997 let metadata = StreamMetadata::new(
999 vec![self.inner.namespace.clone()],
1000 PlexusContext::hash(),
1001 );
1002 Box::pin(futures::stream::once(async move {
1003 PlexusStreamItem::Error {
1004 metadata,
1005 message: e.to_string(),
1006 code: None,
1007 recoverable: false,
1008 }
1009 }))
1010 }
1011 }
1012 }
1013
1014 #[plexus_macros::hub_method(description = "Get plexus configuration hash (from the recursive schema)\n\n This hash changes whenever any method or child plugin changes.\n It's computed from the method hashes rolled up through the schema tree.")]
1019 async fn hash(&self) -> impl Stream<Item = HashEvent> + Send + 'static {
1020 let schema = Activation::plugin_schema(self);
1021 stream! { yield HashEvent::Hash { value: schema.hash }; }
1022 }
1023
1024 }
1026
1027use super::hub_context::HubContext;
1032use std::sync::Weak;
1033
1034#[async_trait]
1040impl HubContext for Weak<DynamicHub> {
1041 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1042 let hub = self.upgrade().ok_or_else(|| {
1043 PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1044 })?;
1045 hub.do_resolve_handle(handle).await
1046 }
1047
1048 async fn call(&self, method: &str, params: serde_json::Value) -> Result<PlexusStream, PlexusError> {
1049 let hub = self.upgrade().ok_or_else(|| {
1050 PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1051 })?;
1052 hub.route(method, params).await
1053 }
1054
1055 fn is_valid(&self) -> bool {
1056 self.upgrade().is_some()
1057 }
1058}
1059
1060#[async_trait]
1065impl ChildRouter for DynamicHub {
1066 fn router_namespace(&self) -> &str {
1067 &self.inner.namespace
1068 }
1069
1070 async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
1071 self.route(method, params).await
1074 }
1075
1076 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1077 self.inner.child_routers.get(name)
1079 .map(|router| {
1080 Box::new(ArcChildRouter(router.clone())) as Box<dyn ChildRouter>
1082 })
1083 }
1084}
1085
1086#[cfg(test)]
1087mod tests {
1088 use super::*;
1089
1090 #[test]
1091 fn dynamic_hub_implements_activation() {
1092 fn assert_activation<T: Activation>() {}
1093 assert_activation::<DynamicHub>();
1094 }
1095
1096 #[test]
1097 fn dynamic_hub_methods() {
1098 let hub = DynamicHub::new("test");
1099 let methods = hub.methods();
1100 assert!(methods.contains(&"call"));
1101 assert!(methods.contains(&"hash"));
1102 assert!(methods.contains(&"schema"));
1103 }
1105
1106 #[test]
1107 fn dynamic_hub_hash_stable() {
1108 let h1 = DynamicHub::new("test");
1109 let h2 = DynamicHub::new("test");
1110 assert_eq!(h1.compute_hash(), h2.compute_hash());
1111 }
1112
1113 #[test]
1114 fn dynamic_hub_is_hub() {
1115 use crate::activations::health::Health;
1116 let hub = DynamicHub::new("test").register(Health::new());
1117 let schema = hub.plugin_schema();
1118
1119 assert!(schema.is_hub(), "dynamic hub should be a hub");
1121 assert!(!schema.is_leaf(), "dynamic hub should not be a leaf");
1122
1123 let children = schema.children.expect("dynamic hub should have children");
1125 assert!(!children.is_empty(), "dynamic hub should have at least one child");
1126
1127 let health = children.iter().find(|c| c.namespace == "health").expect("should have health child");
1129 assert!(!health.hash.is_empty(), "health should have a hash");
1130 }
1131
1132 #[test]
1133 fn dynamic_hub_schema_structure() {
1134 use crate::activations::health::Health;
1135 let hub = DynamicHub::new("test").register(Health::new());
1136 let schema = hub.plugin_schema();
1137
1138 let json = serde_json::to_string_pretty(&schema).unwrap();
1140 println!("DynamicHub schema:\n{}", json);
1141
1142 assert_eq!(schema.namespace, "test");
1144 assert!(schema.methods.iter().any(|m| m.name == "call"));
1145 assert!(schema.children.is_some());
1146 }
1147
1148 #[tokio::test]
1153 async fn invariant_resolve_handle_unknown_activation() {
1154 use crate::activations::health::Health;
1155 use crate::types::Handle;
1156 use uuid::Uuid;
1157
1158 let hub = DynamicHub::new("test").register(Health::new());
1159
1160 let unknown_plugin_id = Uuid::new_v4();
1162 let handle = Handle::new(unknown_plugin_id, "1.0.0", "some_method");
1163
1164 let result = hub.do_resolve_handle(&handle).await;
1165
1166 match result {
1167 Err(PlexusError::ActivationNotFound(_)) => {
1168 }
1170 Err(other) => panic!("Expected ActivationNotFound, got {:?}", other),
1171 Ok(_) => panic!("Expected error for unknown activation"),
1172 }
1173 }
1174
1175 #[tokio::test]
1176 async fn invariant_resolve_handle_unsupported() {
1177 use crate::activations::health::Health;
1178 use crate::types::Handle;
1179
1180 let hub = DynamicHub::new("test").register(Health::new());
1181
1182 let handle = Handle::new(Health::PLUGIN_ID, "1.0.0", "check");
1184
1185 let result = hub.do_resolve_handle(&handle).await;
1186
1187 match result {
1188 Err(PlexusError::HandleNotSupported(name)) => {
1189 assert_eq!(name, "health");
1190 }
1191 Err(other) => panic!("Expected HandleNotSupported, got {:?}", other),
1192 Ok(_) => panic!("Expected error for unsupported handle"),
1193 }
1194 }
1195
1196 #[tokio::test]
1197 async fn invariant_resolve_handle_routes_by_plugin_id() {
1198 use crate::activations::health::Health;
1199 use crate::activations::echo::Echo;
1200 use crate::types::Handle;
1201 use uuid::Uuid;
1202
1203 let health = Health::new();
1204 let echo = Echo::new();
1205 let health_plugin_id = health.plugin_id();
1206 let echo_plugin_id = echo.plugin_id();
1207
1208 let hub = DynamicHub::new("test")
1209 .register(health)
1210 .register(echo);
1211
1212 let health_handle = Handle::new(health_plugin_id, "1.0.0", "check");
1214 match hub.do_resolve_handle(&health_handle).await {
1215 Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
1216 Err(other) => panic!("health handle should route to health activation, got {:?}", other),
1217 Ok(_) => panic!("health handle should return HandleNotSupported"),
1218 }
1219
1220 let echo_handle = Handle::new(echo_plugin_id, "1.0.0", "echo");
1222 match hub.do_resolve_handle(&echo_handle).await {
1223 Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "echo"),
1224 Err(other) => panic!("echo handle should route to echo activation, got {:?}", other),
1225 Ok(_) => panic!("echo handle should return HandleNotSupported"),
1226 }
1227
1228 let unknown_handle = Handle::new(Uuid::new_v4(), "1.0.0", "method");
1230 match hub.do_resolve_handle(&unknown_handle).await {
1231 Err(PlexusError::ActivationNotFound(_)) => { },
1232 Err(other) => panic!("unknown handle should return ActivationNotFound, got {:?}", other),
1233 Ok(_) => panic!("unknown handle should return ActivationNotFound"),
1234 }
1235 }
1236
1237 #[test]
1238 fn invariant_handle_plugin_id_determines_routing() {
1239 use crate::activations::health::Health;
1240 use crate::activations::echo::Echo;
1241 use crate::types::Handle;
1242
1243 let health = Health::new();
1244 let echo = Echo::new();
1245
1246 let health_handle = Handle::new(health.plugin_id(), "1.0.0", "check")
1248 .with_meta(vec!["msg-123".into(), "user".into()]);
1249 let echo_handle = Handle::new(echo.plugin_id(), "1.0.0", "echo")
1250 .with_meta(vec!["msg-123".into(), "user".into()]);
1251
1252 assert_ne!(health_handle.plugin_id, echo_handle.plugin_id);
1254 }
1255
1256 #[test]
1261 fn plugin_registry_basic_operations() {
1262 let mut registry = PluginRegistry::new();
1263 let id = uuid::Uuid::new_v4();
1264
1265 registry.register(id, "test_plugin".to_string(), "test".to_string());
1267
1268 assert_eq!(registry.lookup(id), Some("test_plugin"));
1270
1271 assert_eq!(registry.lookup_by_path("test_plugin"), Some(id));
1273
1274 let entry = registry.get(id).expect("should have entry");
1276 assert_eq!(entry.path, "test_plugin");
1277 assert_eq!(entry.plugin_type, "test");
1278 }
1279
1280 #[test]
1281 fn plugin_registry_populated_on_register() {
1282 use crate::activations::health::Health;
1283
1284 let hub = DynamicHub::new("test").register(Health::new());
1285
1286 let registry = hub.registry();
1287 assert!(!registry.is_empty(), "registry should not be empty after registration");
1288
1289 let health_id = registry.lookup_by_path("health");
1291 assert!(health_id.is_some(), "health should be registered by path");
1292
1293 let health_uuid = health_id.unwrap();
1295 assert_eq!(registry.lookup(health_uuid), Some("health"));
1296 }
1297
1298 #[test]
1299 fn plugin_registry_deterministic_uuid() {
1300 use crate::activations::health::Health;
1301
1302 let health1 = Health::new();
1304 let health2 = Health::new();
1305
1306 assert_eq!(health1.plugin_id(), health2.plugin_id(),
1307 "same activation type should have deterministic UUID");
1308
1309 let expected = uuid::Uuid::new_v5(
1311 &uuid::Uuid::NAMESPACE_OID,
1312 b"health@1"
1313 );
1314 assert_eq!(health1.plugin_id(), expected,
1315 "plugin_id should be deterministic from namespace@major_version");
1316 }
1317}