Skip to main content

plexus_core/plexus/
plexus.rs

1//! DynamicHub - the central routing layer for activations
2//!
3//! DynamicHub IS an activation that also serves as the registry for other activations.
4//! It implements the Plexus RPC protocol for routing and introspection.
5//! It uses hub-macro for its methods, with the `call` method using the streaming
6//! pattern to forward responses from routed methods.
7
8use super::{
9    context::PlexusContext,
10    method_enum::MethodEnumSchema,
11    schema::{ChildSummary, MethodSchema, PluginSchema, Schema},
12    streaming::PlexusStream,
13};
14use crate::types::Handle;
15use async_stream::stream;
16use async_trait::async_trait;
17use futures::Stream;
18use jsonrpsee::core::server::Methods;
19use jsonrpsee::RpcModule;
20use schemars::JsonSchema;
21use serde::{Deserialize, Serialize};
22use serde_json::Value;
23use std::collections::HashMap;
24use std::sync::Arc;
25
26// ============================================================================
27// Error Types
28// ============================================================================
29
30#[derive(Debug, Clone)]
31pub enum PlexusError {
32    ActivationNotFound(String),
33    MethodNotFound { activation: String, method: String },
34    InvalidParams(String),
35    ExecutionError(String),
36    HandleNotSupported(String),
37    TransportError(TransportErrorKind),
38}
39
40#[derive(Debug, Clone, Serialize, Deserialize)]
41#[serde(tag = "error_kind", rename_all = "snake_case")]
42pub enum TransportErrorKind {
43    ConnectionRefused { host: String, port: u16 },
44    ConnectionTimeout { host: String, port: u16 },
45    ProtocolError { message: String },
46    NetworkError { message: String },
47}
48
49impl std::fmt::Display for TransportErrorKind {
50    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
51        match self {
52            TransportErrorKind::ConnectionRefused { host, port } => {
53                write!(f, "Connection refused to {}:{}", host, port)
54            }
55            TransportErrorKind::ConnectionTimeout { host, port } => {
56                write!(f, "Connection timeout to {}:{}", host, port)
57            }
58            TransportErrorKind::ProtocolError { message } => {
59                write!(f, "Protocol error: {}", message)
60            }
61            TransportErrorKind::NetworkError { message } => {
62                write!(f, "Network error: {}", message)
63            }
64        }
65    }
66}
67
68impl std::fmt::Display for PlexusError {
69    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
70        match self {
71            PlexusError::ActivationNotFound(name) => write!(f, "Activation not found: {}", name),
72            PlexusError::MethodNotFound { activation, method } => {
73                write!(f, "Method not found: {}.{}", activation, method)
74            }
75            PlexusError::InvalidParams(msg) => write!(f, "Invalid params: {}", msg),
76            PlexusError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
77            PlexusError::HandleNotSupported(activation) => {
78                write!(f, "Handle resolution not supported by activation: {}", activation)
79            }
80            PlexusError::TransportError(kind) => match kind {
81                TransportErrorKind::ConnectionRefused { host, port } => {
82                    write!(f, "Connection refused to {}:{}", host, port)
83                }
84                TransportErrorKind::ConnectionTimeout { host, port } => {
85                    write!(f, "Connection timeout to {}:{}", host, port)
86                }
87                TransportErrorKind::ProtocolError { message } => {
88                    write!(f, "Protocol error: {}", message)
89                }
90                TransportErrorKind::NetworkError { message } => {
91                    write!(f, "Network error: {}", message)
92                }
93            }
94        }
95    }
96}
97
98impl std::error::Error for PlexusError {}
99
100// ============================================================================
101// Schema Types
102// ============================================================================
103
104#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
105pub struct ActivationInfo {
106    pub namespace: String,
107    pub version: String,
108    pub description: String,
109    pub methods: Vec<String>,
110}
111
112// ============================================================================
113// Activation Trait
114// ============================================================================
115
116#[async_trait]
117pub trait Activation: Send + Sync + 'static {
118    type Methods: MethodEnumSchema;
119
120    fn namespace(&self) -> &str;
121    fn version(&self) -> &str;
122    /// Short description (max 15 words)
123    fn description(&self) -> &str { "No description available" }
124    /// Long description (optional, for detailed documentation)
125    fn long_description(&self) -> Option<&str> { None }
126    fn methods(&self) -> Vec<&str>;
127    fn method_help(&self, _method: &str) -> Option<String> { None }
128    /// Stable activation instance ID for handle routing
129    /// By default generates a deterministic UUID from namespace+major_version
130    /// Using major version only ensures handles survive minor/patch upgrades (semver)
131    fn plugin_id(&self) -> uuid::Uuid {
132        let major_version = self.version().split('.').next().unwrap_or("0");
133        uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, format!("{}@{}", self.namespace(), major_version).as_bytes())
134    }
135
136    async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
137    async fn resolve_handle(&self, _handle: &Handle) -> Result<PlexusStream, PlexusError> {
138        Err(PlexusError::HandleNotSupported(self.namespace().to_string()))
139    }
140
141    fn into_rpc_methods(self) -> Methods where Self: Sized;
142
143    /// Return this activation's schema (methods + optional children)
144    fn plugin_schema(&self) -> PluginSchema {
145        use std::collections::hash_map::DefaultHasher;
146        use std::hash::{Hash, Hasher};
147
148        let methods: Vec<MethodSchema> = self.methods().iter().map(|name| {
149            let desc = self.method_help(name).unwrap_or_default();
150            // Compute a simple hash for methods not using hub-macro
151            let mut hasher = DefaultHasher::new();
152            name.hash(&mut hasher);
153            desc.hash(&mut hasher);
154            let hash = format!("{:016x}", hasher.finish());
155            MethodSchema::new(name.to_string(), desc, hash)
156        }).collect();
157
158        if let Some(long_desc) = self.long_description() {
159            PluginSchema::leaf_with_long_description(
160                self.namespace(),
161                self.version(),
162                self.description(),
163                long_desc,
164                methods,
165            )
166        } else {
167            PluginSchema::leaf(
168                self.namespace(),
169                self.version(),
170                self.description(),
171                methods,
172            )
173        }
174    }
175}
176
177// ============================================================================
178// Child Routing for Hub Plugins
179// ============================================================================
180
181/// Trait for activations that can route to child activations
182///
183/// Hub activations implement this to support nested method routing.
184/// When a method like "mercury.info" is called on a solar activation,
185/// this trait enables routing to the mercury child.
186///
187/// This trait is separate from Activation to avoid associated type issues
188/// with dynamic dispatch.
189#[async_trait]
190pub trait ChildRouter: Send + Sync {
191    /// Get the namespace of this router (for error messages)
192    fn router_namespace(&self) -> &str;
193
194    /// Call a method on this router
195    async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
196
197    /// Get a child activation instance by name for nested routing
198    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>>;
199}
200
201/// Route a method call to a child activation
202///
203/// This is called from generated code when a hub activation receives
204/// a method that doesn't match its local methods. If the method
205/// contains a dot (e.g., "mercury.info"), it routes to the child.
206pub async fn route_to_child<T: ChildRouter + ?Sized>(
207    parent: &T,
208    method: &str,
209    params: Value,
210) -> Result<PlexusStream, PlexusError> {
211    // Try to split on first dot for nested routing
212    if let Some((child_name, rest)) = method.split_once('.') {
213        if let Some(child) = parent.get_child(child_name).await {
214            return child.router_call(rest, params).await;
215        }
216        return Err(PlexusError::ActivationNotFound(child_name.to_string()));
217    }
218
219    // No dot - method simply not found
220    Err(PlexusError::MethodNotFound {
221        activation: parent.router_namespace().to_string(),
222        method: method.to_string(),
223    })
224}
225
226/// Wrapper to implement ChildRouter for Arc<dyn ChildRouter>
227///
228/// This allows DynamicHub to return its stored Arc<dyn ChildRouter> from get_child()
229struct ArcChildRouter(Arc<dyn ChildRouter>);
230
231#[async_trait]
232impl ChildRouter for ArcChildRouter {
233    fn router_namespace(&self) -> &str {
234        self.0.router_namespace()
235    }
236
237    async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
238        self.0.router_call(method, params).await
239    }
240
241    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
242        self.0.get_child(name).await
243    }
244}
245
246// ============================================================================
247// Internal Type-Erased Activation
248// ============================================================================
249
250#[async_trait]
251#[allow(dead_code)] // Methods exist for completeness but some aren't called post-erasure yet
252trait ActivationObject: Send + Sync + 'static {
253    fn namespace(&self) -> &str;
254    fn version(&self) -> &str;
255    fn description(&self) -> &str;
256    fn long_description(&self) -> Option<&str>;
257    fn methods(&self) -> Vec<&str>;
258    fn method_help(&self, method: &str) -> Option<String>;
259    fn plugin_id(&self) -> uuid::Uuid;
260    async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError>;
261    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError>;
262    fn plugin_schema(&self) -> PluginSchema;
263    fn schema(&self) -> Schema;
264}
265
266struct ActivationWrapper<A: Activation> {
267    inner: A,
268}
269
270#[async_trait]
271impl<A: Activation> ActivationObject for ActivationWrapper<A> {
272    fn namespace(&self) -> &str { self.inner.namespace() }
273    fn version(&self) -> &str { self.inner.version() }
274    fn description(&self) -> &str { self.inner.description() }
275    fn long_description(&self) -> Option<&str> { self.inner.long_description() }
276    fn methods(&self) -> Vec<&str> { self.inner.methods() }
277    fn method_help(&self, method: &str) -> Option<String> { self.inner.method_help(method) }
278    fn plugin_id(&self) -> uuid::Uuid { self.inner.plugin_id() }
279
280    async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
281        self.inner.call(method, params).await
282    }
283
284    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
285        self.inner.resolve_handle(handle).await
286    }
287
288    fn plugin_schema(&self) -> PluginSchema { self.inner.plugin_schema() }
289
290    fn schema(&self) -> Schema {
291        let schema = schemars::schema_for!(A::Methods);
292        serde_json::from_value(serde_json::to_value(schema).expect("serialize"))
293            .expect("parse schema")
294    }
295}
296
297// ============================================================================
298// Plexus Event Types
299// ============================================================================
300
301#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
302#[serde(tag = "event", rename_all = "snake_case")]
303pub enum HashEvent {
304    Hash { value: String },
305}
306
307/// Event for schema() RPC method - returns plugin schema
308#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
309#[serde(tag = "event", rename_all = "snake_case")]
310pub enum SchemaEvent {
311    /// This plugin's schema
312    Schema(PluginSchema),
313}
314
315/// Lightweight hash information for cache validation
316#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
317pub struct PluginHashes {
318    pub namespace: String,
319    pub self_hash: String,
320    #[serde(skip_serializing_if = "Option::is_none")]
321    pub children_hash: Option<String>,
322    pub hash: String,
323    /// Child plugin hashes (for recursive checking)
324    #[serde(skip_serializing_if = "Option::is_none")]
325    pub children: Option<Vec<ChildHashes>>,
326}
327
328/// Hash information for a child plugin
329#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
330pub struct ChildHashes {
331    pub namespace: String,
332    pub hash: String,
333}
334
335
336// ============================================================================
337// Activation Registry
338// ============================================================================
339
340/// Entry in the activation registry
341#[derive(Debug, Clone)]
342pub struct PluginEntry {
343    /// Stable activation instance ID
344    pub id: uuid::Uuid,
345    /// Current path/namespace for this activation
346    pub path: String,
347    /// Activation type (e.g., "cone", "bash", "arbor")
348    pub plugin_type: String,
349}
350
351/// Registry mapping activation UUIDs to their current paths
352///
353/// This enables handle routing without path dependency - handles reference
354/// activations by their stable UUID, and the registry maps to the current path.
355#[derive(Default)]
356pub struct PluginRegistry {
357    /// Lookup by plugin UUID
358    by_id: HashMap<uuid::Uuid, PluginEntry>,
359    /// Lookup by current path (for reverse lookup)
360    by_path: HashMap<String, uuid::Uuid>,
361}
362
363/// Read-only snapshot of the activation registry
364///
365/// Safe to use outside of DynamicHub locks.
366#[derive(Clone)]
367pub struct PluginRegistrySnapshot {
368    by_id: HashMap<uuid::Uuid, PluginEntry>,
369    by_path: HashMap<String, uuid::Uuid>,
370}
371
372impl PluginRegistrySnapshot {
373    /// Look up an activation's path by its UUID
374    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
375        self.by_id.get(&id).map(|e| e.path.as_str())
376    }
377
378    /// Look up an activation's UUID by its path
379    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
380        self.by_path.get(path).copied()
381    }
382
383    /// Get an activation entry by its UUID
384    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
385        self.by_id.get(&id)
386    }
387
388    /// List all registered activations
389    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
390        self.by_id.values()
391    }
392
393    /// Get the number of registered plugins
394    pub fn len(&self) -> usize {
395        self.by_id.len()
396    }
397
398    /// Check if the registry is empty
399    pub fn is_empty(&self) -> bool {
400        self.by_id.is_empty()
401    }
402}
403
404impl PluginRegistry {
405    /// Create a new empty registry
406    pub fn new() -> Self {
407        Self::default()
408    }
409
410    /// Look up an activation's path by its UUID
411    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
412        self.by_id.get(&id).map(|e| e.path.as_str())
413    }
414
415    /// Look up an activation's UUID by its path
416    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
417        self.by_path.get(path).copied()
418    }
419
420    /// Get an activation entry by its UUID
421    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
422        self.by_id.get(&id)
423    }
424
425    /// Register an activation
426    pub fn register(&mut self, id: uuid::Uuid, path: String, plugin_type: String) {
427        let entry = PluginEntry { id, path: path.clone(), plugin_type };
428        self.by_id.insert(id, entry);
429        self.by_path.insert(path, id);
430    }
431
432    /// List all registered activations
433    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
434        self.by_id.values()
435    }
436
437    /// Get the number of registered plugins
438    pub fn len(&self) -> usize {
439        self.by_id.len()
440    }
441
442    /// Check if the registry is empty
443    pub fn is_empty(&self) -> bool {
444        self.by_id.is_empty()
445    }
446}
447
448// ============================================================================
449// DynamicHub (formerly Plexus)
450// ============================================================================
451
452struct DynamicHubInner {
453    /// Custom namespace for this hub instance (defaults to "plexus")
454    namespace: String,
455    activations: HashMap<String, Arc<dyn ActivationObject>>,
456    /// Child routers for direct nested routing (e.g., hub.solar.mercury.info)
457    child_routers: HashMap<String, Arc<dyn ChildRouter>>,
458    /// Activation registry mapping UUIDs to paths
459    registry: std::sync::RwLock<PluginRegistry>,
460    pending_rpc: std::sync::Mutex<Vec<Box<dyn FnOnce() -> Methods + Send>>>,
461}
462
463/// DynamicHub - an activation that routes to dynamically registered child activations
464///
465/// Unlike hub activations with hardcoded children (like Solar),
466/// DynamicHub allows registering activations at runtime via `.register()`.
467///
468/// # Direct Hosting
469///
470/// For a single activation, host it directly:
471/// ```ignore
472/// let solar = Arc::new(Solar::new());
473/// TransportServer::builder(solar, converter).serve().await?;
474/// ```
475///
476/// # Composition
477///
478/// For multiple top-level activations, use DynamicHub:
479/// ```ignore
480/// let hub = DynamicHub::with_namespace("myapp")
481///     .register(Solar::new())
482///     .register(Echo::new());
483/// ```
484#[derive(Clone)]
485pub struct DynamicHub {
486    inner: Arc<DynamicHubInner>,
487}
488
489// ============================================================================
490// DynamicHub Infrastructure (non-RPC methods)
491// ============================================================================
492
493impl DynamicHub {
494    /// Create a new DynamicHub with explicit namespace
495    ///
496    /// Unlike single activations which have fixed namespaces, DynamicHub is a
497    /// composition tool that can be named based on your application. Common choices:
498    /// - "hub" - generic default
499    /// - "substrate" - for substrate server
500    /// - "myapp" - for your application name
501    ///
502    /// The namespace appears in method calls: `{namespace}.call`, `{namespace}.schema`
503    pub fn new(namespace: impl Into<String>) -> Self {
504        Self {
505            inner: Arc::new(DynamicHubInner {
506                namespace: namespace.into(),
507                activations: HashMap::new(),
508                child_routers: HashMap::new(),
509                registry: std::sync::RwLock::new(PluginRegistry::new()),
510                pending_rpc: std::sync::Mutex::new(Vec::new()),
511            }),
512        }
513    }
514
515    /// Deprecated: Use new() with explicit namespace instead
516    #[deprecated(since = "0.3.0", note = "Use DynamicHub::new(namespace) instead")]
517    pub fn with_namespace(namespace: impl Into<String>) -> Self {
518        Self::new(namespace)
519    }
520
521    /// Get the runtime namespace for this DynamicHub instance
522    pub fn runtime_namespace(&self) -> &str {
523        &self.inner.namespace
524    }
525
526    /// Get access to the activation registry
527    pub fn registry(&self) -> std::sync::RwLockReadGuard<'_, PluginRegistry> {
528        self.inner.registry.read().unwrap()
529    }
530
531    /// Register an activation
532    pub fn register<A: Activation + Clone>(mut self, activation: A) -> Self {
533        let namespace = activation.namespace().to_string();
534        let plugin_id = activation.plugin_id();
535        let activation_for_rpc = activation.clone();
536
537        let inner = Arc::get_mut(&mut self.inner)
538            .expect("Cannot register: DynamicHub has multiple references");
539
540        // Register in the activation registry
541        inner.registry.write().unwrap().register(
542            plugin_id,
543            namespace.clone(),
544            namespace.clone(), // Use namespace as plugin_type for now
545        );
546
547        inner.activations.insert(namespace, Arc::new(ActivationWrapper { inner: activation }));
548        inner.pending_rpc.lock().unwrap()
549            .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
550        self
551    }
552
553    /// Register a hub activation that supports nested routing
554    ///
555    /// Hub activations implement `ChildRouter`, enabling direct nested method calls
556    /// like `hub.solar.mercury.info` at the RPC layer (no hub.call indirection).
557    pub fn register_hub<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
558        let namespace = activation.namespace().to_string();
559        let plugin_id = activation.plugin_id();
560        let activation_for_rpc = activation.clone();
561        let activation_for_router = activation.clone();
562
563        let inner = Arc::get_mut(&mut self.inner)
564            .expect("Cannot register: DynamicHub has multiple references");
565
566        // Register in the activation registry
567        inner.registry.write().unwrap().register(
568            plugin_id,
569            namespace.clone(),
570            namespace.clone(), // Use namespace as plugin_type for now
571        );
572
573        inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
574        inner.child_routers.insert(namespace, Arc::new(activation_for_router));
575        inner.pending_rpc.lock().unwrap()
576            .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
577        self
578    }
579
580    /// List all methods across all activations
581    pub fn list_methods(&self) -> Vec<String> {
582        let mut methods = Vec::new();
583
584        // Include hub's own methods
585        for m in Activation::methods(self) {
586            methods.push(format!("{}.{}", self.inner.namespace, m));
587        }
588
589        // Include registered activation methods
590        for (ns, act) in &self.inner.activations {
591            for m in act.methods() {
592                methods.push(format!("{}.{}", ns, m));
593            }
594        }
595        methods.sort();
596        methods
597    }
598
599    /// List all activations (including this hub itself)
600    pub fn list_activations_info(&self) -> Vec<ActivationInfo> {
601        let mut activations = Vec::new();
602
603        // Include this hub itself
604        activations.push(ActivationInfo {
605            namespace: Activation::namespace(self).to_string(),
606            version: Activation::version(self).to_string(),
607            description: Activation::description(self).to_string(),
608            methods: Activation::methods(self).iter().map(|s| s.to_string()).collect(),
609        });
610
611        // Include registered activations
612        for a in self.inner.activations.values() {
613            activations.push(ActivationInfo {
614                namespace: a.namespace().to_string(),
615                version: a.version().to_string(),
616                description: a.description().to_string(),
617                methods: a.methods().iter().map(|s| s.to_string()).collect(),
618            });
619        }
620
621        activations
622    }
623
624    /// Compute hash for cache invalidation
625    ///
626    /// Returns the hash from the recursive plugin schema. This hash changes
627    /// whenever any method definition or child plugin changes.
628    pub fn compute_hash(&self) -> String {
629        Activation::plugin_schema(self).hash
630    }
631
632    /// Route a call to the appropriate activation
633    pub async fn route(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
634        let (namespace, method_name) = self.parse_method(method)?;
635
636        // Handle plexus's own methods
637        if namespace == self.inner.namespace {
638            return Activation::call(self, method_name, params).await;
639        }
640
641        let activation = self.inner.activations.get(namespace)
642            .ok_or_else(|| PlexusError::ActivationNotFound(namespace.to_string()))?;
643
644        activation.call(method_name, params).await
645    }
646
647    /// Resolve a handle using the activation registry
648    ///
649    /// Looks up the activation by its UUID in the registry.
650    pub async fn do_resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
651        let path = self.inner.registry.read().unwrap()
652            .lookup(handle.plugin_id)
653            .map(|s| s.to_string())
654            .ok_or_else(|| PlexusError::ActivationNotFound(handle.plugin_id.to_string()))?;
655
656        let activation = self.inner.activations.get(&path)
657            .ok_or_else(|| PlexusError::ActivationNotFound(path.clone()))?;
658        activation.resolve_handle(handle).await
659    }
660
661    /// Get activation schema
662    pub fn get_activation_schema(&self, namespace: &str) -> Option<Schema> {
663        self.inner.activations.get(namespace).map(|a| a.schema())
664    }
665
666    /// Get a snapshot of the activation registry (safe to use outside locks)
667    pub fn registry_snapshot(&self) -> PluginRegistrySnapshot {
668        let guard = self.inner.registry.read().unwrap();
669        PluginRegistrySnapshot {
670            by_id: guard.by_id.clone(),
671            by_path: guard.by_path.clone(),
672        }
673    }
674
675    /// Look up an activation path by its UUID
676    pub fn lookup_plugin(&self, id: uuid::Uuid) -> Option<String> {
677        self.inner.registry.read().unwrap().lookup(id).map(|s| s.to_string())
678    }
679
680    /// Look up an activation UUID by its path
681    pub fn lookup_plugin_by_path(&self, path: &str) -> Option<uuid::Uuid> {
682        self.inner.registry.read().unwrap().lookup_by_path(path)
683    }
684
685    /// Get activation schemas for all activations (including this hub itself)
686    pub fn list_plugin_schemas(&self) -> Vec<PluginSchema> {
687        let mut schemas = Vec::new();
688
689        // Include this hub itself
690        schemas.push(Activation::plugin_schema(self));
691
692        // Include registered activations
693        for a in self.inner.activations.values() {
694            schemas.push(a.plugin_schema());
695        }
696
697        schemas
698    }
699
700    /// Deprecated: use list_plugin_schemas instead
701    #[deprecated(note = "Use list_plugin_schemas instead")]
702    pub fn list_full_schemas(&self) -> Vec<PluginSchema> {
703        self.list_plugin_schemas()
704    }
705
706    /// Get help for a method
707    pub fn get_method_help(&self, method: &str) -> Option<String> {
708        let (namespace, method_name) = self.parse_method(method).ok()?;
709        let activation = self.inner.activations.get(namespace)?;
710        activation.method_help(method_name)
711    }
712
713    fn parse_method<'a>(&self, method: &'a str) -> Result<(&'a str, &'a str), PlexusError> {
714        let parts: Vec<&str> = method.splitn(2, '.').collect();
715        if parts.len() != 2 {
716            return Err(PlexusError::InvalidParams(format!("Invalid method format: {}", method)));
717        }
718        Ok((parts[0], parts[1]))
719    }
720
721    /// Get child activation summaries (for hub functionality)
722    /// Called by hub-macro when `hub` flag is set
723    pub fn plugin_children(&self) -> Vec<ChildSummary> {
724        self.inner.activations.values()
725            .map(|a| {
726                let schema = a.plugin_schema();
727                ChildSummary {
728                    namespace: schema.namespace,
729                    description: schema.description,
730                    hash: schema.hash,
731                }
732            })
733            .collect()
734    }
735
736    /// Convert to RPC module
737    pub fn into_rpc_module(self) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
738        let mut module = RpcModule::new(());
739
740        PlexusContext::init(self.compute_hash());
741
742        // Register hub methods with runtime namespace using dot notation (e.g., "plexus.call" or "hub.call")
743        // Note: we leak these strings to get 'static lifetime required by jsonrpsee
744        let ns = self.runtime_namespace();
745        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
746        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
747        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
748        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
749        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
750        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
751        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
752        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
753        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
754
755        // Register {ns}.call subscription
756        let plexus_for_call = self.clone();
757        module.register_subscription(
758            call_method,
759            call_method,
760            call_unsub,
761            move |params, pending, _ctx, _ext| {
762                let plexus = plexus_for_call.clone();
763                Box::pin(async move {
764                    // Parse params: {"method": "...", "params": {...}}
765                    let p: CallParams = params.parse()?;
766                    let stream = plexus.route(&p.method, p.params.unwrap_or_default()).await
767                        .map_err(|e| jsonrpsee::types::ErrorObject::owned(-32000, e.to_string(), None::<()>))?;
768                    pipe_stream_to_subscription(pending, stream).await
769                })
770            }
771        )?;
772
773        // Register {ns}.hash subscription
774        let plexus_for_hash = self.clone();
775        module.register_subscription(
776            hash_method,
777            hash_method,
778            hash_unsub,
779            move |_params, pending, _ctx, _ext| {
780                let plexus = plexus_for_hash.clone();
781                Box::pin(async move {
782                    let schema = Activation::plugin_schema(&plexus);
783                    let stream = async_stream::stream! {
784                        yield HashEvent::Hash { value: schema.hash };
785                    };
786                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
787                    pipe_stream_to_subscription(pending, wrapped).await
788                })
789            }
790        )?;
791
792        // Register {ns}.schema subscription
793        let plexus_for_schema = self.clone();
794        module.register_subscription(
795            schema_method,
796            schema_method,
797            schema_unsub,
798            move |params, pending, _ctx, _ext| {
799                let plexus = plexus_for_schema.clone();
800                Box::pin(async move {
801                    let p: SchemaParams = params.parse().unwrap_or_default();
802                    let plugin_schema = Activation::plugin_schema(&plexus);
803
804                    let result = if let Some(ref name) = p.method {
805                        plugin_schema.methods.iter()
806                            .find(|m| m.name == *name)
807                            .map(|m| super::SchemaResult::Method(m.clone()))
808                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
809                                -32602,
810                                format!("Method '{}' not found", name),
811                                None::<()>,
812                            ))?
813                    } else {
814                        super::SchemaResult::Plugin(plugin_schema)
815                    };
816
817                    let stream = async_stream::stream! { yield result; };
818                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
819                    pipe_stream_to_subscription(pending, wrapped).await
820                })
821            }
822        )?;
823
824        // Register _info well-known endpoint (no namespace prefix)
825        // Returns backend name as a single-item stream with automatic Done event
826        let backend_name = self.runtime_namespace().to_string();
827        module.register_subscription(
828            "_info",
829            "_info",
830            "_info_unsub",
831            move |_params, pending, _ctx, _ext| {
832                let name = backend_name.clone();
833                Box::pin(async move {
834                    // Create a single-item stream with the info response
835                    let info_stream = futures::stream::once(async move {
836                        serde_json::json!({"backend": name})
837                    });
838
839                    // Wrap to auto-append Done event
840                    let wrapped = super::streaming::wrap_stream(
841                        info_stream,
842                        "_info",
843                        vec![]
844                    );
845
846                    // Pipe to subscription (handles Done automatically)
847                    pipe_stream_to_subscription(pending, wrapped).await
848                })
849            }
850        )?;
851
852        // Add all registered activation RPC methods
853        let pending = std::mem::take(&mut *self.inner.pending_rpc.lock().unwrap());
854        for factory in pending {
855            module.merge(factory())?;
856        }
857
858        Ok(module)
859    }
860
861    /// Convert Arc<DynamicHub> to RPC module while keeping the Arc alive
862    ///
863    /// Unlike `into_rpc_module`, this keeps the Arc<DynamicHub> reference alive,
864    /// which is necessary when activations hold Weak<DynamicHub> references that
865    /// need to remain upgradeable.
866    pub fn arc_into_rpc_module(hub: Arc<Self>) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
867        let mut module = RpcModule::new(());
868
869        PlexusContext::init(hub.compute_hash());
870
871        // Register hub methods with runtime namespace using dot notation (e.g., "plexus.call" or "hub.call")
872        // Note: we leak these strings to get 'static lifetime required by jsonrpsee
873        let ns = hub.runtime_namespace();
874        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
875        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
876        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
877        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
878        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
879        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
880        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
881        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
882        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
883
884        // Register {ns}.call subscription - clone Arc to keep reference alive
885        let hub_for_call = hub.clone();
886        module.register_subscription(
887            call_method,
888            call_method,
889            call_unsub,
890            move |params, pending, _ctx, _ext| {
891                let hub = hub_for_call.clone();
892                Box::pin(async move {
893                    let p: CallParams = params.parse()?;
894                    let stream = hub.route(&p.method, p.params.unwrap_or_default()).await
895                        .map_err(|e| jsonrpsee::types::ErrorObject::owned(-32000, e.to_string(), None::<()>))?;
896                    pipe_stream_to_subscription(pending, stream).await
897                })
898            }
899        )?;
900
901        // Register {ns}.hash subscription
902        let hub_for_hash = hub.clone();
903        module.register_subscription(
904            hash_method,
905            hash_method,
906            hash_unsub,
907            move |_params, pending, _ctx, _ext| {
908                let hub = hub_for_hash.clone();
909                Box::pin(async move {
910                    let schema = Activation::plugin_schema(&*hub);
911                    let stream = async_stream::stream! {
912                        yield HashEvent::Hash { value: schema.hash };
913                    };
914                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
915                    pipe_stream_to_subscription(pending, wrapped).await
916                })
917            }
918        )?;
919
920        // Register {ns}.schema subscription
921        let hub_for_schema = hub.clone();
922        module.register_subscription(
923            schema_method,
924            schema_method,
925            schema_unsub,
926            move |params, pending, _ctx, _ext| {
927                let hub = hub_for_schema.clone();
928                Box::pin(async move {
929                    let p: SchemaParams = params.parse().unwrap_or_default();
930                    let plugin_schema = Activation::plugin_schema(&*hub);
931
932                    let result = if let Some(ref name) = p.method {
933                        plugin_schema.methods.iter()
934                            .find(|m| m.name == *name)
935                            .map(|m| super::SchemaResult::Method(m.clone()))
936                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
937                                -32602,
938                                format!("Method '{}' not found", name),
939                                None::<()>,
940                            ))?
941                    } else {
942                        super::SchemaResult::Plugin(plugin_schema)
943                    };
944
945                    let stream = async_stream::stream! {
946                        yield result;
947                    };
948                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
949                    pipe_stream_to_subscription(pending, wrapped).await
950                })
951            }
952        )?;
953
954        // Register _info well-known endpoint (no namespace prefix)
955        // Returns backend name as a single-item stream with automatic Done event
956        let backend_name = hub.runtime_namespace().to_string();
957        module.register_subscription(
958            "_info",
959            "_info",
960            "_info_unsub",
961            move |_params, pending, _ctx, _ext| {
962                let name = backend_name.clone();
963                Box::pin(async move {
964                    // Create a single-item stream with the info response
965                    let info_stream = futures::stream::once(async move {
966                        serde_json::json!({"backend": name})
967                    });
968
969                    // Wrap to auto-append Done event
970                    let wrapped = super::streaming::wrap_stream(
971                        info_stream,
972                        "_info",
973                        vec![]
974                    );
975
976                    // Pipe to subscription (handles Done automatically)
977                    pipe_stream_to_subscription(pending, wrapped).await
978                })
979            }
980        )?;
981
982        // Register {ns}.respond method for WebSocket bidirectional responses
983        // This allows clients to respond to server-initiated requests (like confirmations/prompts)
984        let respond_method: &'static str = Box::leak(format!("{}.respond", ns).into_boxed_str());
985        module.register_async_method(respond_method, |params, _ctx, _ext| async move {
986            use super::bidirectional::{handle_pending_response, BidirError};
987
988            let p: RespondParams = params.parse()?;
989
990            tracing::debug!(
991                request_id = %p.request_id,
992                "Handling {}.respond via WebSocket",
993                "plexus"
994            );
995
996            match handle_pending_response(&p.request_id, p.response_data) {
997                Ok(()) => Ok(serde_json::json!({"success": true})),
998                Err(BidirError::UnknownRequest) => {
999                    tracing::warn!(request_id = %p.request_id, "Unknown request ID in respond");
1000                    Err(jsonrpsee::types::ErrorObject::owned(
1001                        -32602,
1002                        format!("Unknown request ID: {}. The request may have timed out or been cancelled.", p.request_id),
1003                        None::<()>,
1004                    ))
1005                }
1006                Err(BidirError::ChannelClosed) => {
1007                    tracing::warn!(request_id = %p.request_id, "Channel closed in respond");
1008                    Err(jsonrpsee::types::ErrorObject::owned(
1009                        -32000,
1010                        "Response channel was closed (request may have timed out)",
1011                        None::<()>,
1012                    ))
1013                }
1014                Err(e) => {
1015                    tracing::error!(request_id = %p.request_id, error = ?e, "Error in respond");
1016                    Err(jsonrpsee::types::ErrorObject::owned(
1017                        -32000,
1018                        format!("Failed to deliver response: {}", e),
1019                        None::<()>,
1020                    ))
1021                }
1022            }
1023        })?;
1024
1025        // Register pending RPC methods from activations
1026        let pending = std::mem::take(&mut *hub.inner.pending_rpc.lock().unwrap());
1027        for factory in pending {
1028            module.merge(factory())?;
1029        }
1030
1031        Ok(module)
1032    }
1033}
1034
1035/// Params for {ns}.call
1036#[derive(Debug, serde::Deserialize)]
1037struct CallParams {
1038    method: String,
1039    #[serde(default)]
1040    params: Option<Value>,
1041}
1042
1043/// Params for {ns}.schema
1044#[derive(Debug, Default, serde::Deserialize)]
1045struct SchemaParams {
1046    method: Option<String>,
1047}
1048
1049/// Params for {ns}.respond (WebSocket bidirectional response)
1050#[derive(Debug, serde::Deserialize)]
1051struct RespondParams {
1052    request_id: String,
1053    response_data: Value,
1054}
1055
1056/// Helper to pipe a PlexusStream to a subscription sink
1057async fn pipe_stream_to_subscription(
1058    pending: jsonrpsee::PendingSubscriptionSink,
1059    mut stream: PlexusStream,
1060) -> jsonrpsee::core::SubscriptionResult {
1061    use futures::StreamExt;
1062    use jsonrpsee::SubscriptionMessage;
1063
1064    let sink = pending.accept().await?;
1065    while let Some(item) = stream.next().await {
1066        let msg = SubscriptionMessage::new("result", sink.subscription_id(), &item)?;
1067        sink.send(msg).await?;
1068    }
1069    Ok(())
1070}
1071
1072// ============================================================================
1073// DynamicHub RPC Methods (via plexus-macros)
1074// ============================================================================
1075
1076#[plexus_macros::hub_methods(
1077    namespace = "plexus",
1078    version = "1.0.0",
1079    description = "Central routing and introspection",
1080    hub,
1081    namespace_fn = "runtime_namespace"
1082)]
1083impl DynamicHub {
1084    /// Route a call to a registered activation
1085    #[plexus_macros::hub_method(
1086        streaming,
1087        description = "Route a call to a registered activation",
1088        params(
1089            method = "The method to call (format: namespace.method)",
1090            params = "Parameters to pass to the method (optional, defaults to {})"
1091        )
1092    )]
1093    async fn call(
1094        &self,
1095        method: String,
1096        params: Option<Value>,
1097    ) -> impl Stream<Item = super::types::PlexusStreamItem> + Send + 'static {
1098        use super::context::PlexusContext;
1099        use super::types::{PlexusStreamItem, StreamMetadata};
1100
1101        let result = self.route(&method, params.unwrap_or_default()).await;
1102
1103        match result {
1104            Ok(plexus_stream) => {
1105                // Forward the routed stream directly - it already contains PlexusStreamItems
1106                plexus_stream
1107            }
1108            Err(e) => {
1109                // Return error as a PlexusStreamItem stream
1110                let metadata = StreamMetadata::new(
1111                    vec![self.inner.namespace.clone()],
1112                    PlexusContext::hash(),
1113                );
1114                Box::pin(futures::stream::once(async move {
1115                    PlexusStreamItem::Error {
1116                        metadata,
1117                        message: e.to_string(),
1118                        code: None,
1119                        recoverable: false,
1120                    }
1121                }))
1122            }
1123        }
1124    }
1125
1126    /// Get Plexus RPC server configuration hash (from the recursive schema)
1127    ///
1128    /// This hash changes whenever any method or child activation changes.
1129    /// It's computed from the method hashes rolled up through the schema tree.
1130    #[plexus_macros::hub_method(description = "Get plexus configuration hash (from the recursive schema)\n\n This hash changes whenever any method or child plugin changes.\n It's computed from the method hashes rolled up through the schema tree.")]
1131    async fn hash(&self) -> impl Stream<Item = HashEvent> + Send + 'static {
1132        let schema = Activation::plugin_schema(self);
1133        stream! { yield HashEvent::Hash { value: schema.hash }; }
1134    }
1135
1136    /// Get plugin hashes for cache validation (lightweight alternative to full schema)
1137    #[plexus_macros::hub_method(description = "Get plugin hashes for cache validation")]
1138    async fn hashes(&self) -> impl Stream<Item = PluginHashes> + Send + 'static {
1139        let schema = Activation::plugin_schema(self);
1140
1141        stream! {
1142            yield PluginHashes {
1143                namespace: schema.namespace.clone(),
1144                self_hash: schema.self_hash.clone(),
1145                children_hash: schema.children_hash.clone(),
1146                hash: schema.hash.clone(),
1147                children: schema.children.as_ref().map(|kids| {
1148                    kids.iter()
1149                        .map(|c| ChildHashes {
1150                            namespace: c.namespace.clone(),
1151                            hash: c.hash.clone(),
1152                        })
1153                        .collect()
1154                }),
1155            };
1156        }
1157    }
1158
1159    // Note: schema() method is auto-generated by hub-macro for all activations
1160}
1161
1162// ============================================================================
1163// HubContext Implementation for Weak<DynamicHub>
1164// ============================================================================
1165
1166use super::hub_context::HubContext;
1167use std::sync::Weak;
1168
1169/// HubContext implementation for Weak<DynamicHub>
1170///
1171/// This enables activations to receive a weak reference to their parent DynamicHub,
1172/// allowing them to resolve handles and route calls through the hub without
1173/// creating reference cycles.
1174#[async_trait]
1175impl HubContext for Weak<DynamicHub> {
1176    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1177        let hub = self.upgrade().ok_or_else(|| {
1178            PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1179        })?;
1180        hub.do_resolve_handle(handle).await
1181    }
1182
1183    async fn call(&self, method: &str, params: serde_json::Value) -> Result<PlexusStream, PlexusError> {
1184        let hub = self.upgrade().ok_or_else(|| {
1185            PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1186        })?;
1187        hub.route(method, params).await
1188    }
1189
1190    fn is_valid(&self) -> bool {
1191        self.upgrade().is_some()
1192    }
1193}
1194
1195/// ChildRouter implementation for DynamicHub
1196///
1197/// This enables nested routing through registered activations.
1198/// e.g., hub.call("solar.mercury.info") routes to solar → mercury → info
1199#[async_trait]
1200impl ChildRouter for DynamicHub {
1201    fn router_namespace(&self) -> &str {
1202        &self.inner.namespace
1203    }
1204
1205    async fn router_call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
1206        // DynamicHub routes via its registered activations
1207        // Method format: "activation.method" or "activation.child.method"
1208        self.route(method, params).await
1209    }
1210
1211    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1212        // Look up registered activations that implement ChildRouter
1213        self.inner.child_routers.get(name)
1214            .map(|router| {
1215                // Clone the Arc and wrap in Box for the trait object
1216                Box::new(ArcChildRouter(router.clone())) as Box<dyn ChildRouter>
1217            })
1218    }
1219}
1220
1221#[cfg(test)]
1222mod tests {
1223    use super::*;
1224
1225    #[test]
1226    fn dynamic_hub_implements_activation() {
1227        fn assert_activation<T: Activation>() {}
1228        assert_activation::<DynamicHub>();
1229    }
1230
1231    #[test]
1232    fn dynamic_hub_methods() {
1233        let hub = DynamicHub::new("test");
1234        let methods = hub.methods();
1235        assert!(methods.contains(&"call"));
1236        assert!(methods.contains(&"hash"));
1237        assert!(methods.contains(&"schema"));
1238        // list_activations was removed - use schema() instead
1239    }
1240
1241    #[test]
1242    fn dynamic_hub_hash_stable() {
1243        let h1 = DynamicHub::new("test");
1244        let h2 = DynamicHub::new("test");
1245        assert_eq!(h1.compute_hash(), h2.compute_hash());
1246    }
1247
1248    #[test]
1249    fn dynamic_hub_is_hub() {
1250        use crate::activations::health::Health;
1251        let hub = DynamicHub::new("test").register(Health::new());
1252        let schema = hub.plugin_schema();
1253
1254        // DynamicHub should be a hub (has children)
1255        assert!(schema.is_hub(), "dynamic hub should be a hub");
1256        assert!(!schema.is_leaf(), "dynamic hub should not be a leaf");
1257
1258        // Should have children (as summaries)
1259        let children = schema.children.expect("dynamic hub should have children");
1260        assert!(!children.is_empty(), "dynamic hub should have at least one child");
1261
1262        // Health should be in the children summaries
1263        let health = children.iter().find(|c| c.namespace == "health").expect("should have health child");
1264        assert!(!health.hash.is_empty(), "health should have a hash");
1265    }
1266
1267    #[test]
1268    fn dynamic_hub_schema_structure() {
1269        use crate::activations::health::Health;
1270        let hub = DynamicHub::new("test").register(Health::new());
1271        let schema = hub.plugin_schema();
1272
1273        // Pretty print the schema
1274        let json = serde_json::to_string_pretty(&schema).unwrap();
1275        println!("DynamicHub schema:\n{}", json);
1276
1277        // Verify structure
1278        assert_eq!(schema.namespace, "test");
1279        assert!(schema.methods.iter().any(|m| m.name == "call"));
1280        assert!(schema.children.is_some());
1281    }
1282
1283    // ========================================================================
1284    // INVARIANT: Handle routing - resolves to correct plugin
1285    // ========================================================================
1286
1287    #[tokio::test]
1288    async fn invariant_resolve_handle_unknown_activation() {
1289        use crate::activations::health::Health;
1290        use crate::types::Handle;
1291        use uuid::Uuid;
1292
1293        let hub = DynamicHub::new("test").register(Health::new());
1294
1295        // Handle for an unregistered activation (random UUID)
1296        let unknown_plugin_id = Uuid::new_v4();
1297        let handle = Handle::new(unknown_plugin_id, "1.0.0", "some_method");
1298
1299        let result = hub.do_resolve_handle(&handle).await;
1300
1301        match result {
1302            Err(PlexusError::ActivationNotFound(_)) => {
1303                // Expected - activation not found
1304            }
1305            Err(other) => panic!("Expected ActivationNotFound, got {:?}", other),
1306            Ok(_) => panic!("Expected error for unknown activation"),
1307        }
1308    }
1309
1310    #[tokio::test]
1311    async fn invariant_resolve_handle_unsupported() {
1312        use crate::activations::health::Health;
1313        use crate::types::Handle;
1314
1315        let hub = DynamicHub::new("test").register(Health::new());
1316
1317        // Handle for health activation (which doesn't support handle resolution)
1318        let handle = Handle::new(Health::PLUGIN_ID, "1.0.0", "check");
1319
1320        let result = hub.do_resolve_handle(&handle).await;
1321
1322        match result {
1323            Err(PlexusError::HandleNotSupported(name)) => {
1324                assert_eq!(name, "health");
1325            }
1326            Err(other) => panic!("Expected HandleNotSupported, got {:?}", other),
1327            Ok(_) => panic!("Expected error for unsupported handle"),
1328        }
1329    }
1330
1331    #[tokio::test]
1332    async fn invariant_resolve_handle_routes_by_plugin_id() {
1333        use crate::activations::health::Health;
1334        use crate::activations::echo::Echo;
1335        use crate::types::Handle;
1336        use uuid::Uuid;
1337
1338        let health = Health::new();
1339        let echo = Echo::new();
1340        let health_plugin_id = health.plugin_id();
1341        let echo_plugin_id = echo.plugin_id();
1342
1343        let hub = DynamicHub::new("test")
1344            .register(health)
1345            .register(echo);
1346
1347        // Health handle → health activation
1348        let health_handle = Handle::new(health_plugin_id, "1.0.0", "check");
1349        match hub.do_resolve_handle(&health_handle).await {
1350            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
1351            Err(other) => panic!("health handle should route to health activation, got {:?}", other),
1352            Ok(_) => panic!("health handle should return HandleNotSupported"),
1353        }
1354
1355        // Echo handle → echo activation
1356        let echo_handle = Handle::new(echo_plugin_id, "1.0.0", "echo");
1357        match hub.do_resolve_handle(&echo_handle).await {
1358            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "echo"),
1359            Err(other) => panic!("echo handle should route to echo activation, got {:?}", other),
1360            Ok(_) => panic!("echo handle should return HandleNotSupported"),
1361        }
1362
1363        // Unknown handle → ActivationNotFound (random UUID not registered)
1364        let unknown_handle = Handle::new(Uuid::new_v4(), "1.0.0", "method");
1365        match hub.do_resolve_handle(&unknown_handle).await {
1366            Err(PlexusError::ActivationNotFound(_)) => { /* expected */ },
1367            Err(other) => panic!("unknown handle should return ActivationNotFound, got {:?}", other),
1368            Ok(_) => panic!("unknown handle should return ActivationNotFound"),
1369        }
1370    }
1371
1372    #[test]
1373    fn invariant_handle_plugin_id_determines_routing() {
1374        use crate::activations::health::Health;
1375        use crate::activations::echo::Echo;
1376        use crate::types::Handle;
1377
1378        let health = Health::new();
1379        let echo = Echo::new();
1380
1381        // Same meta, different activations → different routing targets (by plugin_id)
1382        let health_handle = Handle::new(health.plugin_id(), "1.0.0", "check")
1383            .with_meta(vec!["msg-123".into(), "user".into()]);
1384        let echo_handle = Handle::new(echo.plugin_id(), "1.0.0", "echo")
1385            .with_meta(vec!["msg-123".into(), "user".into()]);
1386
1387        // Different plugin_ids ensure different routing
1388        assert_ne!(health_handle.plugin_id, echo_handle.plugin_id);
1389    }
1390
1391    // ========================================================================
1392    // Plugin Registry Tests
1393    // ========================================================================
1394
1395    #[test]
1396    fn plugin_registry_basic_operations() {
1397        let mut registry = PluginRegistry::new();
1398        let id = uuid::Uuid::new_v4();
1399
1400        // Register an activation
1401        registry.register(id, "test_plugin".to_string(), "test".to_string());
1402
1403        // Lookup by ID
1404        assert_eq!(registry.lookup(id), Some("test_plugin"));
1405
1406        // Lookup by path
1407        assert_eq!(registry.lookup_by_path("test_plugin"), Some(id));
1408
1409        // Get entry
1410        let entry = registry.get(id).expect("should have entry");
1411        assert_eq!(entry.path, "test_plugin");
1412        assert_eq!(entry.plugin_type, "test");
1413    }
1414
1415    #[test]
1416    fn plugin_registry_populated_on_register() {
1417        use crate::activations::health::Health;
1418
1419        let hub = DynamicHub::new("test").register(Health::new());
1420
1421        let registry = hub.registry();
1422        assert!(!registry.is_empty(), "registry should not be empty after registration");
1423
1424        // Health activation should be registered
1425        let health_id = registry.lookup_by_path("health");
1426        assert!(health_id.is_some(), "health should be registered by path");
1427
1428        // Should be able to look up path by ID
1429        let health_uuid = health_id.unwrap();
1430        assert_eq!(registry.lookup(health_uuid), Some("health"));
1431    }
1432
1433    #[test]
1434    fn plugin_registry_deterministic_uuid() {
1435        use crate::activations::health::Health;
1436
1437        // Same activation registered twice should produce same UUID
1438        let health1 = Health::new();
1439        let health2 = Health::new();
1440
1441        assert_eq!(health1.plugin_id(), health2.plugin_id(),
1442            "same activation type should have deterministic UUID");
1443
1444        // UUID should be based on namespace+major_version (semver compatibility)
1445        let expected = uuid::Uuid::new_v5(
1446            &uuid::Uuid::NAMESPACE_OID,
1447            b"health@1"
1448        );
1449        assert_eq!(health1.plugin_id(), expected,
1450            "plugin_id should be deterministic from namespace@major_version");
1451    }
1452}