Skip to main content

plexus_core/plexus/
plexus.rs

1//! DynamicHub - the central routing layer for activations
2//!
3//! DynamicHub IS an activation that also serves as the registry for other activations.
4//! It implements the Plexus RPC protocol for routing and introspection.
5//! It uses hub-macro for its methods, with the `call` method using the streaming
6//! pattern to forward responses from routed methods.
7
8use super::{
9    context::PlexusContext,
10    method_enum::MethodEnumSchema,
11    schema::{ChildSummary, MethodSchema, PluginSchema, Schema},
12    streaming::PlexusStream,
13};
14use crate::types::Handle;
15use async_stream::stream;
16use async_trait::async_trait;
17use futures::Stream;
18use jsonrpsee::core::server::Methods;
19use jsonrpsee::SubscriptionMessage;
20use jsonrpsee::RpcModule;
21use schemars::JsonSchema;
22use serde::{Deserialize, Serialize};
23use serde_json::Value;
24use std::collections::HashMap;
25use std::sync::Arc;
26
27// ============================================================================
28// Error Types
29// ============================================================================
30
/// Errors produced while routing a call or resolving a handle.
///
/// Implements [`std::fmt::Display`] with a human-readable message per variant
/// and [`std::error::Error`], so it can be used with `?` and boxed error types.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum PlexusError {
    /// No activation matched the given name (the payload is the name that was looked up).
    ActivationNotFound(String),
    /// The named activation has no method with the given name.
    MethodNotFound { activation: String, method: String },
    /// The request was malformed; the payload describes what was wrong.
    InvalidParams(String),
    /// Execution failed; the payload carries the failure message.
    ExecutionError(String),
    /// The named activation does not support handle resolution.
    HandleNotSupported(String),
}

impl std::fmt::Display for PlexusError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        match self {
            Self::ActivationNotFound(name) => write!(f, "Activation not found: {}", name),
            Self::MethodNotFound { activation, method } => {
                write!(f, "Method not found: {}.{}", activation, method)
            }
            Self::InvalidParams(msg) => write!(f, "Invalid params: {}", msg),
            Self::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
            Self::HandleNotSupported(activation) => {
                write!(f, "Handle resolution not supported by activation: {}", activation)
            }
        }
    }
}

impl std::error::Error for PlexusError {}
57
58// ============================================================================
59// Schema Types
60// ============================================================================
61
62#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
63pub struct ActivationInfo {
64    pub namespace: String,
65    pub version: String,
66    pub description: String,
67    pub methods: Vec<String>,
68}
69
70// ============================================================================
71// Activation Trait
72// ============================================================================
73
74#[async_trait]
75pub trait Activation: Send + Sync + 'static {
76    type Methods: MethodEnumSchema;
77
78    fn namespace(&self) -> &str;
79    fn version(&self) -> &str;
80    /// Short description (max 15 words)
81    fn description(&self) -> &str { "No description available" }
82    /// Long description (optional, for detailed documentation)
83    fn long_description(&self) -> Option<&str> { None }
84    fn methods(&self) -> Vec<&str>;
85    fn method_help(&self, _method: &str) -> Option<String> { None }
86    /// Stable activation instance ID for handle routing
87    /// By default generates a deterministic UUID from namespace+major_version
88    /// Using major version only ensures handles survive minor/patch upgrades (semver)
89    fn plugin_id(&self) -> uuid::Uuid {
90        let major_version = self.version().split('.').next().unwrap_or("0");
91        uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, format!("{}@{}", self.namespace(), major_version).as_bytes())
92    }
93
94    async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
95    async fn resolve_handle(&self, _handle: &Handle) -> Result<PlexusStream, PlexusError> {
96        Err(PlexusError::HandleNotSupported(self.namespace().to_string()))
97    }
98
99    fn into_rpc_methods(self) -> Methods where Self: Sized;
100
101    /// Return this activation's schema (methods + optional children)
102    fn plugin_schema(&self) -> PluginSchema {
103        use std::collections::hash_map::DefaultHasher;
104        use std::hash::{Hash, Hasher};
105
106        let methods: Vec<MethodSchema> = self.methods().iter().map(|name| {
107            let desc = self.method_help(name).unwrap_or_default();
108            // Compute a simple hash for methods not using hub-macro
109            let mut hasher = DefaultHasher::new();
110            name.hash(&mut hasher);
111            desc.hash(&mut hasher);
112            let hash = format!("{:016x}", hasher.finish());
113            MethodSchema::new(name.to_string(), desc, hash)
114        }).collect();
115
116        if let Some(long_desc) = self.long_description() {
117            PluginSchema::leaf_with_long_description(
118                self.namespace(),
119                self.version(),
120                self.description(),
121                long_desc,
122                methods,
123            )
124        } else {
125            PluginSchema::leaf(
126                self.namespace(),
127                self.version(),
128                self.description(),
129                methods,
130            )
131        }
132    }
133}
134
135// ============================================================================
136// Child Routing for Hub Plugins
137// ============================================================================
138
139/// Trait for activations that can route to child activations
140///
141/// Hub activations implement this to support nested method routing.
142/// When a method like "mercury.info" is called on a solar activation,
143/// this trait enables routing to the mercury child.
144///
145/// This trait is separate from Activation to avoid associated type issues
146/// with dynamic dispatch.
147#[async_trait]
148pub trait ChildRouter: Send + Sync {
149    /// Get the namespace of this router (for error messages)
150    fn router_namespace(&self) -> &str;
151
152    /// Call a method on this router
153    async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
154
155    /// Get a child activation instance by name for nested routing
156    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>>;
157}
158
159/// Route a method call to a child activation
160///
161/// This is called from generated code when a hub activation receives
162/// a method that doesn't match its local methods. If the method
163/// contains a dot (e.g., "mercury.info"), it routes to the child.
164pub async fn route_to_child<T: ChildRouter + ?Sized>(
165    parent: &T,
166    method: &str,
167    params: Value,
168) -> Result<PlexusStream, PlexusError> {
169    // Try to split on first dot for nested routing
170    if let Some((child_name, rest)) = method.split_once('.') {
171        if let Some(child) = parent.get_child(child_name).await {
172            return child.router_call(rest, params).await;
173        }
174        return Err(PlexusError::ActivationNotFound(child_name.to_string()));
175    }
176
177    // No dot - method simply not found
178    Err(PlexusError::MethodNotFound {
179        activation: parent.router_namespace().to_string(),
180        method: method.to_string(),
181    })
182}
183
184/// Wrapper to implement ChildRouter for Arc<dyn ChildRouter>
185///
186/// This allows DynamicHub to return its stored Arc<dyn ChildRouter> from get_child()
187struct ArcChildRouter(Arc<dyn ChildRouter>);
188
189#[async_trait]
190impl ChildRouter for ArcChildRouter {
191    fn router_namespace(&self) -> &str {
192        self.0.router_namespace()
193    }
194
195    async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
196        self.0.router_call(method, params).await
197    }
198
199    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
200        self.0.get_child(name).await
201    }
202}
203
204// ============================================================================
205// Internal Type-Erased Activation
206// ============================================================================
207
208#[async_trait]
209#[allow(dead_code)] // Methods exist for completeness but some aren't called post-erasure yet
210trait ActivationObject: Send + Sync + 'static {
211    fn namespace(&self) -> &str;
212    fn version(&self) -> &str;
213    fn description(&self) -> &str;
214    fn long_description(&self) -> Option<&str>;
215    fn methods(&self) -> Vec<&str>;
216    fn method_help(&self, method: &str) -> Option<String>;
217    fn plugin_id(&self) -> uuid::Uuid;
218    async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
219    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError>;
220    fn plugin_schema(&self) -> PluginSchema;
221    fn schema(&self) -> Schema;
222}
223
224struct ActivationWrapper<A: Activation> {
225    inner: A,
226}
227
228#[async_trait]
229impl<A: Activation> ActivationObject for ActivationWrapper<A> {
230    fn namespace(&self) -> &str { self.inner.namespace() }
231    fn version(&self) -> &str { self.inner.version() }
232    fn description(&self) -> &str { self.inner.description() }
233    fn long_description(&self) -> Option<&str> { self.inner.long_description() }
234    fn methods(&self) -> Vec<&str> { self.inner.methods() }
235    fn method_help(&self, method: &str) -> Option<String> { self.inner.method_help(method) }
236    fn plugin_id(&self) -> uuid::Uuid { self.inner.plugin_id() }
237
238    async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
239        self.inner.call(method, params).await
240    }
241
242    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
243        self.inner.resolve_handle(handle).await
244    }
245
246    fn plugin_schema(&self) -> PluginSchema { self.inner.plugin_schema() }
247
248    fn schema(&self) -> Schema {
249        let schema = schemars::schema_for!(A::Methods);
250        serde_json::from_value(serde_json::to_value(schema).expect("serialize"))
251            .expect("parse schema")
252    }
253}
254
255// ============================================================================
256// Plexus Event Types
257// ============================================================================
258
259#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
260#[serde(tag = "event", rename_all = "snake_case")]
261pub enum HashEvent {
262    Hash { value: String },
263}
264
265/// Event for schema() RPC method - returns plugin schema
266#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
267#[serde(tag = "event", rename_all = "snake_case")]
268pub enum SchemaEvent {
269    /// This plugin's schema
270    Schema(PluginSchema),
271}
272
273
274// ============================================================================
275// Activation Registry
276// ============================================================================
277
278/// Entry in the activation registry
279#[derive(Debug, Clone)]
280pub struct PluginEntry {
281    /// Stable activation instance ID
282    pub id: uuid::Uuid,
283    /// Current path/namespace for this activation
284    pub path: String,
285    /// Activation type (e.g., "cone", "bash", "arbor")
286    pub plugin_type: String,
287}
288
289/// Registry mapping activation UUIDs to their current paths
290///
291/// This enables handle routing without path dependency - handles reference
292/// activations by their stable UUID, and the registry maps to the current path.
293#[derive(Default)]
294pub struct PluginRegistry {
295    /// Lookup by plugin UUID
296    by_id: HashMap<uuid::Uuid, PluginEntry>,
297    /// Lookup by current path (for reverse lookup)
298    by_path: HashMap<String, uuid::Uuid>,
299}
300
301/// Read-only snapshot of the activation registry
302///
303/// Safe to use outside of DynamicHub locks.
304#[derive(Clone)]
305pub struct PluginRegistrySnapshot {
306    by_id: HashMap<uuid::Uuid, PluginEntry>,
307    by_path: HashMap<String, uuid::Uuid>,
308}
309
310impl PluginRegistrySnapshot {
311    /// Look up an activation's path by its UUID
312    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
313        self.by_id.get(&id).map(|e| e.path.as_str())
314    }
315
316    /// Look up an activation's UUID by its path
317    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
318        self.by_path.get(path).copied()
319    }
320
321    /// Get an activation entry by its UUID
322    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
323        self.by_id.get(&id)
324    }
325
326    /// List all registered activations
327    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
328        self.by_id.values()
329    }
330
331    /// Get the number of registered plugins
332    pub fn len(&self) -> usize {
333        self.by_id.len()
334    }
335
336    /// Check if the registry is empty
337    pub fn is_empty(&self) -> bool {
338        self.by_id.is_empty()
339    }
340}
341
342impl PluginRegistry {
343    /// Create a new empty registry
344    pub fn new() -> Self {
345        Self::default()
346    }
347
348    /// Look up an activation's path by its UUID
349    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
350        self.by_id.get(&id).map(|e| e.path.as_str())
351    }
352
353    /// Look up an activation's UUID by its path
354    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
355        self.by_path.get(path).copied()
356    }
357
358    /// Get an activation entry by its UUID
359    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
360        self.by_id.get(&id)
361    }
362
363    /// Register an activation
364    pub fn register(&mut self, id: uuid::Uuid, path: String, plugin_type: String) {
365        let entry = PluginEntry { id, path: path.clone(), plugin_type };
366        self.by_id.insert(id, entry);
367        self.by_path.insert(path, id);
368    }
369
370    /// List all registered activations
371    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
372        self.by_id.values()
373    }
374
375    /// Get the number of registered plugins
376    pub fn len(&self) -> usize {
377        self.by_id.len()
378    }
379
380    /// Check if the registry is empty
381    pub fn is_empty(&self) -> bool {
382        self.by_id.is_empty()
383    }
384}
385
386// ============================================================================
387// DynamicHub (formerly Plexus)
388// ============================================================================
389
390struct DynamicHubInner {
391    /// Custom namespace for this hub instance (defaults to "plexus")
392    namespace: String,
393    activations: HashMap<String, Arc<dyn ActivationObject>>,
394    /// Child routers for direct nested routing (e.g., hub.solar.mercury.info)
395    child_routers: HashMap<String, Arc<dyn ChildRouter>>,
396    /// Activation registry mapping UUIDs to paths
397    registry: std::sync::RwLock<PluginRegistry>,
398    pending_rpc: std::sync::Mutex<Vec<Box<dyn FnOnce() -> Methods + Send>>>,
399}
400
401/// DynamicHub - an activation that routes to dynamically registered child activations
402///
403/// Unlike hub activations with hardcoded children (like Solar),
404/// DynamicHub allows registering activations at runtime via `.register()`.
405///
406/// # Direct Hosting
407///
408/// For a single activation, host it directly:
409/// ```ignore
410/// let solar = Arc::new(Solar::new());
411/// TransportServer::builder(solar, converter).serve().await?;
412/// ```
413///
414/// # Composition
415///
416/// For multiple top-level activations, use DynamicHub:
417/// ```ignore
418/// let hub = DynamicHub::with_namespace("myapp")
419///     .register(Solar::new())
420///     .register(Echo::new());
421/// ```
422#[derive(Clone)]
423pub struct DynamicHub {
424    inner: Arc<DynamicHubInner>,
425}
426
427// ============================================================================
428// DynamicHub Infrastructure (non-RPC methods)
429// ============================================================================
430
431impl DynamicHub {
432    /// Create a new DynamicHub with explicit namespace
433    ///
434    /// Unlike single activations which have fixed namespaces, DynamicHub is a
435    /// composition tool that can be named based on your application. Common choices:
436    /// - "hub" - generic default
437    /// - "substrate" - for substrate server
438    /// - "myapp" - for your application name
439    ///
440    /// The namespace appears in method calls: `{namespace}.call`, `{namespace}.schema`
441    pub fn new(namespace: impl Into<String>) -> Self {
442        Self {
443            inner: Arc::new(DynamicHubInner {
444                namespace: namespace.into(),
445                activations: HashMap::new(),
446                child_routers: HashMap::new(),
447                registry: std::sync::RwLock::new(PluginRegistry::new()),
448                pending_rpc: std::sync::Mutex::new(Vec::new()),
449            }),
450        }
451    }
452
453    /// Deprecated: Use new() with explicit namespace instead
454    #[deprecated(since = "0.3.0", note = "Use DynamicHub::new(namespace) instead")]
455    pub fn with_namespace(namespace: impl Into<String>) -> Self {
456        Self::new(namespace)
457    }
458
459    /// Get the runtime namespace for this DynamicHub instance
460    pub fn runtime_namespace(&self) -> &str {
461        &self.inner.namespace
462    }
463
464    /// Get access to the activation registry
465    pub fn registry(&self) -> std::sync::RwLockReadGuard<'_, PluginRegistry> {
466        self.inner.registry.read().unwrap()
467    }
468
469    /// Register an activation
470    pub fn register<A: Activation + Clone>(mut self, activation: A) -> Self {
471        let namespace = activation.namespace().to_string();
472        let plugin_id = activation.plugin_id();
473        let activation_for_rpc = activation.clone();
474
475        let inner = Arc::get_mut(&mut self.inner)
476            .expect("Cannot register: DynamicHub has multiple references");
477
478        // Register in the activation registry
479        inner.registry.write().unwrap().register(
480            plugin_id,
481            namespace.clone(),
482            namespace.clone(), // Use namespace as plugin_type for now
483        );
484
485        inner.activations.insert(namespace, Arc::new(ActivationWrapper { inner: activation }));
486        inner.pending_rpc.lock().unwrap()
487            .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
488        self
489    }
490
491    /// Register a hub activation that supports nested routing
492    ///
493    /// Hub activations implement `ChildRouter`, enabling direct nested method calls
494    /// like `hub.solar.mercury.info` at the RPC layer (no hub.call indirection).
495    pub fn register_hub<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
496        let namespace = activation.namespace().to_string();
497        let plugin_id = activation.plugin_id();
498        let activation_for_rpc = activation.clone();
499        let activation_for_router = activation.clone();
500
501        let inner = Arc::get_mut(&mut self.inner)
502            .expect("Cannot register: DynamicHub has multiple references");
503
504        // Register in the activation registry
505        inner.registry.write().unwrap().register(
506            plugin_id,
507            namespace.clone(),
508            namespace.clone(), // Use namespace as plugin_type for now
509        );
510
511        inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
512        inner.child_routers.insert(namespace, Arc::new(activation_for_router));
513        inner.pending_rpc.lock().unwrap()
514            .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
515        self
516    }
517
518    /// List all methods across all activations
519    pub fn list_methods(&self) -> Vec<String> {
520        let mut methods = Vec::new();
521
522        // Include hub's own methods
523        for m in Activation::methods(self) {
524            methods.push(format!("{}.{}", self.inner.namespace, m));
525        }
526
527        // Include registered activation methods
528        for (ns, act) in &self.inner.activations {
529            for m in act.methods() {
530                methods.push(format!("{}.{}", ns, m));
531            }
532        }
533        methods.sort();
534        methods
535    }
536
537    /// List all activations (including this hub itself)
538    pub fn list_activations_info(&self) -> Vec<ActivationInfo> {
539        let mut activations = Vec::new();
540
541        // Include this hub itself
542        activations.push(ActivationInfo {
543            namespace: Activation::namespace(self).to_string(),
544            version: Activation::version(self).to_string(),
545            description: Activation::description(self).to_string(),
546            methods: Activation::methods(self).iter().map(|s| s.to_string()).collect(),
547        });
548
549        // Include registered activations
550        for a in self.inner.activations.values() {
551            activations.push(ActivationInfo {
552                namespace: a.namespace().to_string(),
553                version: a.version().to_string(),
554                description: a.description().to_string(),
555                methods: a.methods().iter().map(|s| s.to_string()).collect(),
556            });
557        }
558
559        activations
560    }
561
562    /// Compute hash for cache invalidation
563    ///
564    /// Returns the hash from the recursive plugin schema. This hash changes
565    /// whenever any method definition or child plugin changes.
566    pub fn compute_hash(&self) -> String {
567        Activation::plugin_schema(self).hash
568    }
569
570    /// Route a call to the appropriate activation
571    pub async fn route(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
572        let (namespace, method_name) = self.parse_method(method)?;
573
574        // Handle plexus's own methods
575        if namespace == self.inner.namespace {
576            return Activation::call(self, method_name, params).await;
577        }
578
579        let activation = self.inner.activations.get(namespace)
580            .ok_or_else(|| PlexusError::ActivationNotFound(namespace.to_string()))?;
581
582        activation.call(method_name, params).await
583    }
584
585    /// Resolve a handle using the activation registry
586    ///
587    /// Looks up the activation by its UUID in the registry.
588    pub async fn do_resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
589        let path = self.inner.registry.read().unwrap()
590            .lookup(handle.plugin_id)
591            .map(|s| s.to_string())
592            .ok_or_else(|| PlexusError::ActivationNotFound(handle.plugin_id.to_string()))?;
593
594        let activation = self.inner.activations.get(&path)
595            .ok_or_else(|| PlexusError::ActivationNotFound(path.clone()))?;
596        activation.resolve_handle(handle).await
597    }
598
599    /// Get activation schema
600    pub fn get_activation_schema(&self, namespace: &str) -> Option<Schema> {
601        self.inner.activations.get(namespace).map(|a| a.schema())
602    }
603
604    /// Get a snapshot of the activation registry (safe to use outside locks)
605    pub fn registry_snapshot(&self) -> PluginRegistrySnapshot {
606        let guard = self.inner.registry.read().unwrap();
607        PluginRegistrySnapshot {
608            by_id: guard.by_id.clone(),
609            by_path: guard.by_path.clone(),
610        }
611    }
612
613    /// Look up an activation path by its UUID
614    pub fn lookup_plugin(&self, id: uuid::Uuid) -> Option<String> {
615        self.inner.registry.read().unwrap().lookup(id).map(|s| s.to_string())
616    }
617
618    /// Look up an activation UUID by its path
619    pub fn lookup_plugin_by_path(&self, path: &str) -> Option<uuid::Uuid> {
620        self.inner.registry.read().unwrap().lookup_by_path(path)
621    }
622
623    /// Get activation schemas for all activations (including this hub itself)
624    pub fn list_plugin_schemas(&self) -> Vec<PluginSchema> {
625        let mut schemas = Vec::new();
626
627        // Include this hub itself
628        schemas.push(Activation::plugin_schema(self));
629
630        // Include registered activations
631        for a in self.inner.activations.values() {
632            schemas.push(a.plugin_schema());
633        }
634
635        schemas
636    }
637
638    /// Deprecated: use list_plugin_schemas instead
639    #[deprecated(note = "Use list_plugin_schemas instead")]
640    pub fn list_full_schemas(&self) -> Vec<PluginSchema> {
641        self.list_plugin_schemas()
642    }
643
644    /// Get help for a method
645    pub fn get_method_help(&self, method: &str) -> Option<String> {
646        let (namespace, method_name) = self.parse_method(method).ok()?;
647        let activation = self.inner.activations.get(namespace)?;
648        activation.method_help(method_name)
649    }
650
651    fn parse_method<'a>(&self, method: &'a str) -> Result<(&'a str, &'a str), PlexusError> {
652        let parts: Vec<&str> = method.splitn(2, '.').collect();
653        if parts.len() != 2 {
654            return Err(PlexusError::InvalidParams(format!("Invalid method format: {}", method)));
655        }
656        Ok((parts[0], parts[1]))
657    }
658
659    /// Get child activation summaries (for hub functionality)
660    /// Called by hub-macro when `hub` flag is set
661    pub fn plugin_children(&self) -> Vec<ChildSummary> {
662        self.inner.activations.values()
663            .map(|a| {
664                let schema = a.plugin_schema();
665                ChildSummary {
666                    namespace: schema.namespace,
667                    description: schema.description,
668                    hash: schema.hash,
669                }
670            })
671            .collect()
672    }
673
674    /// Convert to RPC module
675    pub fn into_rpc_module(self) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
676        let mut module = RpcModule::new(());
677
678        PlexusContext::init(self.compute_hash());
679
680        // Register hub methods with runtime namespace using dot notation (e.g., "plexus.call" or "hub.call")
681        // Note: we leak these strings to get 'static lifetime required by jsonrpsee
682        let ns = self.runtime_namespace();
683        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
684        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
685        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
686        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
687        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
688        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
689        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
690        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
691        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
692
693        // Register {ns}.call subscription
694        let plexus_for_call = self.clone();
695        module.register_subscription(
696            call_method,
697            call_method,
698            call_unsub,
699            move |params, pending, _ctx, _ext| {
700                let plexus = plexus_for_call.clone();
701                Box::pin(async move {
702                    // Parse params: {"method": "...", "params": {...}}
703                    let p: CallParams = params.parse()?;
704                    let stream = plexus.route(&p.method, p.params.unwrap_or_default()).await
705                        .map_err(|e| jsonrpsee::types::ErrorObject::owned(-32000, e.to_string(), None::<()>))?;
706                    pipe_stream_to_subscription(pending, stream).await
707                })
708            }
709        )?;
710
711        // Register {ns}.hash subscription
712        let plexus_for_hash = self.clone();
713        module.register_subscription(
714            hash_method,
715            hash_method,
716            hash_unsub,
717            move |_params, pending, _ctx, _ext| {
718                let plexus = plexus_for_hash.clone();
719                Box::pin(async move {
720                    let schema = Activation::plugin_schema(&plexus);
721                    let stream = async_stream::stream! {
722                        yield HashEvent::Hash { value: schema.hash };
723                    };
724                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
725                    pipe_stream_to_subscription(pending, wrapped).await
726                })
727            }
728        )?;
729
730        // Register {ns}.schema subscription
731        let plexus_for_schema = self.clone();
732        module.register_subscription(
733            schema_method,
734            schema_method,
735            schema_unsub,
736            move |params, pending, _ctx, _ext| {
737                let plexus = plexus_for_schema.clone();
738                Box::pin(async move {
739                    let p: SchemaParams = params.parse().unwrap_or_default();
740                    let plugin_schema = Activation::plugin_schema(&plexus);
741
742                    let result = if let Some(ref name) = p.method {
743                        plugin_schema.methods.iter()
744                            .find(|m| m.name == *name)
745                            .map(|m| super::SchemaResult::Method(m.clone()))
746                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
747                                -32602,
748                                format!("Method '{}' not found", name),
749                                None::<()>,
750                            ))?
751                    } else {
752                        super::SchemaResult::Plugin(plugin_schema)
753                    };
754
755                    let stream = async_stream::stream! { yield result; };
756                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
757                    pipe_stream_to_subscription(pending, wrapped).await
758                })
759            }
760        )?;
761
762        // Register _info well-known endpoint (no namespace prefix)
763        // Returns backend name as a single-item stream with automatic Done event
764        let backend_name = self.runtime_namespace().to_string();
765        module.register_subscription(
766            "_info",
767            "_info",
768            "_info_unsub",
769            move |_params, pending, _ctx, _ext| {
770                let name = backend_name.clone();
771                Box::pin(async move {
772                    // Create a single-item stream with the info response
773                    let info_stream = futures::stream::once(async move {
774                        serde_json::json!({"backend": name})
775                    });
776
777                    // Wrap to auto-append Done event
778                    let wrapped = super::streaming::wrap_stream(
779                        info_stream,
780                        "_info",
781                        vec![]
782                    );
783
784                    // Pipe to subscription (handles Done automatically)
785                    pipe_stream_to_subscription(pending, wrapped).await
786                })
787            }
788        )?;
789
790        // Add all registered activation RPC methods
791        let pending = std::mem::take(&mut *self.inner.pending_rpc.lock().unwrap());
792        for factory in pending {
793            module.merge(factory())?;
794        }
795
796        Ok(module)
797    }
798
799    /// Convert Arc<DynamicHub> to RPC module while keeping the Arc alive
800    ///
801    /// Unlike `into_rpc_module`, this keeps the Arc<DynamicHub> reference alive,
802    /// which is necessary when activations hold Weak<DynamicHub> references that
803    /// need to remain upgradeable.
804    pub fn arc_into_rpc_module(hub: Arc<Self>) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
805        let mut module = RpcModule::new(());
806
807        PlexusContext::init(hub.compute_hash());
808
809        // Register hub methods with runtime namespace using dot notation (e.g., "plexus.call" or "hub.call")
810        // Note: we leak these strings to get 'static lifetime required by jsonrpsee
811        let ns = hub.runtime_namespace();
812        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
813        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
814        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
815        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
816        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
817        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
818        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
819        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
820        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
821
822        // Register {ns}.call subscription - clone Arc to keep reference alive
823        let hub_for_call = hub.clone();
824        module.register_subscription(
825            call_method,
826            call_method,
827            call_unsub,
828            move |params, pending, _ctx, _ext| {
829                let hub = hub_for_call.clone();
830                Box::pin(async move {
831                    let p: CallParams = params.parse()?;
832                    let stream = hub.route(&p.method, p.params.unwrap_or_default()).await
833                        .map_err(|e| jsonrpsee::types::ErrorObject::owned(-32000, e.to_string(), None::<()>))?;
834                    pipe_stream_to_subscription(pending, stream).await
835                })
836            }
837        )?;
838
839        // Register {ns}.hash subscription
840        let hub_for_hash = hub.clone();
841        module.register_subscription(
842            hash_method,
843            hash_method,
844            hash_unsub,
845            move |_params, pending, _ctx, _ext| {
846                let hub = hub_for_hash.clone();
847                Box::pin(async move {
848                    let schema = Activation::plugin_schema(&*hub);
849                    let stream = async_stream::stream! {
850                        yield HashEvent::Hash { value: schema.hash };
851                    };
852                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
853                    pipe_stream_to_subscription(pending, wrapped).await
854                })
855            }
856        )?;
857
858        // Register {ns}.schema subscription
859        let hub_for_schema = hub.clone();
860        module.register_subscription(
861            schema_method,
862            schema_method,
863            schema_unsub,
864            move |params, pending, _ctx, _ext| {
865                let hub = hub_for_schema.clone();
866                Box::pin(async move {
867                    let p: SchemaParams = params.parse().unwrap_or_default();
868                    let plugin_schema = Activation::plugin_schema(&*hub);
869
870                    let result = if let Some(ref name) = p.method {
871                        plugin_schema.methods.iter()
872                            .find(|m| m.name == *name)
873                            .map(|m| super::SchemaResult::Method(m.clone()))
874                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
875                                -32602,
876                                format!("Method '{}' not found", name),
877                                None::<()>,
878                            ))?
879                    } else {
880                        super::SchemaResult::Plugin(plugin_schema)
881                    };
882
883                    let stream = async_stream::stream! {
884                        yield result;
885                    };
886                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
887                    pipe_stream_to_subscription(pending, wrapped).await
888                })
889            }
890        )?;
891
892        // Register _info well-known endpoint (no namespace prefix)
893        // Returns backend name as a single-item stream with automatic Done event
894        let backend_name = hub.runtime_namespace().to_string();
895        module.register_subscription(
896            "_info",
897            "_info",
898            "_info_unsub",
899            move |_params, pending, _ctx, _ext| {
900                let name = backend_name.clone();
901                Box::pin(async move {
902                    // Create a single-item stream with the info response
903                    let info_stream = futures::stream::once(async move {
904                        serde_json::json!({"backend": name})
905                    });
906
907                    // Wrap to auto-append Done event
908                    let wrapped = super::streaming::wrap_stream(
909                        info_stream,
910                        "_info",
911                        vec![]
912                    );
913
914                    // Pipe to subscription (handles Done automatically)
915                    pipe_stream_to_subscription(pending, wrapped).await
916                })
917            }
918        )?;
919
920        // Register pending RPC methods from activations
921        let pending = std::mem::take(&mut *hub.inner.pending_rpc.lock().unwrap());
922        for factory in pending {
923            module.merge(factory())?;
924        }
925
926        Ok(module)
927    }
928}
929
930/// Params for {ns}.call
931#[derive(Debug, serde::Deserialize)]
932struct CallParams {
933    method: String,
934    #[serde(default)]
935    params: Option<Value>,
936}
937
938/// Params for {ns}.schema
939#[derive(Debug, Default, serde::Deserialize)]
940struct SchemaParams {
941    method: Option<String>,
942}
943
944/// Helper to pipe a PlexusStream to a subscription sink
945async fn pipe_stream_to_subscription(
946    pending: jsonrpsee::PendingSubscriptionSink,
947    mut stream: PlexusStream,
948) -> jsonrpsee::core::SubscriptionResult {
949    use futures::StreamExt;
950    use jsonrpsee::SubscriptionMessage;
951
952    let sink = pending.accept().await?;
953    while let Some(item) = stream.next().await {
954        let msg = SubscriptionMessage::new("result", sink.subscription_id(), &item)?;
955        sink.send(msg).await?;
956    }
957    Ok(())
958}
959
960// ============================================================================
961// DynamicHub RPC Methods (via plexus-macros)
962// ============================================================================
963
964#[plexus_macros::hub_methods(
965    namespace = "plexus",
966    version = "1.0.0",
967    description = "Central routing and introspection",
968    hub,
969    namespace_fn = "runtime_namespace"
970)]
971impl DynamicHub {
972    /// Route a call to a registered activation
973    #[plexus_macros::hub_method(
974        streaming,
975        description = "Route a call to a registered activation",
976        params(
977            method = "The method to call (format: namespace.method)",
978            params = "Parameters to pass to the method (optional, defaults to {})"
979        )
980    )]
981    async fn call(
982        &self,
983        method: String,
984        params: Option<Value>,
985    ) -> impl Stream<Item = super::types::PlexusStreamItem> + Send + 'static {
986        use super::context::PlexusContext;
987        use super::types::{PlexusStreamItem, StreamMetadata};
988
989        let result = self.route(&method, params.unwrap_or_default()).await;
990
991        match result {
992            Ok(plexus_stream) => {
993                // Forward the routed stream directly - it already contains PlexusStreamItems
994                plexus_stream
995            }
996            Err(e) => {
997                // Return error as a PlexusStreamItem stream
998                let metadata = StreamMetadata::new(
999                    vec![self.inner.namespace.clone()],
1000                    PlexusContext::hash(),
1001                );
1002                Box::pin(futures::stream::once(async move {
1003                    PlexusStreamItem::Error {
1004                        metadata,
1005                        message: e.to_string(),
1006                        code: None,
1007                        recoverable: false,
1008                    }
1009                }))
1010            }
1011        }
1012    }
1013
1014    /// Get Plexus RPC server configuration hash (from the recursive schema)
1015    ///
1016    /// This hash changes whenever any method or child activation changes.
1017    /// It's computed from the method hashes rolled up through the schema tree.
1018    #[plexus_macros::hub_method(description = "Get plexus configuration hash (from the recursive schema)\n\n This hash changes whenever any method or child plugin changes.\n It's computed from the method hashes rolled up through the schema tree.")]
1019    async fn hash(&self) -> impl Stream<Item = HashEvent> + Send + 'static {
1020        let schema = Activation::plugin_schema(self);
1021        stream! { yield HashEvent::Hash { value: schema.hash }; }
1022    }
1023
1024    // Note: schema() method is auto-generated by hub-macro for all activations
1025}
1026
1027// ============================================================================
1028// HubContext Implementation for Weak<DynamicHub>
1029// ============================================================================
1030
1031use super::hub_context::HubContext;
1032use std::sync::Weak;
1033
1034/// HubContext implementation for Weak<DynamicHub>
1035///
1036/// This enables activations to receive a weak reference to their parent DynamicHub,
1037/// allowing them to resolve handles and route calls through the hub without
1038/// creating reference cycles.
1039#[async_trait]
1040impl HubContext for Weak<DynamicHub> {
1041    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1042        let hub = self.upgrade().ok_or_else(|| {
1043            PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1044        })?;
1045        hub.do_resolve_handle(handle).await
1046    }
1047
1048    async fn call(&self, method: &str, params: serde_json::Value) -> Result<PlexusStream, PlexusError> {
1049        let hub = self.upgrade().ok_or_else(|| {
1050            PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1051        })?;
1052        hub.route(method, params).await
1053    }
1054
1055    fn is_valid(&self) -> bool {
1056        self.upgrade().is_some()
1057    }
1058}
1059
1060/// ChildRouter implementation for DynamicHub
1061///
1062/// This enables nested routing through registered activations.
1063/// e.g., hub.call("solar.mercury.info") routes to solar → mercury → info
1064#[async_trait]
1065impl ChildRouter for DynamicHub {
1066    fn router_namespace(&self) -> &str {
1067        &self.inner.namespace
1068    }
1069
1070    async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
1071        // DynamicHub routes via its registered activations
1072        // Method format: "activation.method" or "activation.child.method"
1073        self.route(method, params).await
1074    }
1075
1076    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1077        // Look up registered activations that implement ChildRouter
1078        self.inner.child_routers.get(name)
1079            .map(|router| {
1080                // Clone the Arc and wrap in Box for the trait object
1081                Box::new(ArcChildRouter(router.clone())) as Box<dyn ChildRouter>
1082            })
1083    }
1084}
1085
#[cfg(test)]
mod tests {
    use super::*;
    use crate::activations::echo::Echo;
    use crate::activations::health::Health;
    use crate::types::Handle;
    use uuid::Uuid;

    /// Compile-time check that `DynamicHub` satisfies the `Activation` bound.
    #[test]
    fn dynamic_hub_implements_activation() {
        fn requires_activation<T>()
        where
            T: Activation,
        {
        }
        requires_activation::<DynamicHub>();
    }

    /// The hub exposes `call`, `hash` and `schema`.
    #[test]
    fn dynamic_hub_methods() {
        let hub = DynamicHub::new("test");
        let methods = hub.methods();
        // list_activations was removed - use schema() instead
        for expected in ["call", "hash", "schema"] {
            assert!(methods.contains(&expected));
        }
    }

    /// Two identically configured hubs compute the same hash.
    #[test]
    fn dynamic_hub_hash_stable() {
        let (left, right) = (DynamicHub::new("test"), DynamicHub::new("test"));
        assert_eq!(left.compute_hash(), right.compute_hash());
    }

    /// A hub with a registered activation reports itself as a hub with children.
    #[test]
    fn dynamic_hub_is_hub() {
        let hub = DynamicHub::new("test").register(Health::new());
        let schema = hub.plugin_schema();

        // Hub, not leaf.
        assert!(schema.is_hub(), "dynamic hub should be a hub");
        assert!(!schema.is_leaf(), "dynamic hub should not be a leaf");

        // Children are present as summaries.
        let summaries = schema.children.expect("dynamic hub should have children");
        assert!(!summaries.is_empty(), "dynamic hub should have at least one child");

        // The health activation is among them and carries a hash.
        let health_summary = summaries
            .iter()
            .find(|summary| summary.namespace == "health")
            .expect("should have health child");
        assert!(!health_summary.hash.is_empty(), "health should have a hash");
    }

    /// The hub schema carries the namespace, the `call` method and children.
    #[test]
    fn dynamic_hub_schema_structure() {
        let hub = DynamicHub::new("test").register(Health::new());
        let schema = hub.plugin_schema();

        // Pretty print the schema for inspection in test output.
        println!(
            "DynamicHub schema:\n{}",
            serde_json::to_string_pretty(&schema).unwrap()
        );

        // Verify structure
        assert_eq!(schema.namespace, "test");
        let has_call = schema.methods.iter().any(|m| m.name == "call");
        assert!(has_call);
        assert!(schema.children.is_some());
    }

    // ========================================================================
    // INVARIANT: Handle routing - resolves to correct plugin
    // ========================================================================

    /// A handle whose plugin id is not registered yields `ActivationNotFound`.
    #[tokio::test]
    async fn invariant_resolve_handle_unknown_activation() {
        let hub = DynamicHub::new("test").register(Health::new());

        // Random UUID: no activation is registered under it.
        let handle = Handle::new(Uuid::new_v4(), "1.0.0", "some_method");

        let err = match hub.do_resolve_handle(&handle).await {
            Ok(_) => panic!("Expected error for unknown activation"),
            Err(err) => err,
        };
        if !matches!(err, PlexusError::ActivationNotFound(_)) {
            panic!("Expected ActivationNotFound, got {:?}", err);
        }
    }

    /// Resolving a health handle yields `HandleNotSupported("health")`.
    #[tokio::test]
    async fn invariant_resolve_handle_unsupported() {
        let hub = DynamicHub::new("test").register(Health::new());

        // Handle addressed to the health activation.
        let handle = Handle::new(Health::PLUGIN_ID, "1.0.0", "check");

        match hub.do_resolve_handle(&handle).await {
            Ok(_) => panic!("Expected error for unsupported handle"),
            Err(PlexusError::HandleNotSupported(name)) => {
                assert_eq!(name, "health");
            }
            Err(other) => panic!("Expected HandleNotSupported, got {:?}", other),
        }
    }

    /// Handles are dispatched to the activation matching their plugin id.
    #[tokio::test]
    async fn invariant_resolve_handle_routes_by_plugin_id() {
        let health = Health::new();
        let echo = Echo::new();
        let health_id = health.plugin_id();
        let echo_id = echo.plugin_id();

        let hub = DynamicHub::new("test").register(health).register(echo);

        // Health handle → health activation
        let to_health = Handle::new(health_id, "1.0.0", "check");
        match hub.do_resolve_handle(&to_health).await {
            Ok(_) => panic!("health handle should return HandleNotSupported"),
            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
            Err(other) => panic!("health handle should route to health activation, got {:?}", other),
        }

        // Echo handle → echo activation
        let to_echo = Handle::new(echo_id, "1.0.0", "echo");
        match hub.do_resolve_handle(&to_echo).await {
            Ok(_) => panic!("echo handle should return HandleNotSupported"),
            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "echo"),
            Err(other) => panic!("echo handle should route to echo activation, got {:?}", other),
        }

        // Unknown handle → ActivationNotFound (random UUID not registered)
        let to_nobody = Handle::new(Uuid::new_v4(), "1.0.0", "method");
        match hub.do_resolve_handle(&to_nobody).await {
            Ok(_) => panic!("unknown handle should return ActivationNotFound"),
            Err(PlexusError::ActivationNotFound(_)) => { /* expected */ }
            Err(other) => panic!("unknown handle should return ActivationNotFound, got {:?}", other),
        }
    }

    /// Handles with identical meta but different activations differ by plugin id.
    #[test]
    fn invariant_handle_plugin_id_determines_routing() {
        let health = Health::new();
        let echo = Echo::new();

        // Same meta, different activations → different routing targets (by plugin_id)
        let for_health = Handle::new(health.plugin_id(), "1.0.0", "check")
            .with_meta(vec!["msg-123".into(), "user".into()]);
        let for_echo = Handle::new(echo.plugin_id(), "1.0.0", "echo")
            .with_meta(vec!["msg-123".into(), "user".into()]);

        // Different plugin_ids ensure different routing
        assert_ne!(for_health.plugin_id, for_echo.plugin_id);
    }

    // ========================================================================
    // Plugin Registry Tests
    // ========================================================================

    /// Register one entry and read it back by id, by path and as an entry.
    #[test]
    fn plugin_registry_basic_operations() {
        let plugin_id = Uuid::new_v4();
        let mut registry = PluginRegistry::new();

        registry.register(plugin_id, String::from("test_plugin"), String::from("test"));

        // Lookup by ID
        assert_eq!(registry.lookup(plugin_id), Some("test_plugin"));

        // Lookup by path
        assert_eq!(registry.lookup_by_path("test_plugin"), Some(plugin_id));

        // Full entry
        let entry = registry.get(plugin_id).expect("should have entry");
        assert_eq!(entry.path, "test_plugin");
        assert_eq!(entry.plugin_type, "test");
    }

    /// Registering an activation on the hub populates the plugin registry.
    #[test]
    fn plugin_registry_populated_on_register() {
        let hub = DynamicHub::new("test").register(Health::new());

        let registry = hub.registry();
        assert!(!registry.is_empty(), "registry should not be empty after registration");

        // Health activation should be registered
        let by_path = registry.lookup_by_path("health");
        assert!(by_path.is_some(), "health should be registered by path");

        // The reverse lookup (ID → path) must agree.
        if let Some(health_uuid) = by_path {
            assert_eq!(registry.lookup(health_uuid), Some("health"));
        }
    }

    /// Plugin ids are deterministic and derived from `namespace@major_version`.
    #[test]
    fn plugin_registry_deterministic_uuid() {
        // Two instances of the same activation type.
        let (first, second) = (Health::new(), Health::new());

        assert_eq!(first.plugin_id(), second.plugin_id(),
            "same activation type should have deterministic UUID");

        // UUID should be based on namespace+major_version (semver compatibility)
        let expected = Uuid::new_v5(&Uuid::NAMESPACE_OID, b"health@1");
        assert_eq!(first.plugin_id(), expected,
            "plugin_id should be deterministic from namespace@major_version");
    }
}