1use super::{
9 context::PlexusContext,
10 method_enum::MethodEnumSchema,
11 schema::{ChildSummary, MethodSchema, PluginSchema, Schema},
12 streaming::PlexusStream,
13};
14use crate::types::Handle;
15use async_stream::stream;
16use async_trait::async_trait;
17use futures::Stream;
18use jsonrpsee::core::server::Methods;
19use jsonrpsee::RpcModule;
20use schemars::JsonSchema;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::sync::Arc;
25
26#[derive(Debug, Clone)]
31pub enum PlexusError {
32 ActivationNotFound(String),
33 MethodNotFound { activation: String, method: String },
34 InvalidParams(String),
35 ExecutionError(String),
36 HandleNotSupported(String),
37 TransportError(TransportErrorKind),
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41#[serde(tag = "error_kind", rename_all = "snake_case")]
42pub enum TransportErrorKind {
43 ConnectionRefused { host: String, port: u16 },
44 ConnectionTimeout { host: String, port: u16 },
45 ProtocolError { message: String },
46 NetworkError { message: String },
47}
48
49impl std::fmt::Display for TransportErrorKind {
50 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51 match self {
52 TransportErrorKind::ConnectionRefused { host, port } => {
53 write!(f, "Connection refused to {}:{}", host, port)
54 }
55 TransportErrorKind::ConnectionTimeout { host, port } => {
56 write!(f, "Connection timeout to {}:{}", host, port)
57 }
58 TransportErrorKind::ProtocolError { message } => {
59 write!(f, "Protocol error: {}", message)
60 }
61 TransportErrorKind::NetworkError { message } => {
62 write!(f, "Network error: {}", message)
63 }
64 }
65 }
66}
67
68impl std::fmt::Display for PlexusError {
69 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70 match self {
71 PlexusError::ActivationNotFound(name) => write!(f, "Activation not found: {}", name),
72 PlexusError::MethodNotFound { activation, method } => {
73 write!(f, "Method not found: {}.{}", activation, method)
74 }
75 PlexusError::InvalidParams(msg) => write!(f, "Invalid params: {}", msg),
76 PlexusError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
77 PlexusError::HandleNotSupported(activation) => {
78 write!(f, "Handle resolution not supported by activation: {}", activation)
79 }
80 PlexusError::TransportError(kind) => match kind {
81 TransportErrorKind::ConnectionRefused { host, port } => {
82 write!(f, "Connection refused to {}:{}", host, port)
83 }
84 TransportErrorKind::ConnectionTimeout { host, port } => {
85 write!(f, "Connection timeout to {}:{}", host, port)
86 }
87 TransportErrorKind::ProtocolError { message } => {
88 write!(f, "Protocol error: {}", message)
89 }
90 TransportErrorKind::NetworkError { message } => {
91 write!(f, "Network error: {}", message)
92 }
93 }
94 }
95 }
96}
97
98impl std::error::Error for PlexusError {}
99
100#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
105pub struct ActivationInfo {
106 pub namespace: String,
107 pub version: String,
108 pub description: String,
109 pub methods: Vec<String>,
110}
111
112#[async_trait]
117pub trait Activation: Send + Sync + 'static {
118 type Methods: MethodEnumSchema;
119
120 fn namespace(&self) -> &str;
121 fn version(&self) -> &str;
122 fn description(&self) -> &str { "No description available" }
124 fn long_description(&self) -> Option<&str> { None }
126 fn methods(&self) -> Vec<&str>;
127 fn method_help(&self, _method: &str) -> Option<String> { None }
128 fn plugin_id(&self) -> uuid::Uuid {
132 let major_version = self.version().split('.').next().unwrap_or("0");
133 uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, format!("{}@{}", self.namespace(), major_version).as_bytes())
134 }
135
136 async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
137 async fn resolve_handle(&self, _handle: &Handle) -> Result<PlexusStream, PlexusError> {
138 Err(PlexusError::HandleNotSupported(self.namespace().to_string()))
139 }
140
141 fn into_rpc_methods(self) -> Methods where Self: Sized;
142
143 fn plugin_schema(&self) -> PluginSchema {
145 use std::collections::hash_map::DefaultHasher;
146 use std::hash::{Hash, Hasher};
147
148 let methods: Vec<MethodSchema> = self.methods().iter().map(|name| {
149 let desc = self.method_help(name).unwrap_or_default();
150 let mut hasher = DefaultHasher::new();
152 name.hash(&mut hasher);
153 desc.hash(&mut hasher);
154 let hash = format!("{:016x}", hasher.finish());
155 MethodSchema::new(name.to_string(), desc, hash)
156 }).collect();
157
158 if let Some(long_desc) = self.long_description() {
159 PluginSchema::leaf_with_long_description(
160 self.namespace(),
161 self.version(),
162 self.description(),
163 long_desc,
164 methods,
165 )
166 } else {
167 PluginSchema::leaf(
168 self.namespace(),
169 self.version(),
170 self.description(),
171 methods,
172 )
173 }
174 }
175}
176
177#[async_trait]
190pub trait ChildRouter: Send + Sync {
191 fn router_namespace(&self) -> &str;
193
194 async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
196
197 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>>;
199}
200
201pub async fn route_to_child<T: ChildRouter + ?Sized>(
207 parent: &T,
208 method: &str,
209 params: Value,
210) -> Result<PlexusStream, PlexusError> {
211 if let Some((child_name, rest)) = method.split_once('.') {
213 if let Some(child) = parent.get_child(child_name).await {
214 return child.router_call(rest, params).await;
215 }
216 return Err(PlexusError::ActivationNotFound(child_name.to_string()));
217 }
218
219 Err(PlexusError::MethodNotFound {
221 activation: parent.router_namespace().to_string(),
222 method: method.to_string(),
223 })
224}
225
226struct ArcChildRouter(Arc<dyn ChildRouter>);
230
231#[async_trait]
232impl ChildRouter for ArcChildRouter {
233 fn router_namespace(&self) -> &str {
234 self.0.router_namespace()
235 }
236
237 async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
238 self.0.router_call(method, params).await
239 }
240
241 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
242 self.0.get_child(name).await
243 }
244}
245
246#[async_trait]
251#[allow(dead_code)] trait ActivationObject: Send + Sync + 'static {
253 fn namespace(&self) -> &str;
254 fn version(&self) -> &str;
255 fn description(&self) -> &str;
256 fn long_description(&self) -> Option<&str>;
257 fn methods(&self) -> Vec<&str>;
258 fn method_help(&self, method: &str) -> Option<String>;
259 fn plugin_id(&self) -> uuid::Uuid;
260 async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
261 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError>;
262 fn plugin_schema(&self) -> PluginSchema;
263 fn schema(&self) -> Schema;
264}
265
266struct ActivationWrapper<A: Activation> {
267 inner: A,
268}
269
270#[async_trait]
271impl<A: Activation> ActivationObject for ActivationWrapper<A> {
272 fn namespace(&self) -> &str { self.inner.namespace() }
273 fn version(&self) -> &str { self.inner.version() }
274 fn description(&self) -> &str { self.inner.description() }
275 fn long_description(&self) -> Option<&str> { self.inner.long_description() }
276 fn methods(&self) -> Vec<&str> { self.inner.methods() }
277 fn method_help(&self, method: &str) -> Option<String> { self.inner.method_help(method) }
278 fn plugin_id(&self) -> uuid::Uuid { self.inner.plugin_id() }
279
280 async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
281 self.inner.call(method, params).await
282 }
283
284 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
285 self.inner.resolve_handle(handle).await
286 }
287
288 fn plugin_schema(&self) -> PluginSchema { self.inner.plugin_schema() }
289
290 fn schema(&self) -> Schema {
291 let schema = schemars::schema_for!(A::Methods);
292 serde_json::from_value(serde_json::to_value(schema).expect("serialize"))
293 .expect("parse schema")
294 }
295}
296
297#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
302#[serde(tag = "event", rename_all = "snake_case")]
303pub enum HashEvent {
304 Hash { value: String },
305}
306
307#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
309#[serde(tag = "event", rename_all = "snake_case")]
310pub enum SchemaEvent {
311 Schema(PluginSchema),
313}
314
315#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
317pub struct PluginHashes {
318 pub namespace: String,
319 pub self_hash: String,
320 #[serde(skip_serializing_if = "Option::is_none")]
321 pub children_hash: Option<String>,
322 pub hash: String,
323 #[serde(skip_serializing_if = "Option::is_none")]
325 pub children: Option<Vec<ChildHashes>>,
326}
327
328#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
330pub struct ChildHashes {
331 pub namespace: String,
332 pub hash: String,
333}
334
335
336#[derive(Debug, Clone)]
342pub struct PluginEntry {
343 pub id: uuid::Uuid,
345 pub path: String,
347 pub plugin_type: String,
349}
350
351#[derive(Default)]
356pub struct PluginRegistry {
357 by_id: HashMap<uuid::Uuid, PluginEntry>,
359 by_path: HashMap<String, uuid::Uuid>,
361}
362
363#[derive(Clone)]
367pub struct PluginRegistrySnapshot {
368 by_id: HashMap<uuid::Uuid, PluginEntry>,
369 by_path: HashMap<String, uuid::Uuid>,
370}
371
372impl PluginRegistrySnapshot {
373 pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
375 self.by_id.get(&id).map(|e| e.path.as_str())
376 }
377
378 pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
380 self.by_path.get(path).copied()
381 }
382
383 pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
385 self.by_id.get(&id)
386 }
387
388 pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
390 self.by_id.values()
391 }
392
393 pub fn len(&self) -> usize {
395 self.by_id.len()
396 }
397
398 pub fn is_empty(&self) -> bool {
400 self.by_id.is_empty()
401 }
402}
403
404impl PluginRegistry {
405 pub fn new() -> Self {
407 Self::default()
408 }
409
410 pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
412 self.by_id.get(&id).map(|e| e.path.as_str())
413 }
414
415 pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
417 self.by_path.get(path).copied()
418 }
419
420 pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
422 self.by_id.get(&id)
423 }
424
425 pub fn register(&mut self, id: uuid::Uuid, path: String, plugin_type: String) {
427 let entry = PluginEntry { id, path: path.clone(), plugin_type };
428 self.by_id.insert(id, entry);
429 self.by_path.insert(path, id);
430 }
431
432 pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
434 self.by_id.values()
435 }
436
437 pub fn len(&self) -> usize {
439 self.by_id.len()
440 }
441
442 pub fn is_empty(&self) -> bool {
444 self.by_id.is_empty()
445 }
446}
447
448struct DynamicHubInner {
453 namespace: String,
455 activations: HashMap<String, Arc<dyn ActivationObject>>,
456 child_routers: HashMap<String, Arc<dyn ChildRouter>>,
458 registry: std::sync::RwLock<PluginRegistry>,
460 pending_rpc: std::sync::Mutex<Vec<Box<dyn FnOnce() -> Methods + Send>>>,
461}
462
463#[derive(Clone)]
485pub struct DynamicHub {
486 inner: Arc<DynamicHubInner>,
487}
488
489impl DynamicHub {
494 pub fn new(namespace: impl Into<String>) -> Self {
504 Self {
505 inner: Arc::new(DynamicHubInner {
506 namespace: namespace.into(),
507 activations: HashMap::new(),
508 child_routers: HashMap::new(),
509 registry: std::sync::RwLock::new(PluginRegistry::new()),
510 pending_rpc: std::sync::Mutex::new(Vec::new()),
511 }),
512 }
513 }
514
515 #[deprecated(since = "0.3.0", note = "Use DynamicHub::new(namespace) instead")]
517 pub fn with_namespace(namespace: impl Into<String>) -> Self {
518 Self::new(namespace)
519 }
520
521 pub fn runtime_namespace(&self) -> &str {
523 &self.inner.namespace
524 }
525
526 pub fn registry(&self) -> std::sync::RwLockReadGuard<'_, PluginRegistry> {
528 self.inner.registry.read().unwrap()
529 }
530
531 pub fn register<A: Activation + Clone>(mut self, activation: A) -> Self {
533 let namespace = activation.namespace().to_string();
534 let plugin_id = activation.plugin_id();
535 let activation_for_rpc = activation.clone();
536
537 let inner = Arc::get_mut(&mut self.inner)
538 .expect("Cannot register: DynamicHub has multiple references");
539
540 inner.registry.write().unwrap().register(
542 plugin_id,
543 namespace.clone(),
544 namespace.clone(), );
546
547 inner.activations.insert(namespace, Arc::new(ActivationWrapper { inner: activation }));
548 inner.pending_rpc.lock().unwrap()
549 .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
550 self
551 }
552
553 pub fn register_hub<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
558 let namespace = activation.namespace().to_string();
559 let plugin_id = activation.plugin_id();
560 let activation_for_rpc = activation.clone();
561 let activation_for_router = activation.clone();
562
563 let inner = Arc::get_mut(&mut self.inner)
564 .expect("Cannot register: DynamicHub has multiple references");
565
566 inner.registry.write().unwrap().register(
568 plugin_id,
569 namespace.clone(),
570 namespace.clone(), );
572
573 inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
574 inner.child_routers.insert(namespace, Arc::new(activation_for_router));
575 inner.pending_rpc.lock().unwrap()
576 .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
577 self
578 }
579
580 pub fn list_methods(&self) -> Vec<String> {
582 let mut methods = Vec::new();
583
584 for m in Activation::methods(self) {
586 methods.push(format!("{}.{}", self.inner.namespace, m));
587 }
588
589 for (ns, act) in &self.inner.activations {
591 for m in act.methods() {
592 methods.push(format!("{}.{}", ns, m));
593 }
594 }
595 methods.sort();
596 methods
597 }
598
599 pub fn list_activations_info(&self) -> Vec<ActivationInfo> {
601 let mut activations = Vec::new();
602
603 activations.push(ActivationInfo {
605 namespace: Activation::namespace(self).to_string(),
606 version: Activation::version(self).to_string(),
607 description: Activation::description(self).to_string(),
608 methods: Activation::methods(self).iter().map(|s| s.to_string()).collect(),
609 });
610
611 for a in self.inner.activations.values() {
613 activations.push(ActivationInfo {
614 namespace: a.namespace().to_string(),
615 version: a.version().to_string(),
616 description: a.description().to_string(),
617 methods: a.methods().iter().map(|s| s.to_string()).collect(),
618 });
619 }
620
621 activations
622 }
623
624 pub fn compute_hash(&self) -> String {
629 Activation::plugin_schema(self).hash
630 }
631
632 pub async fn route(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
634 let (namespace, method_name) = self.parse_method(method)?;
635
636 if namespace == self.inner.namespace {
638 return Activation::call(self, method_name, params).await;
639 }
640
641 let activation = self.inner.activations.get(namespace)
642 .ok_or_else(|| PlexusError::ActivationNotFound(namespace.to_string()))?;
643
644 activation.call(method_name, params).await
645 }
646
647 pub async fn do_resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
651 let path = self.inner.registry.read().unwrap()
652 .lookup(handle.plugin_id)
653 .map(|s| s.to_string())
654 .ok_or_else(|| PlexusError::ActivationNotFound(handle.plugin_id.to_string()))?;
655
656 let activation = self.inner.activations.get(&path)
657 .ok_or_else(|| PlexusError::ActivationNotFound(path.clone()))?;
658 activation.resolve_handle(handle).await
659 }
660
661 pub fn get_activation_schema(&self, namespace: &str) -> Option<Schema> {
663 self.inner.activations.get(namespace).map(|a| a.schema())
664 }
665
666 pub fn registry_snapshot(&self) -> PluginRegistrySnapshot {
668 let guard = self.inner.registry.read().unwrap();
669 PluginRegistrySnapshot {
670 by_id: guard.by_id.clone(),
671 by_path: guard.by_path.clone(),
672 }
673 }
674
675 pub fn lookup_plugin(&self, id: uuid::Uuid) -> Option<String> {
677 self.inner.registry.read().unwrap().lookup(id).map(|s| s.to_string())
678 }
679
680 pub fn lookup_plugin_by_path(&self, path: &str) -> Option<uuid::Uuid> {
682 self.inner.registry.read().unwrap().lookup_by_path(path)
683 }
684
685 pub fn list_plugin_schemas(&self) -> Vec<PluginSchema> {
687 let mut schemas = Vec::new();
688
689 schemas.push(Activation::plugin_schema(self));
691
692 for a in self.inner.activations.values() {
694 schemas.push(a.plugin_schema());
695 }
696
697 schemas
698 }
699
700 #[deprecated(note = "Use list_plugin_schemas instead")]
702 pub fn list_full_schemas(&self) -> Vec<PluginSchema> {
703 self.list_plugin_schemas()
704 }
705
706 pub fn get_method_help(&self, method: &str) -> Option<String> {
708 let (namespace, method_name) = self.parse_method(method).ok()?;
709 let activation = self.inner.activations.get(namespace)?;
710 activation.method_help(method_name)
711 }
712
713 fn parse_method<'a>(&self, method: &'a str) -> Result<(&'a str, &'a str), PlexusError> {
714 let parts: Vec<&str> = method.splitn(2, '.').collect();
715 if parts.len() != 2 {
716 return Err(PlexusError::InvalidParams(format!("Invalid method format: {}", method)));
717 }
718 Ok((parts[0], parts[1]))
719 }
720
721 pub fn plugin_children(&self) -> Vec<ChildSummary> {
724 self.inner.activations.values()
725 .map(|a| {
726 let schema = a.plugin_schema();
727 ChildSummary {
728 namespace: schema.namespace,
729 description: schema.description,
730 hash: schema.hash,
731 }
732 })
733 .collect()
734 }
735
736 pub fn into_rpc_module(self) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
738 let mut module = RpcModule::new(());
739
740 PlexusContext::init(self.compute_hash());
741
742 let ns = self.runtime_namespace();
745 let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
746 let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
747 let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
748 let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
749 let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
750 let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
751 let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
752 let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
753 let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
754
755 let plexus_for_call = self.clone();
757 module.register_subscription(
758 call_method,
759 call_method,
760 call_unsub,
761 move |params, pending, _ctx, _ext| {
762 let plexus = plexus_for_call.clone();
763 Box::pin(async move {
764 let p: CallParams = params.parse()?;
766 let stream = plexus.route(&p.method, p.params.unwrap_or_default()).await
767 .map_err(|e| jsonrpsee::types::ErrorObject::owned(-32000, e.to_string(), None::<()>))?;
768 pipe_stream_to_subscription(pending, stream).await
769 })
770 }
771 )?;
772
773 let plexus_for_hash = self.clone();
775 module.register_subscription(
776 hash_method,
777 hash_method,
778 hash_unsub,
779 move |_params, pending, _ctx, _ext| {
780 let plexus = plexus_for_hash.clone();
781 Box::pin(async move {
782 let schema = Activation::plugin_schema(&plexus);
783 let stream = async_stream::stream! {
784 yield HashEvent::Hash { value: schema.hash };
785 };
786 let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
787 pipe_stream_to_subscription(pending, wrapped).await
788 })
789 }
790 )?;
791
792 let plexus_for_schema = self.clone();
794 module.register_subscription(
795 schema_method,
796 schema_method,
797 schema_unsub,
798 move |params, pending, _ctx, _ext| {
799 let plexus = plexus_for_schema.clone();
800 Box::pin(async move {
801 let p: SchemaParams = params.parse().unwrap_or_default();
802 let plugin_schema = Activation::plugin_schema(&plexus);
803
804 let result = if let Some(ref name) = p.method {
805 plugin_schema.methods.iter()
806 .find(|m| m.name == *name)
807 .map(|m| super::SchemaResult::Method(m.clone()))
808 .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
809 -32602,
810 format!("Method '{}' not found", name),
811 None::<()>,
812 ))?
813 } else {
814 super::SchemaResult::Plugin(plugin_schema)
815 };
816
817 let stream = async_stream::stream! { yield result; };
818 let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
819 pipe_stream_to_subscription(pending, wrapped).await
820 })
821 }
822 )?;
823
824 let backend_name = self.runtime_namespace().to_string();
827 module.register_subscription(
828 "_info",
829 "_info",
830 "_info_unsub",
831 move |_params, pending, _ctx, _ext| {
832 let name = backend_name.clone();
833 Box::pin(async move {
834 let info_stream = futures::stream::once(async move {
836 serde_json::json!({"backend": name})
837 });
838
839 let wrapped = super::streaming::wrap_stream(
841 info_stream,
842 "_info",
843 vec![]
844 );
845
846 pipe_stream_to_subscription(pending, wrapped).await
848 })
849 }
850 )?;
851
852 let pending = std::mem::take(&mut *self.inner.pending_rpc.lock().unwrap());
854 for factory in pending {
855 module.merge(factory())?;
856 }
857
858 Ok(module)
859 }
860
861 pub fn arc_into_rpc_module(hub: Arc<Self>) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
867 let mut module = RpcModule::new(());
868
869 PlexusContext::init(hub.compute_hash());
870
871 let ns = hub.runtime_namespace();
874 let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
875 let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
876 let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
877 let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
878 let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
879 let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
880 let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
881 let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
882 let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
883
884 let hub_for_call = hub.clone();
886 module.register_subscription(
887 call_method,
888 call_method,
889 call_unsub,
890 move |params, pending, _ctx, _ext| {
891 let hub = hub_for_call.clone();
892 Box::pin(async move {
893 let p: CallParams = params.parse()?;
894 let stream = hub.route(&p.method, p.params.unwrap_or_default()).await
895 .map_err(|e| jsonrpsee::types::ErrorObject::owned(-32000, e.to_string(), None::<()>))?;
896 pipe_stream_to_subscription(pending, stream).await
897 })
898 }
899 )?;
900
901 let hub_for_hash = hub.clone();
903 module.register_subscription(
904 hash_method,
905 hash_method,
906 hash_unsub,
907 move |_params, pending, _ctx, _ext| {
908 let hub = hub_for_hash.clone();
909 Box::pin(async move {
910 let schema = Activation::plugin_schema(&*hub);
911 let stream = async_stream::stream! {
912 yield HashEvent::Hash { value: schema.hash };
913 };
914 let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
915 pipe_stream_to_subscription(pending, wrapped).await
916 })
917 }
918 )?;
919
920 let hub_for_schema = hub.clone();
922 module.register_subscription(
923 schema_method,
924 schema_method,
925 schema_unsub,
926 move |params, pending, _ctx, _ext| {
927 let hub = hub_for_schema.clone();
928 Box::pin(async move {
929 let p: SchemaParams = params.parse().unwrap_or_default();
930 let plugin_schema = Activation::plugin_schema(&*hub);
931
932 let result = if let Some(ref name) = p.method {
933 plugin_schema.methods.iter()
934 .find(|m| m.name == *name)
935 .map(|m| super::SchemaResult::Method(m.clone()))
936 .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
937 -32602,
938 format!("Method '{}' not found", name),
939 None::<()>,
940 ))?
941 } else {
942 super::SchemaResult::Plugin(plugin_schema)
943 };
944
945 let stream = async_stream::stream! {
946 yield result;
947 };
948 let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
949 pipe_stream_to_subscription(pending, wrapped).await
950 })
951 }
952 )?;
953
954 let backend_name = hub.runtime_namespace().to_string();
957 module.register_subscription(
958 "_info",
959 "_info",
960 "_info_unsub",
961 move |_params, pending, _ctx, _ext| {
962 let name = backend_name.clone();
963 Box::pin(async move {
964 let info_stream = futures::stream::once(async move {
966 serde_json::json!({"backend": name})
967 });
968
969 let wrapped = super::streaming::wrap_stream(
971 info_stream,
972 "_info",
973 vec![]
974 );
975
976 pipe_stream_to_subscription(pending, wrapped).await
978 })
979 }
980 )?;
981
982 let respond_method: &'static str = Box::leak(format!("{}.respond", ns).into_boxed_str());
985 module.register_async_method(respond_method, |params, _ctx, _ext| async move {
986 use super::bidirectional::{handle_pending_response, BidirError};
987
988 let p: RespondParams = params.parse()?;
989
990 tracing::debug!(
991 request_id = %p.request_id,
992 "Handling {}.respond via WebSocket",
993 "plexus"
994 );
995
996 match handle_pending_response(&p.request_id, p.response_data) {
997 Ok(()) => Ok(serde_json::json!({"success": true})),
998 Err(BidirError::UnknownRequest) => {
999 tracing::warn!(request_id = %p.request_id, "Unknown request ID in respond");
1000 Err(jsonrpsee::types::ErrorObject::owned(
1001 -32602,
1002 format!("Unknown request ID: {}. The request may have timed out or been cancelled.", p.request_id),
1003 None::<()>,
1004 ))
1005 }
1006 Err(BidirError::ChannelClosed) => {
1007 tracing::warn!(request_id = %p.request_id, "Channel closed in respond");
1008 Err(jsonrpsee::types::ErrorObject::owned(
1009 -32000,
1010 "Response channel was closed (request may have timed out)",
1011 None::<()>,
1012 ))
1013 }
1014 Err(e) => {
1015 tracing::error!(request_id = %p.request_id, error = ?e, "Error in respond");
1016 Err(jsonrpsee::types::ErrorObject::owned(
1017 -32000,
1018 format!("Failed to deliver response: {}", e),
1019 None::<()>,
1020 ))
1021 }
1022 }
1023 })?;
1024
1025 let pending = std::mem::take(&mut *hub.inner.pending_rpc.lock().unwrap());
1027 for factory in pending {
1028 module.merge(factory())?;
1029 }
1030
1031 Ok(module)
1032 }
1033}
1034
1035#[derive(Debug, serde::Deserialize)]
1037struct CallParams {
1038 method: String,
1039 #[serde(default)]
1040 params: Option<Value>,
1041}
1042
1043#[derive(Debug, Default, serde::Deserialize)]
1045struct SchemaParams {
1046 method: Option<String>,
1047}
1048
1049#[derive(Debug, serde::Deserialize)]
1051struct RespondParams {
1052 request_id: String,
1053 response_data: Value,
1054}
1055
1056async fn pipe_stream_to_subscription(
1058 pending: jsonrpsee::PendingSubscriptionSink,
1059 mut stream: PlexusStream,
1060) -> jsonrpsee::core::SubscriptionResult {
1061 use futures::StreamExt;
1062 use jsonrpsee::SubscriptionMessage;
1063
1064 let sink = pending.accept().await?;
1065 while let Some(item) = stream.next().await {
1066 let msg = SubscriptionMessage::new("result", sink.subscription_id(), &item)?;
1067 sink.send(msg).await?;
1068 }
1069 Ok(())
1070}
1071
1072#[plexus_macros::hub_methods(
1077 namespace = "plexus",
1078 version = "1.0.0",
1079 description = "Central routing and introspection",
1080 hub,
1081 namespace_fn = "runtime_namespace"
1082)]
1083impl DynamicHub {
1084 #[plexus_macros::hub_method(
1086 streaming,
1087 description = "Route a call to a registered activation",
1088 params(
1089 method = "The method to call (format: namespace.method)",
1090 params = "Parameters to pass to the method (optional, defaults to {})"
1091 )
1092 )]
1093 async fn call(
1094 &self,
1095 method: String,
1096 params: Option<Value>,
1097 ) -> impl Stream<Item = super::types::PlexusStreamItem> + Send + 'static {
1098 use super::context::PlexusContext;
1099 use super::types::{PlexusStreamItem, StreamMetadata};
1100
1101 let result = self.route(&method, params.unwrap_or_default()).await;
1102
1103 match result {
1104 Ok(plexus_stream) => {
1105 plexus_stream
1107 }
1108 Err(e) => {
1109 let metadata = StreamMetadata::new(
1111 vec![self.inner.namespace.clone()],
1112 PlexusContext::hash(),
1113 );
1114 Box::pin(futures::stream::once(async move {
1115 PlexusStreamItem::Error {
1116 metadata,
1117 message: e.to_string(),
1118 code: None,
1119 recoverable: false,
1120 }
1121 }))
1122 }
1123 }
1124 }
1125
1126 #[plexus_macros::hub_method(description = "Get plexus configuration hash (from the recursive schema)\n\n This hash changes whenever any method or child plugin changes.\n It's computed from the method hashes rolled up through the schema tree.")]
1131 async fn hash(&self) -> impl Stream<Item = HashEvent> + Send + 'static {
1132 let schema = Activation::plugin_schema(self);
1133 stream! { yield HashEvent::Hash { value: schema.hash }; }
1134 }
1135
1136 #[plexus_macros::hub_method(description = "Get plugin hashes for cache validation")]
1138 async fn hashes(&self) -> impl Stream<Item = PluginHashes> + Send + 'static {
1139 let schema = Activation::plugin_schema(self);
1140
1141 stream! {
1142 yield PluginHashes {
1143 namespace: schema.namespace.clone(),
1144 self_hash: schema.self_hash.clone(),
1145 children_hash: schema.children_hash.clone(),
1146 hash: schema.hash.clone(),
1147 children: schema.children.as_ref().map(|kids| {
1148 kids.iter()
1149 .map(|c| ChildHashes {
1150 namespace: c.namespace.clone(),
1151 hash: c.hash.clone(),
1152 })
1153 .collect()
1154 }),
1155 };
1156 }
1157 }
1158
1159 }
1161
1162use super::hub_context::HubContext;
1167use std::sync::Weak;
1168
1169#[async_trait]
1175impl HubContext for Weak<DynamicHub> {
1176 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1177 let hub = self.upgrade().ok_or_else(|| {
1178 PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1179 })?;
1180 hub.do_resolve_handle(handle).await
1181 }
1182
1183 async fn call(&self, method: &str, params: serde_json::Value) -> Result<PlexusStream, PlexusError> {
1184 let hub = self.upgrade().ok_or_else(|| {
1185 PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1186 })?;
1187 hub.route(method, params).await
1188 }
1189
1190 fn is_valid(&self) -> bool {
1191 self.upgrade().is_some()
1192 }
1193}
1194
1195#[async_trait]
1200impl ChildRouter for DynamicHub {
1201 fn router_namespace(&self) -> &str {
1202 &self.inner.namespace
1203 }
1204
1205 async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
1206 self.route(method, params).await
1209 }
1210
1211 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1212 self.inner.child_routers.get(name)
1214 .map(|router| {
1215 Box::new(ArcChildRouter(router.clone())) as Box<dyn ChildRouter>
1217 })
1218 }
1219}
1220
1221#[cfg(test)]
1222mod tests {
1223 use super::*;
1224
1225 #[test]
1226 fn dynamic_hub_implements_activation() {
1227 fn assert_activation<T: Activation>() {}
1228 assert_activation::<DynamicHub>();
1229 }
1230
1231 #[test]
1232 fn dynamic_hub_methods() {
1233 let hub = DynamicHub::new("test");
1234 let methods = hub.methods();
1235 assert!(methods.contains(&"call"));
1236 assert!(methods.contains(&"hash"));
1237 assert!(methods.contains(&"schema"));
1238 }
1240
1241 #[test]
1242 fn dynamic_hub_hash_stable() {
1243 let h1 = DynamicHub::new("test");
1244 let h2 = DynamicHub::new("test");
1245 assert_eq!(h1.compute_hash(), h2.compute_hash());
1246 }
1247
1248 #[test]
1249 fn dynamic_hub_is_hub() {
1250 use crate::activations::health::Health;
1251 let hub = DynamicHub::new("test").register(Health::new());
1252 let schema = hub.plugin_schema();
1253
1254 assert!(schema.is_hub(), "dynamic hub should be a hub");
1256 assert!(!schema.is_leaf(), "dynamic hub should not be a leaf");
1257
1258 let children = schema.children.expect("dynamic hub should have children");
1260 assert!(!children.is_empty(), "dynamic hub should have at least one child");
1261
1262 let health = children.iter().find(|c| c.namespace == "health").expect("should have health child");
1264 assert!(!health.hash.is_empty(), "health should have a hash");
1265 }
1266
1267 #[test]
1268 fn dynamic_hub_schema_structure() {
1269 use crate::activations::health::Health;
1270 let hub = DynamicHub::new("test").register(Health::new());
1271 let schema = hub.plugin_schema();
1272
1273 let json = serde_json::to_string_pretty(&schema).unwrap();
1275 println!("DynamicHub schema:\n{}", json);
1276
1277 assert_eq!(schema.namespace, "test");
1279 assert!(schema.methods.iter().any(|m| m.name == "call"));
1280 assert!(schema.children.is_some());
1281 }
1282
1283 #[tokio::test]
1288 async fn invariant_resolve_handle_unknown_activation() {
1289 use crate::activations::health::Health;
1290 use crate::types::Handle;
1291 use uuid::Uuid;
1292
1293 let hub = DynamicHub::new("test").register(Health::new());
1294
1295 let unknown_plugin_id = Uuid::new_v4();
1297 let handle = Handle::new(unknown_plugin_id, "1.0.0", "some_method");
1298
1299 let result = hub.do_resolve_handle(&handle).await;
1300
1301 match result {
1302 Err(PlexusError::ActivationNotFound(_)) => {
1303 }
1305 Err(other) => panic!("Expected ActivationNotFound, got {:?}", other),
1306 Ok(_) => panic!("Expected error for unknown activation"),
1307 }
1308 }
1309
1310 #[tokio::test]
1311 async fn invariant_resolve_handle_unsupported() {
1312 use crate::activations::health::Health;
1313 use crate::types::Handle;
1314
1315 let hub = DynamicHub::new("test").register(Health::new());
1316
1317 let handle = Handle::new(Health::PLUGIN_ID, "1.0.0", "check");
1319
1320 let result = hub.do_resolve_handle(&handle).await;
1321
1322 match result {
1323 Err(PlexusError::HandleNotSupported(name)) => {
1324 assert_eq!(name, "health");
1325 }
1326 Err(other) => panic!("Expected HandleNotSupported, got {:?}", other),
1327 Ok(_) => panic!("Expected error for unsupported handle"),
1328 }
1329 }
1330
1331 #[tokio::test]
1332 async fn invariant_resolve_handle_routes_by_plugin_id() {
1333 use crate::activations::health::Health;
1334 use crate::activations::echo::Echo;
1335 use crate::types::Handle;
1336 use uuid::Uuid;
1337
1338 let health = Health::new();
1339 let echo = Echo::new();
1340 let health_plugin_id = health.plugin_id();
1341 let echo_plugin_id = echo.plugin_id();
1342
1343 let hub = DynamicHub::new("test")
1344 .register(health)
1345 .register(echo);
1346
1347 let health_handle = Handle::new(health_plugin_id, "1.0.0", "check");
1349 match hub.do_resolve_handle(&health_handle).await {
1350 Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
1351 Err(other) => panic!("health handle should route to health activation, got {:?}", other),
1352 Ok(_) => panic!("health handle should return HandleNotSupported"),
1353 }
1354
1355 let echo_handle = Handle::new(echo_plugin_id, "1.0.0", "echo");
1357 match hub.do_resolve_handle(&echo_handle).await {
1358 Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "echo"),
1359 Err(other) => panic!("echo handle should route to echo activation, got {:?}", other),
1360 Ok(_) => panic!("echo handle should return HandleNotSupported"),
1361 }
1362
1363 let unknown_handle = Handle::new(Uuid::new_v4(), "1.0.0", "method");
1365 match hub.do_resolve_handle(&unknown_handle).await {
1366 Err(PlexusError::ActivationNotFound(_)) => { },
1367 Err(other) => panic!("unknown handle should return ActivationNotFound, got {:?}", other),
1368 Ok(_) => panic!("unknown handle should return ActivationNotFound"),
1369 }
1370 }
1371
1372 #[test]
1373 fn invariant_handle_plugin_id_determines_routing() {
1374 use crate::activations::health::Health;
1375 use crate::activations::echo::Echo;
1376 use crate::types::Handle;
1377
1378 let health = Health::new();
1379 let echo = Echo::new();
1380
1381 let health_handle = Handle::new(health.plugin_id(), "1.0.0", "check")
1383 .with_meta(vec!["msg-123".into(), "user".into()]);
1384 let echo_handle = Handle::new(echo.plugin_id(), "1.0.0", "echo")
1385 .with_meta(vec!["msg-123".into(), "user".into()]);
1386
1387 assert_ne!(health_handle.plugin_id, echo_handle.plugin_id);
1389 }
1390
1391 #[test]
1396 fn plugin_registry_basic_operations() {
1397 let mut registry = PluginRegistry::new();
1398 let id = uuid::Uuid::new_v4();
1399
1400 registry.register(id, "test_plugin".to_string(), "test".to_string());
1402
1403 assert_eq!(registry.lookup(id), Some("test_plugin"));
1405
1406 assert_eq!(registry.lookup_by_path("test_plugin"), Some(id));
1408
1409 let entry = registry.get(id).expect("should have entry");
1411 assert_eq!(entry.path, "test_plugin");
1412 assert_eq!(entry.plugin_type, "test");
1413 }
1414
1415 #[test]
1416 fn plugin_registry_populated_on_register() {
1417 use crate::activations::health::Health;
1418
1419 let hub = DynamicHub::new("test").register(Health::new());
1420
1421 let registry = hub.registry();
1422 assert!(!registry.is_empty(), "registry should not be empty after registration");
1423
1424 let health_id = registry.lookup_by_path("health");
1426 assert!(health_id.is_some(), "health should be registered by path");
1427
1428 let health_uuid = health_id.unwrap();
1430 assert_eq!(registry.lookup(health_uuid), Some("health"));
1431 }
1432
1433 #[test]
1434 fn plugin_registry_deterministic_uuid() {
1435 use crate::activations::health::Health;
1436
1437 let health1 = Health::new();
1439 let health2 = Health::new();
1440
1441 assert_eq!(health1.plugin_id(), health2.plugin_id(),
1442 "same activation type should have deterministic UUID");
1443
1444 let expected = uuid::Uuid::new_v5(
1446 &uuid::Uuid::NAMESPACE_OID,
1447 b"health@1"
1448 );
1449 assert_eq!(health1.plugin_id(), expected,
1450 "plugin_id should be deterministic from namespace@major_version");
1451 }
1452}