Skip to main content

plexus_core/plexus/
plexus.rs

1//! DynamicHub - the central routing layer for activations
2//!
3//! DynamicHub IS an activation that also serves as the registry for other activations.
4//! It implements the Plexus RPC protocol for routing and introspection.
5//! It uses hub-macro for its methods, with the `call` method using the streaming
6//! pattern to forward responses from routed methods.
7
8use super::{
9    context::PlexusContext,
10    method_enum::MethodEnumSchema,
11    schema::{ChildSummary, MethodSchema, PluginSchema, Schema},
12    streaming::PlexusStream,
13};
14use crate::types::Handle;
15use async_stream::stream;
16use async_trait::async_trait;
17use futures::Stream;
18use jsonrpsee::core::server::Methods;
19use jsonrpsee::RpcModule;
20
/// Wire-level JSON-RPC method name carried by every plexus subscription notification.
///
/// All subscriptions that plexus registers (`.call`, `.hash`, `.schema`, `_info`) emit
/// notifications whose `"method"` field is set to this constant, so clients that
/// dispatch raw subscription frames have to compare against it.
pub const PLEXUS_NOTIF_METHOD: &str = "result";
27use schemars::JsonSchema;
28use serde::{Deserialize, Serialize};
29use serde_json::Value;
30use std::collections::HashMap;
31use std::sync::Arc;
32
33// ============================================================================
34// Error Types
35// ============================================================================
36
37#[derive(Debug, Clone)]
38pub enum PlexusError {
39    ActivationNotFound(String),
40    MethodNotFound { activation: String, method: String },
41    InvalidParams(String),
42    ExecutionError(String),
43    HandleNotSupported(String),
44    TransportError(TransportErrorKind),
45    Unauthenticated(String),
46}
47
48#[derive(Debug, Clone, Serialize, Deserialize)]
49#[serde(tag = "error_kind", rename_all = "snake_case")]
50pub enum TransportErrorKind {
51    ConnectionRefused { host: String, port: u16 },
52    ConnectionTimeout { host: String, port: u16 },
53    ProtocolError { message: String },
54    NetworkError { message: String },
55}
56
57impl std::fmt::Display for TransportErrorKind {
58    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
59        match self {
60            TransportErrorKind::ConnectionRefused { host, port } => {
61                write!(f, "Connection refused to {}:{}", host, port)
62            }
63            TransportErrorKind::ConnectionTimeout { host, port } => {
64                write!(f, "Connection timeout to {}:{}", host, port)
65            }
66            TransportErrorKind::ProtocolError { message } => {
67                write!(f, "Protocol error: {}", message)
68            }
69            TransportErrorKind::NetworkError { message } => {
70                write!(f, "Network error: {}", message)
71            }
72        }
73    }
74}
75
76impl std::fmt::Display for PlexusError {
77    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
78        match self {
79            PlexusError::ActivationNotFound(name) => write!(f, "Activation not found: {}", name),
80            PlexusError::MethodNotFound { activation, method } => {
81                write!(f, "Method not found: {}.{}", activation, method)
82            }
83            PlexusError::InvalidParams(msg) => write!(f, "Invalid params: {}", msg),
84            PlexusError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
85            PlexusError::HandleNotSupported(activation) => {
86                write!(f, "Handle resolution not supported by activation: {}", activation)
87            }
88            PlexusError::TransportError(kind) => match kind {
89                TransportErrorKind::ConnectionRefused { host, port } => {
90                    write!(f, "Connection refused to {}:{}", host, port)
91                }
92                TransportErrorKind::ConnectionTimeout { host, port } => {
93                    write!(f, "Connection timeout to {}:{}", host, port)
94                }
95                TransportErrorKind::ProtocolError { message } => {
96                    write!(f, "Protocol error: {}", message)
97                }
98                TransportErrorKind::NetworkError { message } => {
99                    write!(f, "Network error: {}", message)
100                }
101            }
102            PlexusError::Unauthenticated(msg) => write!(f, "Authentication required: {}", msg),
103        }
104    }
105}
106
107impl std::error::Error for PlexusError {}
108
109/// Convert PlexusError to a JSON-RPC ErrorObject with semantic error codes.
110///
111/// Codes:
112/// - `-32001`: Authentication required (custom app-level error)
113/// - `-32601`: Method/activation not found (standard JSON-RPC)
114/// - `-32602`: Invalid parameters (standard JSON-RPC)
115/// - `-32000`: Generic server error (execution, transport, handle errors)
116/// Get the semantic JSON-RPC error code for a PlexusError.
117fn plexus_error_code(e: &PlexusError) -> i32 {
118    match e {
119        PlexusError::Unauthenticated(_) => -32001,
120        PlexusError::InvalidParams(_) => -32602,
121        PlexusError::MethodNotFound { .. } | PlexusError::ActivationNotFound(_) => -32601,
122        _ => -32000,
123    }
124}
125
126/// Convert PlexusError to a JSON-RPC ErrorObject with semantic error codes.
127fn plexus_error_to_jsonrpc(e: &PlexusError) -> jsonrpsee::types::ErrorObjectOwned {
128    jsonrpsee::types::ErrorObject::owned(plexus_error_code(e), e.to_string(), None::<()>)
129}
130
131// ============================================================================
132// Schema Types
133// ============================================================================
134
135#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
136pub struct ActivationInfo {
137    pub namespace: String,
138    pub version: String,
139    pub description: String,
140    pub methods: Vec<String>,
141}
142
143// ============================================================================
144// Activation Trait
145// ============================================================================
146
147#[async_trait]
148pub trait Activation: Send + Sync + 'static {
149    type Methods: MethodEnumSchema;
150
151    fn namespace(&self) -> &str;
152    fn version(&self) -> &str;
153    /// Short description (max 15 words)
154    fn description(&self) -> &str { "No description available" }
155    /// Long description (optional, for detailed documentation)
156    fn long_description(&self) -> Option<&str> { None }
157    fn methods(&self) -> Vec<&str>;
158    fn method_help(&self, _method: &str) -> Option<String> { None }
159    /// Stable activation instance ID for handle routing
160    /// By default generates a deterministic UUID from namespace+major_version
161    /// Using major version only ensures handles survive minor/patch upgrades (semver)
162    fn plugin_id(&self) -> uuid::Uuid {
163        let major_version = self.version().split('.').next().unwrap_or("0");
164        uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, format!("{}@{}", self.namespace(), major_version).as_bytes())
165    }
166
167    async fn call(
168        &self,
169        method: &str,
170        params: Value,
171        auth: Option<&super::auth::AuthContext>,
172        raw_ctx: Option<&crate::request::RawRequestContext>,
173    ) -> Result<PlexusStream, PlexusError>;
174    async fn resolve_handle(&self, _handle: &Handle) -> Result<PlexusStream, PlexusError> {
175        Err(PlexusError::HandleNotSupported(self.namespace().to_string()))
176    }
177
178    fn into_rpc_methods(self) -> Methods where Self: Sized;
179
180    /// Return this activation's schema (methods + optional children)
181    fn plugin_schema(&self) -> PluginSchema {
182        use std::collections::hash_map::DefaultHasher;
183        use std::hash::{Hash, Hasher};
184
185        let methods: Vec<MethodSchema> = self.methods().iter().map(|name| {
186            let desc = self.method_help(name).unwrap_or_default();
187            // Compute a simple hash for methods not using hub-macro
188            let mut hasher = DefaultHasher::new();
189            name.hash(&mut hasher);
190            desc.hash(&mut hasher);
191            let hash = format!("{:016x}", hasher.finish());
192            MethodSchema::new(name.to_string(), desc, hash)
193        }).collect();
194
195        if let Some(long_desc) = self.long_description() {
196            PluginSchema::leaf_with_long_description(
197                self.namespace(),
198                self.version(),
199                self.description(),
200                long_desc,
201                methods,
202            )
203        } else {
204            PluginSchema::leaf(
205                self.namespace(),
206                self.version(),
207                self.description(),
208                methods,
209            )
210        }
211    }
212}
213
214// ============================================================================
215// Child Routing for Hub Plugins
216// ============================================================================
217
218/// Trait for activations that can route to child activations
219///
220/// Hub activations implement this to support nested method routing.
221/// When a method like "mercury.info" is called on a solar activation,
222/// this trait enables routing to the mercury child.
223///
224/// This trait is separate from Activation to avoid associated type issues
225/// with dynamic dispatch.
226#[async_trait]
227pub trait ChildRouter: Send + Sync {
228    /// Get the namespace of this router (for error messages)
229    fn router_namespace(&self) -> &str;
230
231    /// Call a method on this router
232    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;
233
234    /// Get a child activation instance by name for nested routing
235    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>>;
236}
237
238/// Route a method call to a child activation
239///
240/// This is called from generated code when a hub activation receives
241/// a method that doesn't match its local methods. If the method
242/// contains a dot (e.g., "mercury.info"), it routes to the child.
243pub async fn route_to_child<T: ChildRouter + ?Sized>(
244    parent: &T,
245    method: &str,
246    params: Value,
247    auth: Option<&super::auth::AuthContext>,
248    raw_ctx: Option<&crate::request::RawRequestContext>,
249) -> Result<PlexusStream, PlexusError> {
250    // Try to split on first dot for nested routing
251    if let Some((child_name, rest)) = method.split_once('.') {
252        if let Some(child) = parent.get_child(child_name).await {
253            return child.router_call(rest, params, auth, raw_ctx).await;
254        }
255        return Err(PlexusError::ActivationNotFound(child_name.to_string()));
256    }
257
258    // No dot - method simply not found
259    Err(PlexusError::MethodNotFound {
260        activation: parent.router_namespace().to_string(),
261        method: method.to_string(),
262    })
263}
264
265/// Wrapper to implement ChildRouter for Arc<dyn ChildRouter>
266///
267/// This allows DynamicHub to return its stored Arc<dyn ChildRouter> from get_child()
268struct ArcChildRouter(Arc<dyn ChildRouter>);
269
270#[async_trait]
271impl ChildRouter for ArcChildRouter {
272    fn router_namespace(&self) -> &str {
273        self.0.router_namespace()
274    }
275
276    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
277        self.0.router_call(method, params, auth, raw_ctx).await
278    }
279
280    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
281        self.0.get_child(name).await
282    }
283}
284
285// ============================================================================
286// Internal Type-Erased Activation
287// ============================================================================
288
289#[async_trait]
290#[allow(dead_code)] // Methods exist for completeness but some aren't called post-erasure yet
291trait ActivationObject: Send + Sync + 'static {
292    fn namespace(&self) -> &str;
293    fn version(&self) -> &str;
294    fn description(&self) -> &str;
295    fn long_description(&self) -> Option<&str>;
296    fn methods(&self) -> Vec<&str>;
297    fn method_help(&self, method: &str) -> Option<String>;
298    fn plugin_id(&self) -> uuid::Uuid;
299    async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;
300    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError>;
301    fn plugin_schema(&self) -> PluginSchema;
302    fn schema(&self) -> Schema;
303}
304
305struct ActivationWrapper<A: Activation> {
306    inner: A,
307}
308
309#[async_trait]
310impl<A: Activation> ActivationObject for ActivationWrapper<A> {
311    fn namespace(&self) -> &str { self.inner.namespace() }
312    fn version(&self) -> &str { self.inner.version() }
313    fn description(&self) -> &str { self.inner.description() }
314    fn long_description(&self) -> Option<&str> { self.inner.long_description() }
315    fn methods(&self) -> Vec<&str> { self.inner.methods() }
316    fn method_help(&self, method: &str) -> Option<String> { self.inner.method_help(method) }
317    fn plugin_id(&self) -> uuid::Uuid { self.inner.plugin_id() }
318
319    async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
320        self.inner.call(method, params, auth, raw_ctx).await
321    }
322
323    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
324        self.inner.resolve_handle(handle).await
325    }
326
327    fn plugin_schema(&self) -> PluginSchema { self.inner.plugin_schema() }
328
329    fn schema(&self) -> Schema {
330        let schema = schemars::schema_for!(A::Methods);
331        serde_json::from_value(serde_json::to_value(schema).expect("serialize"))
332            .expect("parse schema")
333    }
334}
335
336// ============================================================================
337// Plexus Event Types
338// ============================================================================
339
340#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
341#[serde(tag = "event", rename_all = "snake_case")]
342pub enum HashEvent {
343    Hash { value: String },
344}
345
346/// Event for schema() RPC method - returns plugin schema
347#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
348#[serde(tag = "event", rename_all = "snake_case")]
349pub enum SchemaEvent {
350    /// This plugin's schema
351    Schema(PluginSchema),
352}
353
354/// Lightweight hash information for cache validation
355#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
356pub struct PluginHashes {
357    pub namespace: String,
358    pub self_hash: String,
359    #[serde(skip_serializing_if = "Option::is_none")]
360    pub children_hash: Option<String>,
361    pub hash: String,
362    /// Child plugin hashes (for recursive checking)
363    #[serde(skip_serializing_if = "Option::is_none")]
364    pub children: Option<Vec<ChildHashes>>,
365}
366
367/// Hash information for a child plugin
368#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
369pub struct ChildHashes {
370    pub namespace: String,
371    pub hash: String,
372}
373
374
375// ============================================================================
376// Activation Registry
377// ============================================================================
378
379/// Entry in the activation registry
380#[derive(Debug, Clone)]
381pub struct PluginEntry {
382    /// Stable activation instance ID
383    pub id: uuid::Uuid,
384    /// Current path/namespace for this activation
385    pub path: String,
386    /// Activation type (e.g., "cone", "bash", "arbor")
387    pub plugin_type: String,
388}
389
390/// Registry mapping activation UUIDs to their current paths
391///
392/// This enables handle routing without path dependency - handles reference
393/// activations by their stable UUID, and the registry maps to the current path.
394#[derive(Default)]
395pub struct PluginRegistry {
396    /// Lookup by plugin UUID
397    by_id: HashMap<uuid::Uuid, PluginEntry>,
398    /// Lookup by current path (for reverse lookup)
399    by_path: HashMap<String, uuid::Uuid>,
400}
401
402/// Read-only snapshot of the activation registry
403///
404/// Safe to use outside of DynamicHub locks.
405#[derive(Clone)]
406pub struct PluginRegistrySnapshot {
407    by_id: HashMap<uuid::Uuid, PluginEntry>,
408    by_path: HashMap<String, uuid::Uuid>,
409}
410
411impl PluginRegistrySnapshot {
412    /// Look up an activation's path by its UUID
413    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
414        self.by_id.get(&id).map(|e| e.path.as_str())
415    }
416
417    /// Look up an activation's UUID by its path
418    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
419        self.by_path.get(path).copied()
420    }
421
422    /// Get an activation entry by its UUID
423    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
424        self.by_id.get(&id)
425    }
426
427    /// List all registered activations
428    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
429        self.by_id.values()
430    }
431
432    /// Get the number of registered plugins
433    pub fn len(&self) -> usize {
434        self.by_id.len()
435    }
436
437    /// Check if the registry is empty
438    pub fn is_empty(&self) -> bool {
439        self.by_id.is_empty()
440    }
441}
442
443impl PluginRegistry {
444    /// Create a new empty registry
445    pub fn new() -> Self {
446        Self::default()
447    }
448
449    /// Look up an activation's path by its UUID
450    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
451        self.by_id.get(&id).map(|e| e.path.as_str())
452    }
453
454    /// Look up an activation's UUID by its path
455    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
456        self.by_path.get(path).copied()
457    }
458
459    /// Get an activation entry by its UUID
460    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
461        self.by_id.get(&id)
462    }
463
464    /// Register an activation
465    pub fn register(&mut self, id: uuid::Uuid, path: String, plugin_type: String) {
466        let entry = PluginEntry { id, path: path.clone(), plugin_type };
467        self.by_id.insert(id, entry);
468        self.by_path.insert(path, id);
469    }
470
471    /// List all registered activations
472    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
473        self.by_id.values()
474    }
475
476    /// Get the number of registered plugins
477    pub fn len(&self) -> usize {
478        self.by_id.len()
479    }
480
481    /// Check if the registry is empty
482    pub fn is_empty(&self) -> bool {
483        self.by_id.is_empty()
484    }
485}
486
487// ============================================================================
488// DynamicHub (formerly Plexus)
489// ============================================================================
490
491struct DynamicHubInner {
492    /// Custom namespace for this hub instance (defaults to "plexus")
493    namespace: String,
494    activations: HashMap<String, Arc<dyn ActivationObject>>,
495    /// Child routers for direct nested routing (e.g., hub.solar.mercury.info)
496    child_routers: HashMap<String, Arc<dyn ChildRouter>>,
497    /// Activation registry mapping UUIDs to paths
498    registry: std::sync::RwLock<PluginRegistry>,
499    pending_rpc: std::sync::Mutex<Vec<Box<dyn FnOnce() -> Methods + Send>>>,
500}
501
502/// DynamicHub - an activation that routes to dynamically registered child activations
503///
504/// Unlike hub activations with hardcoded children (like Solar),
505/// DynamicHub allows registering activations at runtime via `.register()`.
506///
507/// # Direct Hosting
508///
509/// For a single activation, host it directly:
510/// ```ignore
511/// let solar = Arc::new(Solar::new());
512/// TransportServer::builder(solar, converter).serve().await?;
513/// ```
514///
515/// # Composition
516///
517/// For multiple top-level activations, use DynamicHub:
518/// ```ignore
519/// let hub = DynamicHub::with_namespace("myapp")
520///     .register(Solar::new())
521///     .register(Echo::new());
522/// ```
523#[derive(Clone)]
524pub struct DynamicHub {
525    inner: Arc<DynamicHubInner>,
526}
527
528// ============================================================================
529// DynamicHub Infrastructure (non-RPC methods)
530// ============================================================================
531
532impl DynamicHub {
533    /// Create a new DynamicHub with explicit namespace
534    ///
535    /// Unlike single activations which have fixed namespaces, DynamicHub is a
536    /// composition tool that can be named based on your application. Common choices:
537    /// - "hub" - generic default
538    /// - "substrate" - for substrate server
539    /// - "myapp" - for your application name
540    ///
541    /// The namespace appears in method calls: `{namespace}.call`, `{namespace}.schema`
542    pub fn new(namespace: impl Into<String>) -> Self {
543        Self {
544            inner: Arc::new(DynamicHubInner {
545                namespace: namespace.into(),
546                activations: HashMap::new(),
547                child_routers: HashMap::new(),
548                registry: std::sync::RwLock::new(PluginRegistry::new()),
549                pending_rpc: std::sync::Mutex::new(Vec::new()),
550            }),
551        }
552    }
553
554    /// Deprecated: Use new() with explicit namespace instead
555    #[deprecated(since = "0.3.0", note = "Use DynamicHub::new(namespace) instead")]
556    pub fn with_namespace(namespace: impl Into<String>) -> Self {
557        Self::new(namespace)
558    }
559
560    /// Get the runtime namespace for this DynamicHub instance
561    pub fn runtime_namespace(&self) -> &str {
562        &self.inner.namespace
563    }
564
565    /// Get access to the activation registry
566    pub fn registry(&self) -> std::sync::RwLockReadGuard<'_, PluginRegistry> {
567        self.inner.registry.read().unwrap()
568    }
569
570    /// Register an activation
571    pub fn register<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
572        let namespace = activation.namespace().to_string();
573        let plugin_id = activation.plugin_id();
574        let activation_for_rpc = activation.clone();
575        let activation_for_router = activation.clone();
576
577        let inner = Arc::get_mut(&mut self.inner)
578            .expect("Cannot register: DynamicHub has multiple references");
579
580        // Register in the activation registry
581        inner.registry.write().unwrap().register(
582            plugin_id,
583            namespace.clone(),
584            namespace.clone(), // Use namespace as plugin_type for now
585        );
586
587        inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
588        inner.child_routers.insert(namespace.clone(), Arc::new(activation_for_router));
589        inner.pending_rpc.lock().unwrap()
590            .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
591        self
592    }
593
594    /// Register a hub activation that supports nested routing
595    ///
596    /// Hub activations implement `ChildRouter`, enabling direct nested method calls
597    /// like `hub.solar.mercury.info` at the RPC layer (no hub.call indirection).
598    #[deprecated(since = "0.5.0", note = "Use register() — it now handles both leaf and hub activations")]
599    pub fn register_hub<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
600        let namespace = activation.namespace().to_string();
601        let plugin_id = activation.plugin_id();
602        let activation_for_rpc = activation.clone();
603        let activation_for_router = activation.clone();
604
605        let inner = Arc::get_mut(&mut self.inner)
606            .expect("Cannot register: DynamicHub has multiple references");
607
608        // Register in the activation registry
609        inner.registry.write().unwrap().register(
610            plugin_id,
611            namespace.clone(),
612            namespace.clone(), // Use namespace as plugin_type for now
613        );
614
615        inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
616        inner.child_routers.insert(namespace, Arc::new(activation_for_router));
617        inner.pending_rpc.lock().unwrap()
618            .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
619        self
620    }
621
622    /// List all methods across all activations
623    pub fn list_methods(&self) -> Vec<String> {
624        let mut methods = Vec::new();
625
626        // Include hub's own methods
627        for m in Activation::methods(self) {
628            methods.push(format!("{}.{}", self.inner.namespace, m));
629        }
630
631        // Include registered activation methods
632        for (ns, act) in &self.inner.activations {
633            for m in act.methods() {
634                methods.push(format!("{}.{}", ns, m));
635            }
636        }
637        methods.sort();
638        methods
639    }
640
641    /// List all activations (including this hub itself)
642    pub fn list_activations_info(&self) -> Vec<ActivationInfo> {
643        let mut activations = Vec::new();
644
645        // Include this hub itself
646        activations.push(ActivationInfo {
647            namespace: Activation::namespace(self).to_string(),
648            version: Activation::version(self).to_string(),
649            description: Activation::description(self).to_string(),
650            methods: Activation::methods(self).iter().map(|s| s.to_string()).collect(),
651        });
652
653        // Include registered activations
654        for a in self.inner.activations.values() {
655            activations.push(ActivationInfo {
656                namespace: a.namespace().to_string(),
657                version: a.version().to_string(),
658                description: a.description().to_string(),
659                methods: a.methods().iter().map(|s| s.to_string()).collect(),
660            });
661        }
662
663        activations
664    }
665
666    /// Compute hash for cache invalidation
667    ///
668    /// Returns the hash from the recursive plugin schema. This hash changes
669    /// whenever any method definition or child plugin changes.
670    pub fn compute_hash(&self) -> String {
671        Activation::plugin_schema(self).hash
672    }
673
674    /// Route a call to the appropriate activation
675    pub async fn route(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>) -> Result<PlexusStream, PlexusError> {
676        self.route_with_ctx(method, params, auth, None).await
677    }
678
679    /// Route a call to the appropriate activation, with optional raw HTTP request context.
680    pub async fn route_with_ctx(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
681        let (namespace, method_name) = self.parse_method(method)?;
682
683        // Handle plexus's own methods
684        if namespace == self.inner.namespace {
685            return Activation::call(self, method_name, params, auth, raw_ctx).await;
686        }
687
688        let activation = self.inner.activations.get(namespace)
689            .ok_or_else(|| PlexusError::ActivationNotFound(namespace.to_string()))?;
690
691        activation.call(method_name, params, auth, raw_ctx).await
692    }
693
694    /// Resolve a handle using the activation registry
695    ///
696    /// Looks up the activation by its UUID in the registry.
697    pub async fn do_resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
698        let path = self.inner.registry.read().unwrap()
699            .lookup(handle.plugin_id)
700            .map(|s| s.to_string())
701            .ok_or_else(|| PlexusError::ActivationNotFound(handle.plugin_id.to_string()))?;
702
703        let activation = self.inner.activations.get(&path)
704            .ok_or_else(|| PlexusError::ActivationNotFound(path.clone()))?;
705        activation.resolve_handle(handle).await
706    }
707
708    /// Get activation schema
709    pub fn get_activation_schema(&self, namespace: &str) -> Option<Schema> {
710        self.inner.activations.get(namespace).map(|a| a.schema())
711    }
712
713    /// Get a snapshot of the activation registry (safe to use outside locks)
714    pub fn registry_snapshot(&self) -> PluginRegistrySnapshot {
715        let guard = self.inner.registry.read().unwrap();
716        PluginRegistrySnapshot {
717            by_id: guard.by_id.clone(),
718            by_path: guard.by_path.clone(),
719        }
720    }
721
722    /// Look up an activation path by its UUID
723    pub fn lookup_plugin(&self, id: uuid::Uuid) -> Option<String> {
724        self.inner.registry.read().unwrap().lookup(id).map(|s| s.to_string())
725    }
726
727    /// Look up an activation UUID by its path
728    pub fn lookup_plugin_by_path(&self, path: &str) -> Option<uuid::Uuid> {
729        self.inner.registry.read().unwrap().lookup_by_path(path)
730    }
731
732    /// Get activation schemas for all activations (including this hub itself)
733    pub fn list_plugin_schemas(&self) -> Vec<PluginSchema> {
734        let mut schemas = Vec::new();
735
736        // Include this hub itself
737        schemas.push(Activation::plugin_schema(self));
738
739        // Include registered activations
740        for a in self.inner.activations.values() {
741            schemas.push(a.plugin_schema());
742        }
743
744        schemas
745    }
746
747    /// Deprecated: use list_plugin_schemas instead
748    #[deprecated(note = "Use list_plugin_schemas instead")]
749    pub fn list_full_schemas(&self) -> Vec<PluginSchema> {
750        self.list_plugin_schemas()
751    }
752
753    /// Get help for a method
754    pub fn get_method_help(&self, method: &str) -> Option<String> {
755        let (namespace, method_name) = self.parse_method(method).ok()?;
756        let activation = self.inner.activations.get(namespace)?;
757        activation.method_help(method_name)
758    }
759
760    fn parse_method<'a>(&self, method: &'a str) -> Result<(&'a str, &'a str), PlexusError> {
761        let parts: Vec<&str> = method.splitn(2, '.').collect();
762        if parts.len() != 2 {
763            return Err(PlexusError::InvalidParams(format!("Invalid method format: {}", method)));
764        }
765        Ok((parts[0], parts[1]))
766    }
767
768    /// Get child activation summaries (for hub functionality)
769    /// Called by hub-macro when `hub` flag is set
770    pub fn plugin_children(&self) -> Vec<ChildSummary> {
771        self.inner.activations.values()
772            .map(|a| {
773                let schema = a.plugin_schema();
774                ChildSummary {
775                    namespace: schema.namespace,
776                    description: schema.description,
777                    hash: schema.hash,
778                }
779            })
780            .collect()
781    }
782
783    /// Convert to RPC module
784    pub fn into_rpc_module(self) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
785        let mut module = RpcModule::new(());
786
787        PlexusContext::init(self.compute_hash());
788
789        // Register hub methods with runtime namespace using dot notation (e.g., "plexus.call" or "hub.call")
790        // Note: we leak these strings to get 'static lifetime required by jsonrpsee
791        let ns = self.runtime_namespace();
792        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
793        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
794        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
795        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
796        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
797        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
798        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
799        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
800        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
801
802        // Register {ns}.call subscription
803        let plexus_for_call = self.clone();
804        module.register_subscription(
805            call_method,
806            PLEXUS_NOTIF_METHOD,
807            call_unsub,
808            move |params, pending, _ctx, _ext| {
809                let plexus = plexus_for_call.clone();
810                Box::pin(async move {
811                    let p: CallParams = params.parse()?;
812                    match plexus.route(&p.method, p.params.unwrap_or_default(), None).await {
813                        Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
814                        Err(e) => {
815                            let sink = pending.accept().await?;
816                            let error_item = super::types::PlexusStreamItem::Error {
817                                metadata: super::types::StreamMetadata::new(
818                                    vec![ns_static.into()],
819                                    PlexusContext::hash(),
820                                ),
821                                message: e.to_string(),
822                                code: Some(plexus_error_code(&e).to_string()),
823                                recoverable: false,
824                            };
825                            if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
826                                let _ = sink.send(raw).await;
827                            }
828                            Ok(())
829                        }
830                    }
831                })
832            }
833        )?;
834
835        // Register {ns}.hash subscription
836        let plexus_for_hash = self.clone();
837        module.register_subscription(
838            hash_method,
839            PLEXUS_NOTIF_METHOD,
840            hash_unsub,
841            move |_params, pending, _ctx, _ext| {
842                let plexus = plexus_for_hash.clone();
843                Box::pin(async move {
844                    let schema = Activation::plugin_schema(&plexus);
845                    let stream = async_stream::stream! {
846                        yield HashEvent::Hash { value: schema.hash };
847                    };
848                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
849                    pipe_stream_to_subscription(pending, wrapped).await
850                })
851            }
852        )?;
853
854        // Register {ns}.schema subscription
855        let plexus_for_schema = self.clone();
856        module.register_subscription(
857            schema_method,
858            PLEXUS_NOTIF_METHOD,
859            schema_unsub,
860            move |params, pending, _ctx, _ext| {
861                let plexus = plexus_for_schema.clone();
862                Box::pin(async move {
863                    let p: SchemaParams = params.parse().unwrap_or_default();
864                    let plugin_schema = Activation::plugin_schema(&plexus);
865
866                    let result = if let Some(ref name) = p.method {
867                        plugin_schema.methods.iter()
868                            .find(|m| m.name == *name)
869                            .map(|m| super::SchemaResult::Method(m.clone()))
870                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
871                                -32602,
872                                format!("Method '{}' not found", name),
873                                None::<()>,
874                            ))?
875                    } else {
876                        super::SchemaResult::Plugin(plugin_schema)
877                    };
878
879                    let stream = async_stream::stream! { yield result; };
880                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
881                    pipe_stream_to_subscription(pending, wrapped).await
882                })
883            }
884        )?;
885
886        // Register _info well-known endpoint (no namespace prefix)
887        // Returns backend name as a single-item stream with automatic Done event
888        let backend_name = self.runtime_namespace().to_string();
889        module.register_subscription(
890            "_info",
891            PLEXUS_NOTIF_METHOD,
892            "_info_unsub",
893            move |_params, pending, _ctx, _ext| {
894                let name = backend_name.clone();
895                Box::pin(async move {
896                    // Create a single-item stream with the info response
897                    let info_stream = futures::stream::once(async move {
898                        serde_json::json!({"backend": name})
899                    });
900
901                    // Wrap to auto-append Done event
902                    let wrapped = super::streaming::wrap_stream(
903                        info_stream,
904                        "_info",
905                        vec![]
906                    );
907
908                    // Pipe to subscription (handles Done automatically)
909                    pipe_stream_to_subscription(pending, wrapped).await
910                })
911            }
912        )?;
913
914        // Add all registered activation RPC methods
915        let pending = std::mem::take(&mut *self.inner.pending_rpc.lock().unwrap());
916        for factory in pending {
917            module.merge(factory())?;
918        }
919
920        Ok(module)
921    }
922
923    /// Convert Arc<DynamicHub> to RPC module while keeping the Arc alive
924    ///
925    /// Unlike `into_rpc_module`, this keeps the Arc<DynamicHub> reference alive,
926    /// which is necessary when activations hold Weak<DynamicHub> references that
927    /// need to remain upgradeable.
928    pub fn arc_into_rpc_module(hub: Arc<Self>) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
929        let mut module = RpcModule::new(());
930
931        PlexusContext::init(hub.compute_hash());
932
933        // Register hub methods with runtime namespace using dot notation (e.g., "plexus.call" or "hub.call")
934        // Note: we leak these strings to get 'static lifetime required by jsonrpsee
935        let ns = hub.runtime_namespace();
936        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
937        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
938        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
939        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
940        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
941        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
942        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
943        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
944        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
945
946        // Register {ns}.call subscription - clone Arc to keep reference alive
947        let hub_for_call = hub.clone();
948        module.register_subscription(
949            call_method,
950            call_method,
951            call_unsub,
952            move |params, pending, _ctx, ext| {
953                let hub = hub_for_call.clone();
954                Box::pin(async move {
955                    let p: CallParams = params.parse()?;
956                    // Extract auth context from Extensions (if present)
957                    let auth = ext.get::<std::sync::Arc<super::auth::AuthContext>>()
958                        .map(|arc| arc.as_ref());
959                    match hub.route(&p.method, p.params.unwrap_or_default(), auth).await {
960                        Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
961                        Err(e) => {
962                            // Accept the subscription, then send the error as a stream item.
963                            // This preserves the error message and code — returning Err(...)
964                            // from a subscription handler causes jsonrpsee to wrap it as
965                            // generic -32603, discarding our semantic error code.
966                            let sink = pending.accept().await?;
967                            let error_item = super::types::PlexusStreamItem::Error {
968                                metadata: super::types::StreamMetadata::new(
969                                    vec![ns_static.into()],
970                                    PlexusContext::hash(),
971                                ),
972                                message: e.to_string(),
973                                code: Some(plexus_error_code(&e).to_string()),
974                                recoverable: false,
975                            };
976                            if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
977                                let _ = sink.send(raw).await;
978                            }
979                            Ok(())
980                        }
981                    }
982                })
983            }
984        )?;
985
986        // Register {ns}.hash subscription
987        let hub_for_hash = hub.clone();
988        module.register_subscription(
989            hash_method,
990            PLEXUS_NOTIF_METHOD,
991            hash_unsub,
992            move |_params, pending, _ctx, _ext| {
993                let hub = hub_for_hash.clone();
994                Box::pin(async move {
995                    let schema = Activation::plugin_schema(&*hub);
996                    let stream = async_stream::stream! {
997                        yield HashEvent::Hash { value: schema.hash };
998                    };
999                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
1000                    pipe_stream_to_subscription(pending, wrapped).await
1001                })
1002            }
1003        )?;
1004
1005        // Register {ns}.schema subscription
1006        let hub_for_schema = hub.clone();
1007        module.register_subscription(
1008            schema_method,
1009            PLEXUS_NOTIF_METHOD,
1010            schema_unsub,
1011            move |params, pending, _ctx, _ext| {
1012                let hub = hub_for_schema.clone();
1013                Box::pin(async move {
1014                    let p: SchemaParams = params.parse().unwrap_or_default();
1015                    let plugin_schema = Activation::plugin_schema(&*hub);
1016
1017                    let result = if let Some(ref name) = p.method {
1018                        plugin_schema.methods.iter()
1019                            .find(|m| m.name == *name)
1020                            .map(|m| super::SchemaResult::Method(m.clone()))
1021                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
1022                                -32602,
1023                                format!("Method '{}' not found", name),
1024                                None::<()>,
1025                            ))?
1026                    } else {
1027                        super::SchemaResult::Plugin(plugin_schema)
1028                    };
1029
1030                    let stream = async_stream::stream! {
1031                        yield result;
1032                    };
1033                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
1034                    pipe_stream_to_subscription(pending, wrapped).await
1035                })
1036            }
1037        )?;
1038
1039        // Register _info well-known endpoint (no namespace prefix)
1040        // Returns backend name as a single-item stream with automatic Done event
1041        let backend_name = hub.runtime_namespace().to_string();
1042        module.register_subscription(
1043            "_info",
1044            PLEXUS_NOTIF_METHOD,
1045            "_info_unsub",
1046            move |_params, pending, _ctx, _ext| {
1047                let name = backend_name.clone();
1048                Box::pin(async move {
1049                    // Create a single-item stream with the info response
1050                    let info_stream = futures::stream::once(async move {
1051                        serde_json::json!({"backend": name})
1052                    });
1053
1054                    // Wrap to auto-append Done event
1055                    let wrapped = super::streaming::wrap_stream(
1056                        info_stream,
1057                        "_info",
1058                        vec![]
1059                    );
1060
1061                    // Pipe to subscription (handles Done automatically)
1062                    pipe_stream_to_subscription(pending, wrapped).await
1063                })
1064            }
1065        )?;
1066
1067        // Register {ns}.respond method for WebSocket bidirectional responses
1068        // This allows clients to respond to server-initiated requests (like confirmations/prompts)
1069        let respond_method: &'static str = Box::leak(format!("{}.respond", ns).into_boxed_str());
1070        module.register_async_method(respond_method, |params, _ctx, _ext| async move {
1071            use super::bidirectional::{handle_pending_response, BidirError};
1072
1073            let p: RespondParams = params.parse()?;
1074
1075            tracing::debug!(
1076                request_id = %p.request_id,
1077                "Handling {}.respond via WebSocket",
1078                "plexus"
1079            );
1080
1081            match handle_pending_response(&p.request_id, p.response_data) {
1082                Ok(()) => Ok(serde_json::json!({"success": true})),
1083                Err(BidirError::UnknownRequest) => {
1084                    tracing::warn!(request_id = %p.request_id, "Unknown request ID in respond");
1085                    Err(jsonrpsee::types::ErrorObject::owned(
1086                        -32602,
1087                        format!("Unknown request ID: {}. The request may have timed out or been cancelled.", p.request_id),
1088                        None::<()>,
1089                    ))
1090                }
1091                Err(BidirError::ChannelClosed) => {
1092                    tracing::warn!(request_id = %p.request_id, "Channel closed in respond");
1093                    Err(jsonrpsee::types::ErrorObject::owned(
1094                        -32000,
1095                        "Response channel was closed (request may have timed out)",
1096                        None::<()>,
1097                    ))
1098                }
1099                Err(e) => {
1100                    tracing::error!(request_id = %p.request_id, error = ?e, "Error in respond");
1101                    Err(jsonrpsee::types::ErrorObject::owned(
1102                        -32000,
1103                        format!("Failed to deliver response: {}", e),
1104                        None::<()>,
1105                    ))
1106                }
1107            }
1108        })?;
1109
1110        // Register pending RPC methods from activations
1111        let pending = std::mem::take(&mut *hub.inner.pending_rpc.lock().unwrap());
1112        eprintln!("[TRACE] arc_into_rpc_module: merging {} activation RPC factories", pending.len());
1113        for (idx, factory) in pending.into_iter().enumerate() {
1114            eprintln!("[TRACE] arc_into_rpc_module: calling factory {} to get Methods", idx);
1115            let methods = factory();
1116            eprintln!("[TRACE] arc_into_rpc_module: factory {} returned Methods with {} methods", idx, methods.method_names().count());
1117            eprintln!("[TRACE] arc_into_rpc_module: merging factory {} methods into module", idx);
1118            module.merge(methods)?;
1119            eprintln!("[TRACE] arc_into_rpc_module: successfully merged factory {} methods", idx);
1120        }
1121        eprintln!("[TRACE] arc_into_rpc_module: all activations merged successfully");
1122
1123        Ok(module)
1124    }
1125}
1126
1127/// Params for {ns}.call
1128#[derive(Debug, serde::Deserialize)]
1129struct CallParams {
1130    method: String,
1131    #[serde(default)]
1132    params: Option<Value>,
1133}
1134
1135/// Params for {ns}.schema
1136#[derive(Debug, Default, serde::Deserialize)]
1137struct SchemaParams {
1138    method: Option<String>,
1139}
1140
1141/// Params for {ns}.respond (WebSocket bidirectional response)
1142#[derive(Debug, serde::Deserialize)]
1143struct RespondParams {
1144    request_id: String,
1145    response_data: Value,
1146}
1147
1148/// Helper to pipe a PlexusStream to a subscription sink.
1149///
1150/// Notifications are sent with `method: PLEXUS_NOTIF_METHOD` on the wire,
1151/// as set by the `notif_method_name` arg in each `register_subscription` call.
1152async fn pipe_stream_to_subscription(
1153    pending: jsonrpsee::PendingSubscriptionSink,
1154    mut stream: PlexusStream,
1155) -> jsonrpsee::core::SubscriptionResult {
1156    use futures::StreamExt;
1157
1158    let sink = pending.accept().await?;
1159    while let Some(item) = stream.next().await {
1160        let json = serde_json::value::to_raw_value(&item)?;
1161        sink.send(json).await?;
1162    }
1163    Ok(())
1164}
1165
1166// ============================================================================
1167// DynamicHub RPC Methods (via plexus-macros)
1168// ============================================================================
1169
1170#[plexus_macros::activation(
1171    namespace = "plexus",
1172    version = "1.0.0",
1173    description = "Central routing and introspection",
1174    hub,
1175    namespace_fn = "runtime_namespace"
1176)]
1177impl DynamicHub {
1178    /// Route a call to a registered activation
1179    #[plexus_macros::method(
1180        streaming,
1181        description = "Route a call to a registered activation",
1182        params(
1183            method = "The method to call (format: namespace.method)",
1184            params = "Parameters to pass to the method (optional, defaults to {})"
1185        )
1186    )]
1187    async fn call(
1188        &self,
1189        method: String,
1190        params: Option<Value>,
1191    ) -> impl Stream<Item = super::types::PlexusStreamItem> + Send + 'static {
1192        use super::context::PlexusContext;
1193        use super::types::{PlexusStreamItem, StreamMetadata};
1194
1195        let result = self.route(&method, params.unwrap_or_default(), None).await;
1196
1197        match result {
1198            Ok(plexus_stream) => {
1199                // Forward the routed stream directly - it already contains PlexusStreamItems
1200                plexus_stream
1201            }
1202            Err(e) => {
1203                // Return error as a PlexusStreamItem stream
1204                let metadata = StreamMetadata::new(
1205                    vec![self.inner.namespace.clone()],
1206                    PlexusContext::hash(),
1207                );
1208                Box::pin(futures::stream::once(async move {
1209                    PlexusStreamItem::Error {
1210                        metadata,
1211                        message: e.to_string(),
1212                        code: None,
1213                        recoverable: false,
1214                    }
1215                }))
1216            }
1217        }
1218    }
1219
1220    /// Get Plexus RPC server configuration hash (from the recursive schema)
1221    ///
1222    /// This hash changes whenever any method or child activation changes.
1223    /// It's computed from the method hashes rolled up through the schema tree.
1224    #[plexus_macros::method(description = "Get plexus configuration hash (from the recursive schema)\n\n This hash changes whenever any method or child plugin changes.\n It's computed from the method hashes rolled up through the schema tree.")]
1225    async fn hash(&self) -> impl Stream<Item = HashEvent> + Send + 'static {
1226        let schema = Activation::plugin_schema(self);
1227        stream! { yield HashEvent::Hash { value: schema.hash }; }
1228    }
1229
1230    /// Get plugin hashes for cache validation (lightweight alternative to full schema)
1231    #[plexus_macros::method(description = "Get plugin hashes for cache validation")]
1232    async fn hashes(&self) -> impl Stream<Item = PluginHashes> + Send + 'static {
1233        let schema = Activation::plugin_schema(self);
1234
1235        stream! {
1236            yield PluginHashes {
1237                namespace: schema.namespace.clone(),
1238                self_hash: schema.self_hash.clone(),
1239                children_hash: schema.children_hash.clone(),
1240                hash: schema.hash.clone(),
1241                children: schema.children.as_ref().map(|kids| {
1242                    kids.iter()
1243                        .map(|c| ChildHashes {
1244                            namespace: c.namespace.clone(),
1245                            hash: c.hash.clone(),
1246                        })
1247                        .collect()
1248                }),
1249            };
1250        }
1251    }
1252
1253    // Note: schema() method is auto-generated by hub-macro for all activations
1254}
1255
1256// ============================================================================
1257// HubContext Implementation for Weak<DynamicHub>
1258// ============================================================================
1259
1260use super::hub_context::HubContext;
1261use std::sync::Weak;
1262
1263/// HubContext implementation for Weak<DynamicHub>
1264///
1265/// This enables activations to receive a weak reference to their parent DynamicHub,
1266/// allowing them to resolve handles and route calls through the hub without
1267/// creating reference cycles.
1268#[async_trait]
1269impl HubContext for Weak<DynamicHub> {
1270    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1271        let hub = self.upgrade().ok_or_else(|| {
1272            PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1273        })?;
1274        hub.do_resolve_handle(handle).await
1275    }
1276
1277    async fn call(&self, method: &str, params: serde_json::Value) -> Result<PlexusStream, PlexusError> {
1278        let hub = self.upgrade().ok_or_else(|| {
1279            PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1280        })?;
1281        hub.route(method, params, None).await
1282    }
1283
1284    fn is_valid(&self) -> bool {
1285        self.upgrade().is_some()
1286    }
1287}
1288
1289/// ChildRouter implementation for DynamicHub
1290///
1291/// This enables nested routing through registered activations.
1292/// e.g., hub.call("solar.mercury.info") routes to solar → mercury → info
1293#[async_trait]
1294impl ChildRouter for DynamicHub {
1295    fn router_namespace(&self) -> &str {
1296        &self.inner.namespace
1297    }
1298
1299    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
1300        // DynamicHub routes via its registered activations
1301        // Method format: "activation.method" or "activation.child.method"
1302        self.route_with_ctx(method, params, auth, raw_ctx).await
1303    }
1304
1305    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1306        // Look up registered activations that implement ChildRouter
1307        self.inner.child_routers.get(name)
1308            .map(|router| {
1309                // Clone the Arc and wrap in Box for the trait object
1310                Box::new(ArcChildRouter(router.clone())) as Box<dyn ChildRouter>
1311            })
1312    }
1313}
1314
1315#[cfg(test)]
1316mod tests {
1317    use super::*;
1318
1319    #[test]
1320    fn dynamic_hub_implements_activation() {
1321        fn assert_activation<T: Activation>() {}
1322        assert_activation::<DynamicHub>();
1323    }
1324
1325    #[test]
1326    fn dynamic_hub_methods() {
1327        let hub = DynamicHub::new("test");
1328        let methods = hub.methods();
1329        assert!(methods.contains(&"call"));
1330        assert!(methods.contains(&"hash"));
1331        assert!(methods.contains(&"schema"));
1332        // list_activations was removed - use schema() instead
1333    }
1334
1335    #[test]
1336    fn dynamic_hub_hash_stable() {
1337        let h1 = DynamicHub::new("test");
1338        let h2 = DynamicHub::new("test");
1339        assert_eq!(h1.compute_hash(), h2.compute_hash());
1340    }
1341
1342    #[test]
1343    fn dynamic_hub_is_hub() {
1344        use crate::activations::health::Health;
1345        let hub = DynamicHub::new("test").register(Health::new());
1346        let schema = hub.plugin_schema();
1347
1348        // DynamicHub should be a hub (has children)
1349        assert!(schema.is_hub(), "dynamic hub should be a hub");
1350        assert!(!schema.is_leaf(), "dynamic hub should not be a leaf");
1351
1352        // Should have children (as summaries)
1353        let children = schema.children.expect("dynamic hub should have children");
1354        assert!(!children.is_empty(), "dynamic hub should have at least one child");
1355
1356        // Health should be in the children summaries
1357        let health = children.iter().find(|c| c.namespace == "health").expect("should have health child");
1358        assert!(!health.hash.is_empty(), "health should have a hash");
1359    }
1360
1361    #[test]
1362    fn dynamic_hub_schema_structure() {
1363        use crate::activations::health::Health;
1364        let hub = DynamicHub::new("test").register(Health::new());
1365        let schema = hub.plugin_schema();
1366
1367        // Pretty print the schema
1368        let json = serde_json::to_string_pretty(&schema).unwrap();
1369        println!("DynamicHub schema:\n{}", json);
1370
1371        // Verify structure
1372        assert_eq!(schema.namespace, "test");
1373        assert!(schema.methods.iter().any(|m| m.name == "call"));
1374        assert!(schema.children.is_some());
1375    }
1376
1377    // ========================================================================
1378    // INVARIANT: Handle routing - resolves to correct plugin
1379    // ========================================================================
1380
1381    #[tokio::test]
1382    async fn invariant_resolve_handle_unknown_activation() {
1383        use crate::activations::health::Health;
1384        use crate::types::Handle;
1385        use uuid::Uuid;
1386
1387        let hub = DynamicHub::new("test").register(Health::new());
1388
1389        // Handle for an unregistered activation (random UUID)
1390        let unknown_plugin_id = Uuid::new_v4();
1391        let handle = Handle::new(unknown_plugin_id, "1.0.0", "some_method");
1392
1393        let result = hub.do_resolve_handle(&handle).await;
1394
1395        match result {
1396            Err(PlexusError::ActivationNotFound(_)) => {
1397                // Expected - activation not found
1398            }
1399            Err(other) => panic!("Expected ActivationNotFound, got {:?}", other),
1400            Ok(_) => panic!("Expected error for unknown activation"),
1401        }
1402    }
1403
1404    #[tokio::test]
1405    async fn invariant_resolve_handle_unsupported() {
1406        use crate::activations::health::Health;
1407        use crate::types::Handle;
1408
1409        let hub = DynamicHub::new("test").register(Health::new());
1410
1411        // Handle for health activation (which doesn't support handle resolution)
1412        let handle = Handle::new(Health::PLUGIN_ID, "1.0.0", "check");
1413
1414        let result = hub.do_resolve_handle(&handle).await;
1415
1416        match result {
1417            Err(PlexusError::HandleNotSupported(name)) => {
1418                assert_eq!(name, "health");
1419            }
1420            Err(other) => panic!("Expected HandleNotSupported, got {:?}", other),
1421            Ok(_) => panic!("Expected error for unsupported handle"),
1422        }
1423    }
1424
1425    #[tokio::test]
1426    async fn invariant_resolve_handle_routes_by_plugin_id() {
1427        use crate::activations::health::Health;
1428        use crate::activations::echo::Echo;
1429        use crate::types::Handle;
1430        use uuid::Uuid;
1431
1432        let health = Health::new();
1433        let echo = Echo::new();
1434        let health_plugin_id = health.plugin_id();
1435        let echo_plugin_id = echo.plugin_id();
1436
1437        let hub = DynamicHub::new("test")
1438            .register(health)
1439            .register(echo);
1440
1441        // Health handle → health activation
1442        let health_handle = Handle::new(health_plugin_id, "1.0.0", "check");
1443        match hub.do_resolve_handle(&health_handle).await {
1444            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
1445            Err(other) => panic!("health handle should route to health activation, got {:?}", other),
1446            Ok(_) => panic!("health handle should return HandleNotSupported"),
1447        }
1448
1449        // Echo handle → echo activation
1450        let echo_handle = Handle::new(echo_plugin_id, "1.0.0", "echo");
1451        match hub.do_resolve_handle(&echo_handle).await {
1452            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "echo"),
1453            Err(other) => panic!("echo handle should route to echo activation, got {:?}", other),
1454            Ok(_) => panic!("echo handle should return HandleNotSupported"),
1455        }
1456
1457        // Unknown handle → ActivationNotFound (random UUID not registered)
1458        let unknown_handle = Handle::new(Uuid::new_v4(), "1.0.0", "method");
1459        match hub.do_resolve_handle(&unknown_handle).await {
1460            Err(PlexusError::ActivationNotFound(_)) => { /* expected */ },
1461            Err(other) => panic!("unknown handle should return ActivationNotFound, got {:?}", other),
1462            Ok(_) => panic!("unknown handle should return ActivationNotFound"),
1463        }
1464    }
1465
1466    #[test]
1467    fn invariant_handle_plugin_id_determines_routing() {
1468        use crate::activations::health::Health;
1469        use crate::activations::echo::Echo;
1470        use crate::types::Handle;
1471
1472        let health = Health::new();
1473        let echo = Echo::new();
1474
1475        // Same meta, different activations → different routing targets (by plugin_id)
1476        let health_handle = Handle::new(health.plugin_id(), "1.0.0", "check")
1477            .with_meta(vec!["msg-123".into(), "user".into()]);
1478        let echo_handle = Handle::new(echo.plugin_id(), "1.0.0", "echo")
1479            .with_meta(vec!["msg-123".into(), "user".into()]);
1480
1481        // Different plugin_ids ensure different routing
1482        assert_ne!(health_handle.plugin_id, echo_handle.plugin_id);
1483    }
1484
1485    // ========================================================================
1486    // Plugin Registry Tests
1487    // ========================================================================
1488
1489    #[test]
1490    fn plugin_registry_basic_operations() {
1491        let mut registry = PluginRegistry::new();
1492        let id = uuid::Uuid::new_v4();
1493
1494        // Register an activation
1495        registry.register(id, "test_plugin".to_string(), "test".to_string());
1496
1497        // Lookup by ID
1498        assert_eq!(registry.lookup(id), Some("test_plugin"));
1499
1500        // Lookup by path
1501        assert_eq!(registry.lookup_by_path("test_plugin"), Some(id));
1502
1503        // Get entry
1504        let entry = registry.get(id).expect("should have entry");
1505        assert_eq!(entry.path, "test_plugin");
1506        assert_eq!(entry.plugin_type, "test");
1507    }
1508
1509    #[test]
1510    fn plugin_registry_populated_on_register() {
1511        use crate::activations::health::Health;
1512
1513        let hub = DynamicHub::new("test").register(Health::new());
1514
1515        let registry = hub.registry();
1516        assert!(!registry.is_empty(), "registry should not be empty after registration");
1517
1518        // Health activation should be registered
1519        let health_id = registry.lookup_by_path("health");
1520        assert!(health_id.is_some(), "health should be registered by path");
1521
1522        // Should be able to look up path by ID
1523        let health_uuid = health_id.unwrap();
1524        assert_eq!(registry.lookup(health_uuid), Some("health"));
1525    }
1526
1527    #[test]
1528    fn plugin_registry_deterministic_uuid() {
1529        use crate::activations::health::Health;
1530
1531        // Same activation registered twice should produce same UUID
1532        let health1 = Health::new();
1533        let health2 = Health::new();
1534
1535        assert_eq!(health1.plugin_id(), health2.plugin_id(),
1536            "same activation type should have deterministic UUID");
1537
1538        // UUID should be based on namespace+major_version (semver compatibility)
1539        let expected = uuid::Uuid::new_v5(
1540            &uuid::Uuid::NAMESPACE_OID,
1541            b"health@1"
1542        );
1543        assert_eq!(health1.plugin_id(), expected,
1544            "plugin_id should be deterministic from namespace@major_version");
1545    }
1546}