1use super::{
9 context::PlexusContext,
10 method_enum::MethodEnumSchema,
11 schema::{ChildSummary, MethodSchema, PluginSchema, Schema},
12 streaming::PlexusStream,
13};
14use crate::types::Handle;
15use async_stream::stream;
16use async_trait::async_trait;
17use futures::Stream;
18use jsonrpsee::core::server::Methods;
19use jsonrpsee::RpcModule;
20
21pub const PLEXUS_NOTIF_METHOD: &str = "result";
27use schemars::JsonSchema;
28use serde::{Deserialize, Serialize};
29use serde_json::Value;
30use std::collections::HashMap;
31use std::sync::Arc;
32
33#[derive(Debug, Clone)]
38pub enum PlexusError {
39 ActivationNotFound(String),
40 MethodNotFound { activation: String, method: String },
41 InvalidParams(String),
42 ExecutionError(String),
43 HandleNotSupported(String),
44 TransportError(TransportErrorKind),
45 Unauthenticated(String),
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
49#[serde(tag = "error_kind", rename_all = "snake_case")]
50pub enum TransportErrorKind {
51 ConnectionRefused { host: String, port: u16 },
52 ConnectionTimeout { host: String, port: u16 },
53 ProtocolError { message: String },
54 NetworkError { message: String },
55}
56
57impl std::fmt::Display for TransportErrorKind {
58 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59 match self {
60 TransportErrorKind::ConnectionRefused { host, port } => {
61 write!(f, "Connection refused to {}:{}", host, port)
62 }
63 TransportErrorKind::ConnectionTimeout { host, port } => {
64 write!(f, "Connection timeout to {}:{}", host, port)
65 }
66 TransportErrorKind::ProtocolError { message } => {
67 write!(f, "Protocol error: {}", message)
68 }
69 TransportErrorKind::NetworkError { message } => {
70 write!(f, "Network error: {}", message)
71 }
72 }
73 }
74}
75
76impl std::fmt::Display for PlexusError {
77 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78 match self {
79 PlexusError::ActivationNotFound(name) => write!(f, "Activation not found: {}", name),
80 PlexusError::MethodNotFound { activation, method } => {
81 write!(f, "Method not found: {}.{}", activation, method)
82 }
83 PlexusError::InvalidParams(msg) => write!(f, "Invalid params: {}", msg),
84 PlexusError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
85 PlexusError::HandleNotSupported(activation) => {
86 write!(f, "Handle resolution not supported by activation: {}", activation)
87 }
88 PlexusError::TransportError(kind) => match kind {
89 TransportErrorKind::ConnectionRefused { host, port } => {
90 write!(f, "Connection refused to {}:{}", host, port)
91 }
92 TransportErrorKind::ConnectionTimeout { host, port } => {
93 write!(f, "Connection timeout to {}:{}", host, port)
94 }
95 TransportErrorKind::ProtocolError { message } => {
96 write!(f, "Protocol error: {}", message)
97 }
98 TransportErrorKind::NetworkError { message } => {
99 write!(f, "Network error: {}", message)
100 }
101 }
102 PlexusError::Unauthenticated(msg) => write!(f, "Authentication required: {}", msg),
103 }
104 }
105}
106
107impl std::error::Error for PlexusError {}
108
109fn plexus_error_code(e: &PlexusError) -> i32 {
118 match e {
119 PlexusError::Unauthenticated(_) => -32001,
120 PlexusError::InvalidParams(_) => -32602,
121 PlexusError::MethodNotFound { .. } | PlexusError::ActivationNotFound(_) => -32601,
122 _ => -32000,
123 }
124}
125
126fn plexus_error_to_jsonrpc(e: &PlexusError) -> jsonrpsee::types::ErrorObjectOwned {
128 jsonrpsee::types::ErrorObject::owned(plexus_error_code(e), e.to_string(), None::<()>)
129}
130
131#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
136pub struct ActivationInfo {
137 pub namespace: String,
138 pub version: String,
139 pub description: String,
140 pub methods: Vec<String>,
141}
142
143#[async_trait]
148pub trait Activation: Send + Sync + 'static {
149 type Methods: MethodEnumSchema;
150
151 fn namespace(&self) -> &str;
152 fn version(&self) -> &str;
153 fn description(&self) -> &str { "No description available" }
155 fn long_description(&self) -> Option<&str> { None }
157 fn methods(&self) -> Vec<&str>;
158 fn method_help(&self, _method: &str) -> Option<String> { None }
159 fn plugin_id(&self) -> uuid::Uuid {
163 let major_version = self.version().split('.').next().unwrap_or("0");
164 uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, format!("{}@{}", self.namespace(), major_version).as_bytes())
165 }
166
167 async fn call(
168 &self,
169 method: &str,
170 params: Value,
171 auth: Option<&super::auth::AuthContext>,
172 raw_ctx: Option<&crate::request::RawRequestContext>,
173 ) -> Result<PlexusStream, PlexusError>;
174 async fn resolve_handle(&self, _handle: &Handle) -> Result<PlexusStream, PlexusError> {
175 Err(PlexusError::HandleNotSupported(self.namespace().to_string()))
176 }
177
178 fn into_rpc_methods(self) -> Methods where Self: Sized;
179
180 fn plugin_schema(&self) -> PluginSchema {
182 use std::collections::hash_map::DefaultHasher;
183 use std::hash::{Hash, Hasher};
184
185 let methods: Vec<MethodSchema> = self.methods().iter().map(|name| {
186 let desc = self.method_help(name).unwrap_or_default();
187 let mut hasher = DefaultHasher::new();
189 name.hash(&mut hasher);
190 desc.hash(&mut hasher);
191 let hash = format!("{:016x}", hasher.finish());
192 MethodSchema::new(name.to_string(), desc, hash)
193 }).collect();
194
195 if let Some(long_desc) = self.long_description() {
196 PluginSchema::leaf_with_long_description(
197 self.namespace(),
198 self.version(),
199 self.description(),
200 long_desc,
201 methods,
202 )
203 } else {
204 PluginSchema::leaf(
205 self.namespace(),
206 self.version(),
207 self.description(),
208 methods,
209 )
210 }
211 }
212}
213
214#[async_trait]
227pub trait ChildRouter: Send + Sync {
228 fn router_namespace(&self) -> &str;
230
231 async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;
233
234 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>>;
236}
237
238pub async fn route_to_child<T: ChildRouter + ?Sized>(
244 parent: &T,
245 method: &str,
246 params: Value,
247 auth: Option<&super::auth::AuthContext>,
248 raw_ctx: Option<&crate::request::RawRequestContext>,
249) -> Result<PlexusStream, PlexusError> {
250 if let Some((child_name, rest)) = method.split_once('.') {
252 if let Some(child) = parent.get_child(child_name).await {
253 return child.router_call(rest, params, auth, raw_ctx).await;
254 }
255 return Err(PlexusError::ActivationNotFound(child_name.to_string()));
256 }
257
258 Err(PlexusError::MethodNotFound {
260 activation: parent.router_namespace().to_string(),
261 method: method.to_string(),
262 })
263}
264
265struct ArcChildRouter(Arc<dyn ChildRouter>);
269
270#[async_trait]
271impl ChildRouter for ArcChildRouter {
272 fn router_namespace(&self) -> &str {
273 self.0.router_namespace()
274 }
275
276 async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
277 self.0.router_call(method, params, auth, raw_ctx).await
278 }
279
280 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
281 self.0.get_child(name).await
282 }
283}
284
285#[async_trait]
290#[allow(dead_code)] trait ActivationObject: Send + Sync + 'static {
292 fn namespace(&self) -> &str;
293 fn version(&self) -> &str;
294 fn description(&self) -> &str;
295 fn long_description(&self) -> Option<&str>;
296 fn methods(&self) -> Vec<&str>;
297 fn method_help(&self, method: &str) -> Option<String>;
298 fn plugin_id(&self) -> uuid::Uuid;
299 async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;
300 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError>;
301 fn plugin_schema(&self) -> PluginSchema;
302 fn schema(&self) -> Schema;
303}
304
305struct ActivationWrapper<A: Activation> {
306 inner: A,
307}
308
309#[async_trait]
310impl<A: Activation> ActivationObject for ActivationWrapper<A> {
311 fn namespace(&self) -> &str { self.inner.namespace() }
312 fn version(&self) -> &str { self.inner.version() }
313 fn description(&self) -> &str { self.inner.description() }
314 fn long_description(&self) -> Option<&str> { self.inner.long_description() }
315 fn methods(&self) -> Vec<&str> { self.inner.methods() }
316 fn method_help(&self, method: &str) -> Option<String> { self.inner.method_help(method) }
317 fn plugin_id(&self) -> uuid::Uuid { self.inner.plugin_id() }
318
319 async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
320 self.inner.call(method, params, auth, raw_ctx).await
321 }
322
323 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
324 self.inner.resolve_handle(handle).await
325 }
326
327 fn plugin_schema(&self) -> PluginSchema { self.inner.plugin_schema() }
328
329 fn schema(&self) -> Schema {
330 let schema = schemars::schema_for!(A::Methods);
331 serde_json::from_value(serde_json::to_value(schema).expect("serialize"))
332 .expect("parse schema")
333 }
334}
335
336#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
341#[serde(tag = "event", rename_all = "snake_case")]
342pub enum HashEvent {
343 Hash { value: String },
344}
345
346#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
348#[serde(tag = "event", rename_all = "snake_case")]
349pub enum SchemaEvent {
350 Schema(PluginSchema),
352}
353
354#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
356pub struct PluginHashes {
357 pub namespace: String,
358 pub self_hash: String,
359 #[serde(skip_serializing_if = "Option::is_none")]
360 pub children_hash: Option<String>,
361 pub hash: String,
362 #[serde(skip_serializing_if = "Option::is_none")]
364 pub children: Option<Vec<ChildHashes>>,
365}
366
367#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
369pub struct ChildHashes {
370 pub namespace: String,
371 pub hash: String,
372}
373
374
375#[derive(Debug, Clone)]
381pub struct PluginEntry {
382 pub id: uuid::Uuid,
384 pub path: String,
386 pub plugin_type: String,
388}
389
390#[derive(Default)]
395pub struct PluginRegistry {
396 by_id: HashMap<uuid::Uuid, PluginEntry>,
398 by_path: HashMap<String, uuid::Uuid>,
400}
401
402#[derive(Clone)]
406pub struct PluginRegistrySnapshot {
407 by_id: HashMap<uuid::Uuid, PluginEntry>,
408 by_path: HashMap<String, uuid::Uuid>,
409}
410
411impl PluginRegistrySnapshot {
412 pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
414 self.by_id.get(&id).map(|e| e.path.as_str())
415 }
416
417 pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
419 self.by_path.get(path).copied()
420 }
421
422 pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
424 self.by_id.get(&id)
425 }
426
427 pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
429 self.by_id.values()
430 }
431
432 pub fn len(&self) -> usize {
434 self.by_id.len()
435 }
436
437 pub fn is_empty(&self) -> bool {
439 self.by_id.is_empty()
440 }
441}
442
443impl PluginRegistry {
444 pub fn new() -> Self {
446 Self::default()
447 }
448
449 pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
451 self.by_id.get(&id).map(|e| e.path.as_str())
452 }
453
454 pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
456 self.by_path.get(path).copied()
457 }
458
459 pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
461 self.by_id.get(&id)
462 }
463
464 pub fn register(&mut self, id: uuid::Uuid, path: String, plugin_type: String) {
466 let entry = PluginEntry { id, path: path.clone(), plugin_type };
467 self.by_id.insert(id, entry);
468 self.by_path.insert(path, id);
469 }
470
471 pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
473 self.by_id.values()
474 }
475
476 pub fn len(&self) -> usize {
478 self.by_id.len()
479 }
480
481 pub fn is_empty(&self) -> bool {
483 self.by_id.is_empty()
484 }
485}
486
487struct DynamicHubInner {
492 namespace: String,
494 activations: HashMap<String, Arc<dyn ActivationObject>>,
495 child_routers: HashMap<String, Arc<dyn ChildRouter>>,
497 registry: std::sync::RwLock<PluginRegistry>,
499 pending_rpc: std::sync::Mutex<Vec<Box<dyn FnOnce() -> Methods + Send>>>,
500}
501
502#[derive(Clone)]
524pub struct DynamicHub {
525 inner: Arc<DynamicHubInner>,
526}
527
528impl DynamicHub {
533 pub fn new(namespace: impl Into<String>) -> Self {
543 Self {
544 inner: Arc::new(DynamicHubInner {
545 namespace: namespace.into(),
546 activations: HashMap::new(),
547 child_routers: HashMap::new(),
548 registry: std::sync::RwLock::new(PluginRegistry::new()),
549 pending_rpc: std::sync::Mutex::new(Vec::new()),
550 }),
551 }
552 }
553
554 #[deprecated(since = "0.3.0", note = "Use DynamicHub::new(namespace) instead")]
556 pub fn with_namespace(namespace: impl Into<String>) -> Self {
557 Self::new(namespace)
558 }
559
560 pub fn runtime_namespace(&self) -> &str {
562 &self.inner.namespace
563 }
564
565 pub fn registry(&self) -> std::sync::RwLockReadGuard<'_, PluginRegistry> {
567 self.inner.registry.read().unwrap()
568 }
569
570 pub fn register<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
572 let namespace = activation.namespace().to_string();
573 let plugin_id = activation.plugin_id();
574 let activation_for_rpc = activation.clone();
575 let activation_for_router = activation.clone();
576
577 let inner = Arc::get_mut(&mut self.inner)
578 .expect("Cannot register: DynamicHub has multiple references");
579
580 inner.registry.write().unwrap().register(
582 plugin_id,
583 namespace.clone(),
584 namespace.clone(), );
586
587 inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
588 inner.child_routers.insert(namespace.clone(), Arc::new(activation_for_router));
589 inner.pending_rpc.lock().unwrap()
590 .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
591 self
592 }
593
594 #[deprecated(since = "0.5.0", note = "Use register() — it now handles both leaf and hub activations")]
599 pub fn register_hub<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
600 let namespace = activation.namespace().to_string();
601 let plugin_id = activation.plugin_id();
602 let activation_for_rpc = activation.clone();
603 let activation_for_router = activation.clone();
604
605 let inner = Arc::get_mut(&mut self.inner)
606 .expect("Cannot register: DynamicHub has multiple references");
607
608 inner.registry.write().unwrap().register(
610 plugin_id,
611 namespace.clone(),
612 namespace.clone(), );
614
615 inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
616 inner.child_routers.insert(namespace, Arc::new(activation_for_router));
617 inner.pending_rpc.lock().unwrap()
618 .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
619 self
620 }
621
622 pub fn list_methods(&self) -> Vec<String> {
624 let mut methods = Vec::new();
625
626 for m in Activation::methods(self) {
628 methods.push(format!("{}.{}", self.inner.namespace, m));
629 }
630
631 for (ns, act) in &self.inner.activations {
633 for m in act.methods() {
634 methods.push(format!("{}.{}", ns, m));
635 }
636 }
637 methods.sort();
638 methods
639 }
640
641 pub fn list_activations_info(&self) -> Vec<ActivationInfo> {
643 let mut activations = Vec::new();
644
645 activations.push(ActivationInfo {
647 namespace: Activation::namespace(self).to_string(),
648 version: Activation::version(self).to_string(),
649 description: Activation::description(self).to_string(),
650 methods: Activation::methods(self).iter().map(|s| s.to_string()).collect(),
651 });
652
653 for a in self.inner.activations.values() {
655 activations.push(ActivationInfo {
656 namespace: a.namespace().to_string(),
657 version: a.version().to_string(),
658 description: a.description().to_string(),
659 methods: a.methods().iter().map(|s| s.to_string()).collect(),
660 });
661 }
662
663 activations
664 }
665
666 pub fn compute_hash(&self) -> String {
671 Activation::plugin_schema(self).hash
672 }
673
674 pub async fn route(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>) -> Result<PlexusStream, PlexusError> {
676 self.route_with_ctx(method, params, auth, None).await
677 }
678
679 pub async fn route_with_ctx(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
681 let (namespace, method_name) = self.parse_method(method)?;
682
683 if namespace == self.inner.namespace {
685 return Activation::call(self, method_name, params, auth, raw_ctx).await;
686 }
687
688 let activation = self.inner.activations.get(namespace)
689 .ok_or_else(|| PlexusError::ActivationNotFound(namespace.to_string()))?;
690
691 activation.call(method_name, params, auth, raw_ctx).await
692 }
693
694 pub async fn do_resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
698 let path = self.inner.registry.read().unwrap()
699 .lookup(handle.plugin_id)
700 .map(|s| s.to_string())
701 .ok_or_else(|| PlexusError::ActivationNotFound(handle.plugin_id.to_string()))?;
702
703 let activation = self.inner.activations.get(&path)
704 .ok_or_else(|| PlexusError::ActivationNotFound(path.clone()))?;
705 activation.resolve_handle(handle).await
706 }
707
708 pub fn get_activation_schema(&self, namespace: &str) -> Option<Schema> {
710 self.inner.activations.get(namespace).map(|a| a.schema())
711 }
712
713 pub fn registry_snapshot(&self) -> PluginRegistrySnapshot {
715 let guard = self.inner.registry.read().unwrap();
716 PluginRegistrySnapshot {
717 by_id: guard.by_id.clone(),
718 by_path: guard.by_path.clone(),
719 }
720 }
721
722 pub fn lookup_plugin(&self, id: uuid::Uuid) -> Option<String> {
724 self.inner.registry.read().unwrap().lookup(id).map(|s| s.to_string())
725 }
726
727 pub fn lookup_plugin_by_path(&self, path: &str) -> Option<uuid::Uuid> {
729 self.inner.registry.read().unwrap().lookup_by_path(path)
730 }
731
732 pub fn list_plugin_schemas(&self) -> Vec<PluginSchema> {
734 let mut schemas = Vec::new();
735
736 schemas.push(Activation::plugin_schema(self));
738
739 for a in self.inner.activations.values() {
741 schemas.push(a.plugin_schema());
742 }
743
744 schemas
745 }
746
747 #[deprecated(note = "Use list_plugin_schemas instead")]
749 pub fn list_full_schemas(&self) -> Vec<PluginSchema> {
750 self.list_plugin_schemas()
751 }
752
753 pub fn get_method_help(&self, method: &str) -> Option<String> {
755 let (namespace, method_name) = self.parse_method(method).ok()?;
756 let activation = self.inner.activations.get(namespace)?;
757 activation.method_help(method_name)
758 }
759
760 fn parse_method<'a>(&self, method: &'a str) -> Result<(&'a str, &'a str), PlexusError> {
761 let parts: Vec<&str> = method.splitn(2, '.').collect();
762 if parts.len() != 2 {
763 return Err(PlexusError::InvalidParams(format!("Invalid method format: {}", method)));
764 }
765 Ok((parts[0], parts[1]))
766 }
767
768 pub fn plugin_children(&self) -> Vec<ChildSummary> {
771 self.inner.activations.values()
772 .map(|a| {
773 let schema = a.plugin_schema();
774 ChildSummary {
775 namespace: schema.namespace,
776 description: schema.description,
777 hash: schema.hash,
778 }
779 })
780 .collect()
781 }
782
783 pub fn into_rpc_module(self) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
785 let mut module = RpcModule::new(());
786
787 PlexusContext::init(self.compute_hash());
788
789 let ns = self.runtime_namespace();
792 let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
793 let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
794 let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
795 let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
796 let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
797 let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
798 let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
799 let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
800 let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
801
802 let plexus_for_call = self.clone();
804 module.register_subscription(
805 call_method,
806 PLEXUS_NOTIF_METHOD,
807 call_unsub,
808 move |params, pending, _ctx, _ext| {
809 let plexus = plexus_for_call.clone();
810 Box::pin(async move {
811 let p: CallParams = params.parse()?;
812 match plexus.route(&p.method, p.params.unwrap_or_default(), None).await {
813 Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
814 Err(e) => {
815 let sink = pending.accept().await?;
816 let error_item = super::types::PlexusStreamItem::Error {
817 metadata: super::types::StreamMetadata::new(
818 vec![ns_static.into()],
819 PlexusContext::hash(),
820 ),
821 message: e.to_string(),
822 code: Some(plexus_error_code(&e).to_string()),
823 recoverable: false,
824 };
825 if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
826 let _ = sink.send(raw).await;
827 }
828 Ok(())
829 }
830 }
831 })
832 }
833 )?;
834
835 let plexus_for_hash = self.clone();
837 module.register_subscription(
838 hash_method,
839 PLEXUS_NOTIF_METHOD,
840 hash_unsub,
841 move |_params, pending, _ctx, _ext| {
842 let plexus = plexus_for_hash.clone();
843 Box::pin(async move {
844 let schema = Activation::plugin_schema(&plexus);
845 let stream = async_stream::stream! {
846 yield HashEvent::Hash { value: schema.hash };
847 };
848 let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
849 pipe_stream_to_subscription(pending, wrapped).await
850 })
851 }
852 )?;
853
854 let plexus_for_schema = self.clone();
856 module.register_subscription(
857 schema_method,
858 PLEXUS_NOTIF_METHOD,
859 schema_unsub,
860 move |params, pending, _ctx, _ext| {
861 let plexus = plexus_for_schema.clone();
862 Box::pin(async move {
863 let p: SchemaParams = params.parse().unwrap_or_default();
864 let plugin_schema = Activation::plugin_schema(&plexus);
865
866 let result = if let Some(ref name) = p.method {
867 plugin_schema.methods.iter()
868 .find(|m| m.name == *name)
869 .map(|m| super::SchemaResult::Method(m.clone()))
870 .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
871 -32602,
872 format!("Method '{}' not found", name),
873 None::<()>,
874 ))?
875 } else {
876 super::SchemaResult::Plugin(plugin_schema)
877 };
878
879 let stream = async_stream::stream! { yield result; };
880 let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
881 pipe_stream_to_subscription(pending, wrapped).await
882 })
883 }
884 )?;
885
886 let backend_name = self.runtime_namespace().to_string();
889 module.register_subscription(
890 "_info",
891 PLEXUS_NOTIF_METHOD,
892 "_info_unsub",
893 move |_params, pending, _ctx, _ext| {
894 let name = backend_name.clone();
895 Box::pin(async move {
896 let info_stream = futures::stream::once(async move {
898 serde_json::json!({"backend": name})
899 });
900
901 let wrapped = super::streaming::wrap_stream(
903 info_stream,
904 "_info",
905 vec![]
906 );
907
908 pipe_stream_to_subscription(pending, wrapped).await
910 })
911 }
912 )?;
913
914 let pending = std::mem::take(&mut *self.inner.pending_rpc.lock().unwrap());
916 for factory in pending {
917 module.merge(factory())?;
918 }
919
920 Ok(module)
921 }
922
923 pub fn arc_into_rpc_module(hub: Arc<Self>) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
929 let mut module = RpcModule::new(());
930
931 PlexusContext::init(hub.compute_hash());
932
933 let ns = hub.runtime_namespace();
936 let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
937 let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
938 let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
939 let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
940 let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
941 let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
942 let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
943 let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
944 let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
945
946 let hub_for_call = hub.clone();
948 module.register_subscription(
949 call_method,
950 call_method,
951 call_unsub,
952 move |params, pending, _ctx, ext| {
953 let hub = hub_for_call.clone();
954 Box::pin(async move {
955 let p: CallParams = params.parse()?;
956 let auth = ext.get::<std::sync::Arc<super::auth::AuthContext>>()
958 .map(|arc| arc.as_ref());
959 match hub.route(&p.method, p.params.unwrap_or_default(), auth).await {
960 Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
961 Err(e) => {
962 let sink = pending.accept().await?;
967 let error_item = super::types::PlexusStreamItem::Error {
968 metadata: super::types::StreamMetadata::new(
969 vec![ns_static.into()],
970 PlexusContext::hash(),
971 ),
972 message: e.to_string(),
973 code: Some(plexus_error_code(&e).to_string()),
974 recoverable: false,
975 };
976 if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
977 let _ = sink.send(raw).await;
978 }
979 Ok(())
980 }
981 }
982 })
983 }
984 )?;
985
986 let hub_for_hash = hub.clone();
988 module.register_subscription(
989 hash_method,
990 PLEXUS_NOTIF_METHOD,
991 hash_unsub,
992 move |_params, pending, _ctx, _ext| {
993 let hub = hub_for_hash.clone();
994 Box::pin(async move {
995 let schema = Activation::plugin_schema(&*hub);
996 let stream = async_stream::stream! {
997 yield HashEvent::Hash { value: schema.hash };
998 };
999 let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
1000 pipe_stream_to_subscription(pending, wrapped).await
1001 })
1002 }
1003 )?;
1004
1005 let hub_for_schema = hub.clone();
1007 module.register_subscription(
1008 schema_method,
1009 PLEXUS_NOTIF_METHOD,
1010 schema_unsub,
1011 move |params, pending, _ctx, _ext| {
1012 let hub = hub_for_schema.clone();
1013 Box::pin(async move {
1014 let p: SchemaParams = params.parse().unwrap_or_default();
1015 let plugin_schema = Activation::plugin_schema(&*hub);
1016
1017 let result = if let Some(ref name) = p.method {
1018 plugin_schema.methods.iter()
1019 .find(|m| m.name == *name)
1020 .map(|m| super::SchemaResult::Method(m.clone()))
1021 .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
1022 -32602,
1023 format!("Method '{}' not found", name),
1024 None::<()>,
1025 ))?
1026 } else {
1027 super::SchemaResult::Plugin(plugin_schema)
1028 };
1029
1030 let stream = async_stream::stream! {
1031 yield result;
1032 };
1033 let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
1034 pipe_stream_to_subscription(pending, wrapped).await
1035 })
1036 }
1037 )?;
1038
1039 let backend_name = hub.runtime_namespace().to_string();
1042 module.register_subscription(
1043 "_info",
1044 PLEXUS_NOTIF_METHOD,
1045 "_info_unsub",
1046 move |_params, pending, _ctx, _ext| {
1047 let name = backend_name.clone();
1048 Box::pin(async move {
1049 let info_stream = futures::stream::once(async move {
1051 serde_json::json!({"backend": name})
1052 });
1053
1054 let wrapped = super::streaming::wrap_stream(
1056 info_stream,
1057 "_info",
1058 vec![]
1059 );
1060
1061 pipe_stream_to_subscription(pending, wrapped).await
1063 })
1064 }
1065 )?;
1066
1067 let respond_method: &'static str = Box::leak(format!("{}.respond", ns).into_boxed_str());
1070 module.register_async_method(respond_method, |params, _ctx, _ext| async move {
1071 use super::bidirectional::{handle_pending_response, BidirError};
1072
1073 let p: RespondParams = params.parse()?;
1074
1075 tracing::debug!(
1076 request_id = %p.request_id,
1077 "Handling {}.respond via WebSocket",
1078 "plexus"
1079 );
1080
1081 match handle_pending_response(&p.request_id, p.response_data) {
1082 Ok(()) => Ok(serde_json::json!({"success": true})),
1083 Err(BidirError::UnknownRequest) => {
1084 tracing::warn!(request_id = %p.request_id, "Unknown request ID in respond");
1085 Err(jsonrpsee::types::ErrorObject::owned(
1086 -32602,
1087 format!("Unknown request ID: {}. The request may have timed out or been cancelled.", p.request_id),
1088 None::<()>,
1089 ))
1090 }
1091 Err(BidirError::ChannelClosed) => {
1092 tracing::warn!(request_id = %p.request_id, "Channel closed in respond");
1093 Err(jsonrpsee::types::ErrorObject::owned(
1094 -32000,
1095 "Response channel was closed (request may have timed out)",
1096 None::<()>,
1097 ))
1098 }
1099 Err(e) => {
1100 tracing::error!(request_id = %p.request_id, error = ?e, "Error in respond");
1101 Err(jsonrpsee::types::ErrorObject::owned(
1102 -32000,
1103 format!("Failed to deliver response: {}", e),
1104 None::<()>,
1105 ))
1106 }
1107 }
1108 })?;
1109
1110 let pending = std::mem::take(&mut *hub.inner.pending_rpc.lock().unwrap());
1112 eprintln!("[TRACE] arc_into_rpc_module: merging {} activation RPC factories", pending.len());
1113 for (idx, factory) in pending.into_iter().enumerate() {
1114 eprintln!("[TRACE] arc_into_rpc_module: calling factory {} to get Methods", idx);
1115 let methods = factory();
1116 eprintln!("[TRACE] arc_into_rpc_module: factory {} returned Methods with {} methods", idx, methods.method_names().count());
1117 eprintln!("[TRACE] arc_into_rpc_module: merging factory {} methods into module", idx);
1118 module.merge(methods)?;
1119 eprintln!("[TRACE] arc_into_rpc_module: successfully merged factory {} methods", idx);
1120 }
1121 eprintln!("[TRACE] arc_into_rpc_module: all activations merged successfully");
1122
1123 Ok(module)
1124 }
1125}
1126
1127#[derive(Debug, serde::Deserialize)]
1129struct CallParams {
1130 method: String,
1131 #[serde(default)]
1132 params: Option<Value>,
1133}
1134
1135#[derive(Debug, Default, serde::Deserialize)]
1137struct SchemaParams {
1138 method: Option<String>,
1139}
1140
1141#[derive(Debug, serde::Deserialize)]
1143struct RespondParams {
1144 request_id: String,
1145 response_data: Value,
1146}
1147
1148async fn pipe_stream_to_subscription(
1153 pending: jsonrpsee::PendingSubscriptionSink,
1154 mut stream: PlexusStream,
1155) -> jsonrpsee::core::SubscriptionResult {
1156 use futures::StreamExt;
1157
1158 let sink = pending.accept().await?;
1159 while let Some(item) = stream.next().await {
1160 let json = serde_json::value::to_raw_value(&item)?;
1161 sink.send(json).await?;
1162 }
1163 Ok(())
1164}
1165
1166#[plexus_macros::activation(
1171 namespace = "plexus",
1172 version = "1.0.0",
1173 description = "Central routing and introspection",
1174 hub,
1175 namespace_fn = "runtime_namespace"
1176)]
1177impl DynamicHub {
1178 #[plexus_macros::method(
1180 streaming,
1181 description = "Route a call to a registered activation",
1182 params(
1183 method = "The method to call (format: namespace.method)",
1184 params = "Parameters to pass to the method (optional, defaults to {})"
1185 )
1186 )]
1187 async fn call(
1188 &self,
1189 method: String,
1190 params: Option<Value>,
1191 ) -> impl Stream<Item = super::types::PlexusStreamItem> + Send + 'static {
1192 use super::context::PlexusContext;
1193 use super::types::{PlexusStreamItem, StreamMetadata};
1194
1195 let result = self.route(&method, params.unwrap_or_default(), None).await;
1196
1197 match result {
1198 Ok(plexus_stream) => {
1199 plexus_stream
1201 }
1202 Err(e) => {
1203 let metadata = StreamMetadata::new(
1205 vec![self.inner.namespace.clone()],
1206 PlexusContext::hash(),
1207 );
1208 Box::pin(futures::stream::once(async move {
1209 PlexusStreamItem::Error {
1210 metadata,
1211 message: e.to_string(),
1212 code: None,
1213 recoverable: false,
1214 }
1215 }))
1216 }
1217 }
1218 }
1219
1220 #[plexus_macros::method(description = "Get plexus configuration hash (from the recursive schema)\n\n This hash changes whenever any method or child plugin changes.\n It's computed from the method hashes rolled up through the schema tree.")]
1225 async fn hash(&self) -> impl Stream<Item = HashEvent> + Send + 'static {
1226 let schema = Activation::plugin_schema(self);
1227 stream! { yield HashEvent::Hash { value: schema.hash }; }
1228 }
1229
1230 #[plexus_macros::method(description = "Get plugin hashes for cache validation")]
1232 async fn hashes(&self) -> impl Stream<Item = PluginHashes> + Send + 'static {
1233 let schema = Activation::plugin_schema(self);
1234
1235 stream! {
1236 yield PluginHashes {
1237 namespace: schema.namespace.clone(),
1238 self_hash: schema.self_hash.clone(),
1239 children_hash: schema.children_hash.clone(),
1240 hash: schema.hash.clone(),
1241 children: schema.children.as_ref().map(|kids| {
1242 kids.iter()
1243 .map(|c| ChildHashes {
1244 namespace: c.namespace.clone(),
1245 hash: c.hash.clone(),
1246 })
1247 .collect()
1248 }),
1249 };
1250 }
1251 }
1252
1253 }
1255
1256use super::hub_context::HubContext;
1261use std::sync::Weak;
1262
1263#[async_trait]
1269impl HubContext for Weak<DynamicHub> {
1270 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1271 let hub = self.upgrade().ok_or_else(|| {
1272 PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1273 })?;
1274 hub.do_resolve_handle(handle).await
1275 }
1276
1277 async fn call(&self, method: &str, params: serde_json::Value) -> Result<PlexusStream, PlexusError> {
1278 let hub = self.upgrade().ok_or_else(|| {
1279 PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1280 })?;
1281 hub.route(method, params, None).await
1282 }
1283
1284 fn is_valid(&self) -> bool {
1285 self.upgrade().is_some()
1286 }
1287}
1288
1289#[async_trait]
1294impl ChildRouter for DynamicHub {
1295 fn router_namespace(&self) -> &str {
1296 &self.inner.namespace
1297 }
1298
1299 async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
1300 self.route_with_ctx(method, params, auth, raw_ctx).await
1303 }
1304
1305 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1306 self.inner.child_routers.get(name)
1308 .map(|router| {
1309 Box::new(ArcChildRouter(router.clone())) as Box<dyn ChildRouter>
1311 })
1312 }
1313}
1314
1315#[cfg(test)]
1316mod tests {
1317 use super::*;
1318
1319 #[test]
1320 fn dynamic_hub_implements_activation() {
1321 fn assert_activation<T: Activation>() {}
1322 assert_activation::<DynamicHub>();
1323 }
1324
1325 #[test]
1326 fn dynamic_hub_methods() {
1327 let hub = DynamicHub::new("test");
1328 let methods = hub.methods();
1329 assert!(methods.contains(&"call"));
1330 assert!(methods.contains(&"hash"));
1331 assert!(methods.contains(&"schema"));
1332 }
1334
1335 #[test]
1336 fn dynamic_hub_hash_stable() {
1337 let h1 = DynamicHub::new("test");
1338 let h2 = DynamicHub::new("test");
1339 assert_eq!(h1.compute_hash(), h2.compute_hash());
1340 }
1341
1342 #[test]
1343 fn dynamic_hub_is_hub() {
1344 use crate::activations::health::Health;
1345 let hub = DynamicHub::new("test").register(Health::new());
1346 let schema = hub.plugin_schema();
1347
1348 assert!(schema.is_hub(), "dynamic hub should be a hub");
1350 assert!(!schema.is_leaf(), "dynamic hub should not be a leaf");
1351
1352 let children = schema.children.expect("dynamic hub should have children");
1354 assert!(!children.is_empty(), "dynamic hub should have at least one child");
1355
1356 let health = children.iter().find(|c| c.namespace == "health").expect("should have health child");
1358 assert!(!health.hash.is_empty(), "health should have a hash");
1359 }
1360
1361 #[test]
1362 fn dynamic_hub_schema_structure() {
1363 use crate::activations::health::Health;
1364 let hub = DynamicHub::new("test").register(Health::new());
1365 let schema = hub.plugin_schema();
1366
1367 let json = serde_json::to_string_pretty(&schema).unwrap();
1369 println!("DynamicHub schema:\n{}", json);
1370
1371 assert_eq!(schema.namespace, "test");
1373 assert!(schema.methods.iter().any(|m| m.name == "call"));
1374 assert!(schema.children.is_some());
1375 }
1376
1377 #[tokio::test]
1382 async fn invariant_resolve_handle_unknown_activation() {
1383 use crate::activations::health::Health;
1384 use crate::types::Handle;
1385 use uuid::Uuid;
1386
1387 let hub = DynamicHub::new("test").register(Health::new());
1388
1389 let unknown_plugin_id = Uuid::new_v4();
1391 let handle = Handle::new(unknown_plugin_id, "1.0.0", "some_method");
1392
1393 let result = hub.do_resolve_handle(&handle).await;
1394
1395 match result {
1396 Err(PlexusError::ActivationNotFound(_)) => {
1397 }
1399 Err(other) => panic!("Expected ActivationNotFound, got {:?}", other),
1400 Ok(_) => panic!("Expected error for unknown activation"),
1401 }
1402 }
1403
1404 #[tokio::test]
1405 async fn invariant_resolve_handle_unsupported() {
1406 use crate::activations::health::Health;
1407 use crate::types::Handle;
1408
1409 let hub = DynamicHub::new("test").register(Health::new());
1410
1411 let handle = Handle::new(Health::PLUGIN_ID, "1.0.0", "check");
1413
1414 let result = hub.do_resolve_handle(&handle).await;
1415
1416 match result {
1417 Err(PlexusError::HandleNotSupported(name)) => {
1418 assert_eq!(name, "health");
1419 }
1420 Err(other) => panic!("Expected HandleNotSupported, got {:?}", other),
1421 Ok(_) => panic!("Expected error for unsupported handle"),
1422 }
1423 }
1424
1425 #[tokio::test]
1426 async fn invariant_resolve_handle_routes_by_plugin_id() {
1427 use crate::activations::health::Health;
1428 use crate::activations::echo::Echo;
1429 use crate::types::Handle;
1430 use uuid::Uuid;
1431
1432 let health = Health::new();
1433 let echo = Echo::new();
1434 let health_plugin_id = health.plugin_id();
1435 let echo_plugin_id = echo.plugin_id();
1436
1437 let hub = DynamicHub::new("test")
1438 .register(health)
1439 .register(echo);
1440
1441 let health_handle = Handle::new(health_plugin_id, "1.0.0", "check");
1443 match hub.do_resolve_handle(&health_handle).await {
1444 Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
1445 Err(other) => panic!("health handle should route to health activation, got {:?}", other),
1446 Ok(_) => panic!("health handle should return HandleNotSupported"),
1447 }
1448
1449 let echo_handle = Handle::new(echo_plugin_id, "1.0.0", "echo");
1451 match hub.do_resolve_handle(&echo_handle).await {
1452 Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "echo"),
1453 Err(other) => panic!("echo handle should route to echo activation, got {:?}", other),
1454 Ok(_) => panic!("echo handle should return HandleNotSupported"),
1455 }
1456
1457 let unknown_handle = Handle::new(Uuid::new_v4(), "1.0.0", "method");
1459 match hub.do_resolve_handle(&unknown_handle).await {
1460 Err(PlexusError::ActivationNotFound(_)) => { },
1461 Err(other) => panic!("unknown handle should return ActivationNotFound, got {:?}", other),
1462 Ok(_) => panic!("unknown handle should return ActivationNotFound"),
1463 }
1464 }
1465
1466 #[test]
1467 fn invariant_handle_plugin_id_determines_routing() {
1468 use crate::activations::health::Health;
1469 use crate::activations::echo::Echo;
1470 use crate::types::Handle;
1471
1472 let health = Health::new();
1473 let echo = Echo::new();
1474
1475 let health_handle = Handle::new(health.plugin_id(), "1.0.0", "check")
1477 .with_meta(vec!["msg-123".into(), "user".into()]);
1478 let echo_handle = Handle::new(echo.plugin_id(), "1.0.0", "echo")
1479 .with_meta(vec!["msg-123".into(), "user".into()]);
1480
1481 assert_ne!(health_handle.plugin_id, echo_handle.plugin_id);
1483 }
1484
1485 #[test]
1490 fn plugin_registry_basic_operations() {
1491 let mut registry = PluginRegistry::new();
1492 let id = uuid::Uuid::new_v4();
1493
1494 registry.register(id, "test_plugin".to_string(), "test".to_string());
1496
1497 assert_eq!(registry.lookup(id), Some("test_plugin"));
1499
1500 assert_eq!(registry.lookup_by_path("test_plugin"), Some(id));
1502
1503 let entry = registry.get(id).expect("should have entry");
1505 assert_eq!(entry.path, "test_plugin");
1506 assert_eq!(entry.plugin_type, "test");
1507 }
1508
1509 #[test]
1510 fn plugin_registry_populated_on_register() {
1511 use crate::activations::health::Health;
1512
1513 let hub = DynamicHub::new("test").register(Health::new());
1514
1515 let registry = hub.registry();
1516 assert!(!registry.is_empty(), "registry should not be empty after registration");
1517
1518 let health_id = registry.lookup_by_path("health");
1520 assert!(health_id.is_some(), "health should be registered by path");
1521
1522 let health_uuid = health_id.unwrap();
1524 assert_eq!(registry.lookup(health_uuid), Some("health"));
1525 }
1526
1527 #[test]
1528 fn plugin_registry_deterministic_uuid() {
1529 use crate::activations::health::Health;
1530
1531 let health1 = Health::new();
1533 let health2 = Health::new();
1534
1535 assert_eq!(health1.plugin_id(), health2.plugin_id(),
1536 "same activation type should have deterministic UUID");
1537
1538 let expected = uuid::Uuid::new_v5(
1540 &uuid::Uuid::NAMESPACE_OID,
1541 b"health@1"
1542 );
1543 assert_eq!(health1.plugin_id(), expected,
1544 "plugin_id should be deterministic from namespace@major_version");
1545 }
1546}