Skip to main content

plexus_core/plexus/
plexus.rs

1//! DynamicHub - the central routing layer for activations
2//!
3//! DynamicHub IS an activation that also serves as the registry for other activations.
4//! It implements the Plexus RPC protocol for routing and introspection.
5//! It uses hub-macro for its methods, with the `call` method using the streaming
6//! pattern to forward responses from routed methods.
7
8use super::{
9    context::PlexusContext,
10    method_enum::MethodEnumSchema,
11    schema::{ChildSummary, MethodSchema, PluginSchema, Schema},
12    streaming::PlexusStream,
13};
14use crate::types::Handle;
15use async_stream::stream;
16use async_trait::async_trait;
17use bitflags::bitflags;
18use futures::Stream;
19use futures_core::stream::BoxStream;
20use jsonrpsee::core::server::Methods;
21use jsonrpsee::RpcModule;
22
23/// The JSON-RPC method name used in all plexus subscription notifications.
24///
25/// Every subscription registered by plexus (`.call`, `.hash`, `.schema`, `_info`)
26/// sends notifications with `"method": PLEXUS_NOTIF_METHOD` on the wire.
27/// Clients must match against this value when dispatching raw subscription frames.
28pub const PLEXUS_NOTIF_METHOD: &str = "result";
29use schemars::JsonSchema;
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35// ============================================================================
36// Error Types
37// ============================================================================
38
/// Errors surfaced while routing or executing a Plexus RPC call.
///
/// The `Display` text of each variant is used as the JSON-RPC error message
/// by `plexus_error_to_jsonrpc`.
#[derive(Debug, Clone)]
pub enum PlexusError {
    /// No activation could be found; the payload is the name (or plugin id,
    /// rendered as a string) that was looked up.
    ActivationNotFound(String),
    /// The activation was found but does not expose the requested method.
    MethodNotFound { activation: String, method: String },
    /// The supplied parameters were rejected; the payload is the reason.
    InvalidParams(String),
    /// The method ran but failed; the payload is the failure message.
    ExecutionError(String),
    /// The named activation does not implement handle resolution.
    HandleNotSupported(String),
    /// A transport-level failure, classified by `TransportErrorKind`.
    TransportError(TransportErrorKind),
    /// The call requires authentication; the payload is the detail message.
    Unauthenticated(String),
}
49
/// Classification of a transport-level failure carried by
/// `PlexusError::TransportError`.
///
/// Serialized by serde as an internally tagged enum: the `error_kind` field
/// holds the snake_case variant name next to the variant's own fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "error_kind", rename_all = "snake_case")]
pub enum TransportErrorKind {
    /// The remote endpoint refused the connection.
    ConnectionRefused { host: String, port: u16 },
    /// The connection attempt timed out.
    ConnectionTimeout { host: String, port: u16 },
    /// A protocol-level failure described by `message`.
    ProtocolError { message: String },
    /// A network failure described by `message`.
    NetworkError { message: String },
}
58
59impl std::fmt::Display for TransportErrorKind {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        match self {
62            TransportErrorKind::ConnectionRefused { host, port } => {
63                write!(f, "Connection refused to {}:{}", host, port)
64            }
65            TransportErrorKind::ConnectionTimeout { host, port } => {
66                write!(f, "Connection timeout to {}:{}", host, port)
67            }
68            TransportErrorKind::ProtocolError { message } => {
69                write!(f, "Protocol error: {}", message)
70            }
71            TransportErrorKind::NetworkError { message } => {
72                write!(f, "Network error: {}", message)
73            }
74        }
75    }
76}
77
78impl std::fmt::Display for PlexusError {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        match self {
81            PlexusError::ActivationNotFound(name) => write!(f, "Activation not found: {}", name),
82            PlexusError::MethodNotFound { activation, method } => {
83                write!(f, "Method not found: {}.{}", activation, method)
84            }
85            PlexusError::InvalidParams(msg) => write!(f, "Invalid params: {}", msg),
86            PlexusError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
87            PlexusError::HandleNotSupported(activation) => {
88                write!(f, "Handle resolution not supported by activation: {}", activation)
89            }
90            PlexusError::TransportError(kind) => match kind {
91                TransportErrorKind::ConnectionRefused { host, port } => {
92                    write!(f, "Connection refused to {}:{}", host, port)
93                }
94                TransportErrorKind::ConnectionTimeout { host, port } => {
95                    write!(f, "Connection timeout to {}:{}", host, port)
96                }
97                TransportErrorKind::ProtocolError { message } => {
98                    write!(f, "Protocol error: {}", message)
99                }
100                TransportErrorKind::NetworkError { message } => {
101                    write!(f, "Network error: {}", message)
102                }
103            }
104            PlexusError::Unauthenticated(msg) => write!(f, "Authentication required: {}", msg),
105        }
106    }
107}
108
109impl std::error::Error for PlexusError {}
110
111/// Convert PlexusError to a JSON-RPC ErrorObject with semantic error codes.
112///
113/// Codes:
114/// - `-32001`: Authentication required (custom app-level error)
115/// - `-32601`: Method/activation not found (standard JSON-RPC)
116/// - `-32602`: Invalid parameters (standard JSON-RPC)
117/// - `-32000`: Generic server error (execution, transport, handle errors)
118/// Get the semantic JSON-RPC error code for a PlexusError.
119fn plexus_error_code(e: &PlexusError) -> i32 {
120    match e {
121        PlexusError::Unauthenticated(_) => -32001,
122        PlexusError::InvalidParams(_) => -32602,
123        PlexusError::MethodNotFound { .. } | PlexusError::ActivationNotFound(_) => -32601,
124        _ => -32000,
125    }
126}
127
128/// Convert PlexusError to a JSON-RPC ErrorObject with semantic error codes.
129fn plexus_error_to_jsonrpc(e: &PlexusError) -> jsonrpsee::types::ErrorObjectOwned {
130    jsonrpsee::types::ErrorObject::owned(plexus_error_code(e), e.to_string(), None::<()>)
131}
132
133// ============================================================================
134// Schema Types
135// ============================================================================
136
/// Summary of one activation, as produced by
/// `DynamicHub::list_activations_info`.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ActivationInfo {
    /// Namespace the activation reports for itself.
    pub namespace: String,
    /// Version string the activation reports for itself.
    pub version: String,
    /// Short description the activation reports for itself.
    pub description: String,
    /// Method names as returned by the activation's `methods()`
    /// (not prefixed with the namespace).
    pub methods: Vec<String>,
}
144
145// ============================================================================
146// Activation Trait
147// ============================================================================
148
149#[async_trait]
150pub trait Activation: Send + Sync + 'static {
151    type Methods: MethodEnumSchema;
152
153    fn namespace(&self) -> &str;
154    fn version(&self) -> &str;
155    /// Short description (max 15 words)
156    fn description(&self) -> &str { "No description available" }
157    /// Long description (optional, for detailed documentation)
158    fn long_description(&self) -> Option<&str> { None }
159    fn methods(&self) -> Vec<&str>;
160    fn method_help(&self, _method: &str) -> Option<String> { None }
161    /// Stable activation instance ID for handle routing
162    /// By default generates a deterministic UUID from namespace+major_version
163    /// Using major version only ensures handles survive minor/patch upgrades (semver)
164    fn plugin_id(&self) -> uuid::Uuid {
165        let major_version = self.version().split('.').next().unwrap_or("0");
166        uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, format!("{}@{}", self.namespace(), major_version).as_bytes())
167    }
168
169    async fn call(
170        &self,
171        method: &str,
172        params: Value,
173        auth: Option<&super::auth::AuthContext>,
174        raw_ctx: Option<&crate::request::RawRequestContext>,
175    ) -> Result<PlexusStream, PlexusError>;
176    async fn resolve_handle(&self, _handle: &Handle) -> Result<PlexusStream, PlexusError> {
177        Err(PlexusError::HandleNotSupported(self.namespace().to_string()))
178    }
179
180    fn into_rpc_methods(self) -> Methods where Self: Sized;
181
182    /// Return this activation's schema (methods + optional children)
183    fn plugin_schema(&self) -> PluginSchema {
184        use std::collections::hash_map::DefaultHasher;
185        use std::hash::{Hash, Hasher};
186
187        let methods: Vec<MethodSchema> = self.methods().iter().map(|name| {
188            let desc = self.method_help(name).unwrap_or_default();
189            // Compute a simple hash for methods not using hub-macro
190            let mut hasher = DefaultHasher::new();
191            name.hash(&mut hasher);
192            desc.hash(&mut hasher);
193            let hash = format!("{:016x}", hasher.finish());
194            MethodSchema::new(name.to_string(), desc, hash)
195        }).collect();
196
197        if let Some(long_desc) = self.long_description() {
198            PluginSchema::leaf_with_long_description(
199                self.namespace(),
200                self.version(),
201                self.description(),
202                long_desc,
203                methods,
204            )
205        } else {
206            PluginSchema::leaf(
207                self.namespace(),
208                self.version(),
209                self.description(),
210                methods,
211            )
212        }
213    }
214}
215
216// ============================================================================
217// Child Routing for Hub Plugins
218// ============================================================================
219
bitflags! {
    /// Opt-in capability flags advertising which optional `ChildRouter`
    /// operations a router supports.
    ///
    /// The Plexus RPC network is a *graph*, not a tree: children may be
    /// remote, infinite, or deliberately private. Listing and searching
    /// children are therefore opt-in — routers must declare them here
    /// before callers can rely on them.
    ///
    /// `LIST` and `SEARCH` occupy distinct bits, so a router may advertise
    /// either, both, or neither.
    ///
    /// # Contract
    ///
    /// | Condition | Expected |
    /// |---|---|
    /// | `capabilities().contains(LIST)` is `true` | `list_children().await` returns `Some(stream)` |
    /// | `capabilities().contains(LIST)` is `false` | `list_children().await` returns `None` |
    /// | `capabilities().contains(SEARCH)` is `true` | `search_children(q).await` returns `Some(stream)` for every `q` |
    /// | `capabilities().contains(SEARCH)` is `false` | `search_children(q).await` returns `None` for every `q` |
    ///
    /// These rules are not runtime-enforced; advertising a capability you
    /// do not implement is a correctness bug in the router.
    ///
    /// # Deprecated (IR-4)
    ///
    /// This bitflags type is superseded by the `MethodRole::DynamicChild {
    /// list_method, search_method }` tag on the corresponding gate method.
    /// Consumers that want to know whether a child router supports list /
    /// search operations should inspect the gate method's role instead of
    /// calling `ChildRouter::capabilities()`. The type stays on the wire for
    /// the 0.5 transition window and is slated for removal in 0.7.
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
    #[deprecated(
        since = "0.5",
        note = "Use MethodRole::DynamicChild { list_method, search_method } instead. Removed in 0.7."
    )]
    pub struct ChildCapabilities: u32 {
        /// The router promises `list_children()` returns `Some(stream)`.
        const LIST = 0b0000_0001;
        /// The router promises `search_children(query)` returns
        /// `Some(stream)` for any query.
        const SEARCH = 0b0000_0010;
    }
}
262
/// Trait for activations that can route to child activations
///
/// Hub activations implement this to support nested method routing.
/// When a method like "mercury.info" is called on a solar activation,
/// this trait enables routing to the mercury child.
///
/// This trait is separate from Activation to avoid associated type issues
/// with dynamic dispatch.
///
/// # Optional capabilities
///
/// In addition to the required `router_namespace` + `router_call` +
/// `get_child` surface, routers may opt in to advertising enumerable and
/// searchable children via [`ChildCapabilities`]. When a flag is set, the
/// corresponding `list_children` / `search_children` method must return
/// `Some(stream)`. The default implementations report no capabilities and
/// return `None`.
#[async_trait]
pub trait ChildRouter: Send + Sync {
    /// Get the namespace of this router (for error messages)
    fn router_namespace(&self) -> &str;

    /// Call a method on this router
    ///
    /// `method` is the method path relative to this router (`route_to_child`
    /// passes the part after the child's name). `auth` and `raw_ctx` are
    /// passed along as received and may be `None`.
    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;

    /// Get a child activation instance by name for nested routing
    ///
    /// Returns `None` when this router has no child called `name`.
    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>>;

    /// Which optional operations (list / search) this router supports.
    ///
    /// Defaults to [`ChildCapabilities::empty()`]: a router that only
    /// exposes `get_child` for exact-name lookup.
    // `ChildCapabilities` is deprecated but still part of this signature.
    #[allow(deprecated)]
    fn capabilities(&self) -> ChildCapabilities {
        ChildCapabilities::empty()
    }

    /// Stream every child name the router is willing to enumerate.
    ///
    /// Returns `None` when the router does not support listing — callers
    /// should check [`ChildRouter::capabilities`] first.
    ///
    /// Routers that implement this **must** set
    /// [`ChildCapabilities::LIST`] in [`ChildRouter::capabilities`].
    async fn list_children(&self) -> Option<BoxStream<'_, String>> {
        None
    }

    /// Stream child names matching the router-defined query semantics.
    ///
    /// Returns `None` when the router does not support searching — callers
    /// should check [`ChildRouter::capabilities`] first.
    ///
    /// Routers that implement this **must** set
    /// [`ChildCapabilities::SEARCH`] in [`ChildRouter::capabilities`].
    async fn search_children(&self, _query: &str) -> Option<BoxStream<'_, String>> {
        None
    }
}
321
322/// Route a method call to a child activation
323///
324/// This is called from generated code when a hub activation receives
325/// a method that doesn't match its local methods. If the method
326/// contains a dot (e.g., "mercury.info"), it routes to the child.
327pub async fn route_to_child<T: ChildRouter + ?Sized>(
328    parent: &T,
329    method: &str,
330    params: Value,
331    auth: Option<&super::auth::AuthContext>,
332    raw_ctx: Option<&crate::request::RawRequestContext>,
333) -> Result<PlexusStream, PlexusError> {
334    // Try to split on first dot for nested routing
335    if let Some((child_name, rest)) = method.split_once('.') {
336        if let Some(child) = parent.get_child(child_name).await {
337            return child.router_call(rest, params, auth, raw_ctx).await;
338        }
339        return Err(PlexusError::ActivationNotFound(child_name.to_string()));
340    }
341
342    // No dot - method simply not found
343    Err(PlexusError::MethodNotFound {
344        activation: parent.router_namespace().to_string(),
345        method: method.to_string(),
346    })
347}
348
/// Wrapper to implement ChildRouter for Arc<dyn ChildRouter>
///
/// This allows DynamicHub to return its stored Arc<dyn ChildRouter> from get_child()
/// (which must hand back a `Box<dyn ChildRouter>`).
struct ArcChildRouter(Arc<dyn ChildRouter>);

/// Every method forwards unchanged to the wrapped router.
#[async_trait]
impl ChildRouter for ArcChildRouter {
    fn router_namespace(&self) -> &str {
        self.0.router_namespace()
    }

    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
        self.0.router_call(method, params, auth, raw_ctx).await
    }

    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
        self.0.get_child(name).await
    }

    // Forwarded explicitly so the wrapper does not fall back to the trait's
    // "no capabilities" default.
    #[allow(deprecated)]
    fn capabilities(&self) -> ChildCapabilities {
        self.0.capabilities()
    }

    async fn list_children(&self) -> Option<BoxStream<'_, String>> {
        self.0.list_children().await
    }

    async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
        self.0.search_children(query).await
    }
}
381
382// ============================================================================
383// Internal Type-Erased Activation
384// ============================================================================
385
/// Internal mirror of `Activation` without the `Methods` associated type or
/// the consuming `into_rpc_methods`, so activations of different concrete
/// types can be stored together as `Arc<dyn ActivationObject>`.
///
/// `ActivationWrapper` is the implementation; apart from `schema`, every
/// method forwards to the wrapped `Activation`.
#[async_trait]
#[allow(dead_code)] // Methods exist for completeness but some aren't called post-erasure yet
trait ActivationObject: Send + Sync + 'static {
    fn namespace(&self) -> &str;
    fn version(&self) -> &str;
    fn description(&self) -> &str;
    fn long_description(&self) -> Option<&str>;
    fn methods(&self) -> Vec<&str>;
    fn method_help(&self, method: &str) -> Option<String>;
    fn plugin_id(&self) -> uuid::Uuid;
    async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;
    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError>;
    fn plugin_schema(&self) -> PluginSchema;
    /// JSON schema of the activation's method enum (`Activation::Methods`).
    fn schema(&self) -> Schema;
}
401
402struct ActivationWrapper<A: Activation> {
403    inner: A,
404}
405
406#[async_trait]
407impl<A: Activation> ActivationObject for ActivationWrapper<A> {
408    fn namespace(&self) -> &str { self.inner.namespace() }
409    fn version(&self) -> &str { self.inner.version() }
410    fn description(&self) -> &str { self.inner.description() }
411    fn long_description(&self) -> Option<&str> { self.inner.long_description() }
412    fn methods(&self) -> Vec<&str> { self.inner.methods() }
413    fn method_help(&self, method: &str) -> Option<String> { self.inner.method_help(method) }
414    fn plugin_id(&self) -> uuid::Uuid { self.inner.plugin_id() }
415
416    async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
417        self.inner.call(method, params, auth, raw_ctx).await
418    }
419
420    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
421        self.inner.resolve_handle(handle).await
422    }
423
424    fn plugin_schema(&self) -> PluginSchema { self.inner.plugin_schema() }
425
426    fn schema(&self) -> Schema {
427        let schema = schemars::schema_for!(A::Methods);
428        serde_json::from_value(serde_json::to_value(schema).expect("serialize"))
429            .expect("parse schema")
430    }
431}
432
433// ============================================================================
434// Plexus Event Types
435// ============================================================================
436
/// Event carrying a hash value.
///
/// Serialized with an `event` tag holding the snake_case variant name.
// NOTE(review): presumably emitted by the hub's `.hash` subscription — the
// emitting code is outside this section; confirm.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum HashEvent {
    /// The hash, as a string.
    Hash { value: String },
}
442
/// Event for schema() RPC method - returns plugin schema
///
/// Serialized with an `event` tag holding the snake_case variant name.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum SchemaEvent {
    /// This plugin's schema
    Schema(PluginSchema),
}
450
/// Lightweight hash information for cache validation
// NOTE(review): the code that fills these fields is outside this section; the
// per-field notes below are read off the field names and should be confirmed.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct PluginHashes {
    /// Namespace of the plugin these hashes describe.
    pub namespace: String,
    /// Presumably the hash of the plugin's own methods — confirm.
    pub self_hash: String,
    /// Presumably the combined hash of the children; omitted from the
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub children_hash: Option<String>,
    /// Presumably the overall hash (self + children) — confirm.
    pub hash: String,
    /// Child plugin hashes (for recursive checking); omitted from the
    /// serialized output when `None`.
    #[serde(skip_serializing_if = "Option::is_none")]
    pub children: Option<Vec<ChildHashes>>,
}
463
/// Hash information for a child plugin
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ChildHashes {
    /// Namespace of the child plugin.
    pub namespace: String,
    /// Hash reported for that child.
    pub hash: String,
}
470
471
472// ============================================================================
473// Activation Registry
474// ============================================================================
475
/// Entry in the activation registry
#[derive(Debug, Clone)]
pub struct PluginEntry {
    /// Stable activation instance ID
    pub id: uuid::Uuid,
    /// Current path/namespace for this activation
    pub path: String,
    /// Activation type (e.g., "cone", "bash", "arbor").
    ///
    /// `DynamicHub::register` currently stores the namespace here.
    pub plugin_type: String,
}
486
/// Registry mapping activation UUIDs to their current paths
///
/// This enables handle routing without path dependency - handles reference
/// activations by their stable UUID, and the registry maps to the current path.
///
/// Two indexes are kept: `by_id` holds the entries and `by_path` is the
/// reverse index from path to UUID. Both are written by `register`.
#[derive(Default)]
pub struct PluginRegistry {
    /// Lookup by plugin UUID
    by_id: HashMap<uuid::Uuid, PluginEntry>,
    /// Lookup by current path (for reverse lookup)
    by_path: HashMap<String, uuid::Uuid>,
}
498
499/// Read-only snapshot of the activation registry
500///
501/// Safe to use outside of DynamicHub locks.
502#[derive(Clone)]
503pub struct PluginRegistrySnapshot {
504    by_id: HashMap<uuid::Uuid, PluginEntry>,
505    by_path: HashMap<String, uuid::Uuid>,
506}
507
508impl PluginRegistrySnapshot {
509    /// Look up an activation's path by its UUID
510    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
511        self.by_id.get(&id).map(|e| e.path.as_str())
512    }
513
514    /// Look up an activation's UUID by its path
515    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
516        self.by_path.get(path).copied()
517    }
518
519    /// Get an activation entry by its UUID
520    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
521        self.by_id.get(&id)
522    }
523
524    /// List all registered activations
525    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
526        self.by_id.values()
527    }
528
529    /// Get the number of registered plugins
530    pub fn len(&self) -> usize {
531        self.by_id.len()
532    }
533
534    /// Check if the registry is empty
535    pub fn is_empty(&self) -> bool {
536        self.by_id.is_empty()
537    }
538}
539
540impl PluginRegistry {
541    /// Create a new empty registry
542    pub fn new() -> Self {
543        Self::default()
544    }
545
546    /// Look up an activation's path by its UUID
547    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
548        self.by_id.get(&id).map(|e| e.path.as_str())
549    }
550
551    /// Look up an activation's UUID by its path
552    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
553        self.by_path.get(path).copied()
554    }
555
556    /// Get an activation entry by its UUID
557    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
558        self.by_id.get(&id)
559    }
560
561    /// Register an activation
562    pub fn register(&mut self, id: uuid::Uuid, path: String, plugin_type: String) {
563        let entry = PluginEntry { id, path: path.clone(), plugin_type };
564        self.by_id.insert(id, entry);
565        self.by_path.insert(path, id);
566    }
567
568    /// List all registered activations
569    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
570        self.by_id.values()
571    }
572
573    /// Get the number of registered plugins
574    pub fn len(&self) -> usize {
575        self.by_id.len()
576    }
577
578    /// Check if the registry is empty
579    pub fn is_empty(&self) -> bool {
580        self.by_id.is_empty()
581    }
582}
583
584// ============================================================================
585// DynamicHub (formerly Plexus)
586// ============================================================================
587
/// Shared state behind a `DynamicHub`; the hub holds it in an `Arc`.
struct DynamicHubInner {
    /// Custom namespace for this hub instance, as passed to `DynamicHub::new`
    namespace: String,
    /// Registered activations, keyed by their namespace
    activations: HashMap<String, Arc<dyn ActivationObject>>,
    /// Child routers for direct nested routing (e.g., hub.solar.mercury.info)
    child_routers: HashMap<String, Arc<dyn ChildRouter>>,
    /// Activation registry mapping UUIDs to paths
    registry: std::sync::RwLock<PluginRegistry>,
    /// Deferred builders, one per registered activation, each producing that
    /// activation's RPC `Methods` when invoked
    pending_rpc: std::sync::Mutex<Vec<Box<dyn FnOnce() -> Methods + Send>>>,
}
598
/// DynamicHub - an activation that routes to dynamically registered child activations
///
/// Unlike hub activations with hardcoded children (like Solar),
/// DynamicHub allows registering activations at runtime via `.register()`.
///
/// Cloning a `DynamicHub` is cheap (it clones the inner `Arc`), but
/// `.register()` panics once more than one reference to the hub exists, so
/// finish registering before cloning.
///
/// # Direct Hosting
///
/// For a single activation, host it directly:
/// ```ignore
/// let solar = Arc::new(Solar::new());
/// TransportServer::builder(solar, converter).serve().await?;
/// ```
///
/// # Composition
///
/// For multiple top-level activations, use DynamicHub:
/// ```ignore
/// let hub = DynamicHub::new("myapp")
///     .register(Solar::new())
///     .register(Echo::new());
/// ```
#[derive(Clone)]
pub struct DynamicHub {
    /// Shared hub state.
    inner: Arc<DynamicHubInner>,
}
624
625// ============================================================================
626// DynamicHub Infrastructure (non-RPC methods)
627// ============================================================================
628
629impl DynamicHub {
630    /// Create a new DynamicHub with explicit namespace
631    ///
632    /// Unlike single activations which have fixed namespaces, DynamicHub is a
633    /// composition tool that can be named based on your application. Common choices:
634    /// - "hub" - generic default
635    /// - "substrate" - for substrate server
636    /// - "myapp" - for your application name
637    ///
638    /// The namespace appears in method calls: `{namespace}.call`, `{namespace}.schema`
639    pub fn new(namespace: impl Into<String>) -> Self {
640        Self {
641            inner: Arc::new(DynamicHubInner {
642                namespace: namespace.into(),
643                activations: HashMap::new(),
644                child_routers: HashMap::new(),
645                registry: std::sync::RwLock::new(PluginRegistry::new()),
646                pending_rpc: std::sync::Mutex::new(Vec::new()),
647            }),
648        }
649    }
650
    /// Deprecated: Use new() with explicit namespace instead
    ///
    /// Kept as a thin alias: it forwards to [`DynamicHub::new`] unchanged.
    #[deprecated(since = "0.3.0", note = "Use DynamicHub::new(namespace) instead")]
    pub fn with_namespace(namespace: impl Into<String>) -> Self {
        Self::new(namespace)
    }
656
657    /// Get the runtime namespace for this DynamicHub instance
658    pub fn runtime_namespace(&self) -> &str {
659        &self.inner.namespace
660    }
661
    /// Get access to the activation registry
    ///
    /// Returns a read guard, so the registry stays read-locked for as long as
    /// the returned value is alive.
    ///
    /// # Panics
    ///
    /// Panics if the registry lock is poisoned.
    pub fn registry(&self) -> std::sync::RwLockReadGuard<'_, PluginRegistry> {
        self.inner.registry.read().unwrap()
    }
666
667    /// Register an activation
668    pub fn register<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
669        let namespace = activation.namespace().to_string();
670        let plugin_id = activation.plugin_id();
671        let activation_for_rpc = activation.clone();
672        let activation_for_router = activation.clone();
673
674        let inner = Arc::get_mut(&mut self.inner)
675            .expect("Cannot register: DynamicHub has multiple references");
676
677        // Register in the activation registry
678        inner.registry.write().unwrap().register(
679            plugin_id,
680            namespace.clone(),
681            namespace.clone(), // Use namespace as plugin_type for now
682        );
683
684        inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
685        inner.child_routers.insert(namespace.clone(), Arc::new(activation_for_router));
686        inner.pending_rpc.lock().unwrap()
687            .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
688        self
689    }
690
691    /// Register a hub activation that supports nested routing
692    ///
693    /// Hub activations implement `ChildRouter`, enabling direct nested method calls
694    /// like `hub.solar.mercury.info` at the RPC layer (no hub.call indirection).
695    #[deprecated(since = "0.5.0", note = "Use register() — it now handles both leaf and hub activations")]
696    pub fn register_hub<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
697        let namespace = activation.namespace().to_string();
698        let plugin_id = activation.plugin_id();
699        let activation_for_rpc = activation.clone();
700        let activation_for_router = activation.clone();
701
702        let inner = Arc::get_mut(&mut self.inner)
703            .expect("Cannot register: DynamicHub has multiple references");
704
705        // Register in the activation registry
706        inner.registry.write().unwrap().register(
707            plugin_id,
708            namespace.clone(),
709            namespace.clone(), // Use namespace as plugin_type for now
710        );
711
712        inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
713        inner.child_routers.insert(namespace, Arc::new(activation_for_router));
714        inner.pending_rpc.lock().unwrap()
715            .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
716        self
717    }
718
719    /// List all methods across all activations
720    pub fn list_methods(&self) -> Vec<String> {
721        let mut methods = Vec::new();
722
723        // Include hub's own methods
724        for m in Activation::methods(self) {
725            methods.push(format!("{}.{}", self.inner.namespace, m));
726        }
727
728        // Include registered activation methods
729        for (ns, act) in &self.inner.activations {
730            for m in act.methods() {
731                methods.push(format!("{}.{}", ns, m));
732            }
733        }
734        methods.sort();
735        methods
736    }
737
738    /// List all activations (including this hub itself)
739    pub fn list_activations_info(&self) -> Vec<ActivationInfo> {
740        let mut activations = Vec::new();
741
742        // Include this hub itself
743        activations.push(ActivationInfo {
744            namespace: Activation::namespace(self).to_string(),
745            version: Activation::version(self).to_string(),
746            description: Activation::description(self).to_string(),
747            methods: Activation::methods(self).iter().map(|s| s.to_string()).collect(),
748        });
749
750        // Include registered activations
751        for a in self.inner.activations.values() {
752            activations.push(ActivationInfo {
753                namespace: a.namespace().to_string(),
754                version: a.version().to_string(),
755                description: a.description().to_string(),
756                methods: a.methods().iter().map(|s| s.to_string()).collect(),
757            });
758        }
759
760        activations
761    }
762
763    /// Compute hash for cache invalidation
764    ///
765    /// Returns the hash from the recursive plugin schema. This hash changes
766    /// whenever any method definition or child plugin changes.
767    pub fn compute_hash(&self) -> String {
768        Activation::plugin_schema(self).hash
769    }
770
    /// Route a call to the appropriate activation
    ///
    /// Shorthand for [`DynamicHub::route_with_ctx`] with no raw request
    /// context.
    pub async fn route(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>) -> Result<PlexusStream, PlexusError> {
        self.route_with_ctx(method, params, auth, None).await
    }
775
776    /// Route a call to the appropriate activation, with optional raw HTTP request context.
777    pub async fn route_with_ctx(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
778        let (namespace, method_name) = self.parse_method(method)?;
779
780        // Handle plexus's own methods
781        if namespace == self.inner.namespace {
782            return Activation::call(self, method_name, params, auth, raw_ctx).await;
783        }
784
785        let activation = self.inner.activations.get(namespace)
786            .ok_or_else(|| PlexusError::ActivationNotFound(namespace.to_string()))?;
787
788        activation.call(method_name, params, auth, raw_ctx).await
789    }
790
791    /// Resolve a handle using the activation registry
792    ///
793    /// Looks up the activation by its UUID in the registry.
794    pub async fn do_resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
795        let path = self.inner.registry.read().unwrap()
796            .lookup(handle.plugin_id)
797            .map(|s| s.to_string())
798            .ok_or_else(|| PlexusError::ActivationNotFound(handle.plugin_id.to_string()))?;
799
800        let activation = self.inner.activations.get(&path)
801            .ok_or_else(|| PlexusError::ActivationNotFound(path.clone()))?;
802        activation.resolve_handle(handle).await
803    }
804
805    /// Get activation schema
806    pub fn get_activation_schema(&self, namespace: &str) -> Option<Schema> {
807        self.inner.activations.get(namespace).map(|a| a.schema())
808    }
809
810    /// Get a snapshot of the activation registry (safe to use outside locks)
811    pub fn registry_snapshot(&self) -> PluginRegistrySnapshot {
812        let guard = self.inner.registry.read().unwrap();
813        PluginRegistrySnapshot {
814            by_id: guard.by_id.clone(),
815            by_path: guard.by_path.clone(),
816        }
817    }
818
819    /// Look up an activation path by its UUID
820    pub fn lookup_plugin(&self, id: uuid::Uuid) -> Option<String> {
821        self.inner.registry.read().unwrap().lookup(id).map(|s| s.to_string())
822    }
823
824    /// Look up an activation UUID by its path
825    pub fn lookup_plugin_by_path(&self, path: &str) -> Option<uuid::Uuid> {
826        self.inner.registry.read().unwrap().lookup_by_path(path)
827    }
828
829    /// Get activation schemas for all activations (including this hub itself)
830    pub fn list_plugin_schemas(&self) -> Vec<PluginSchema> {
831        let mut schemas = Vec::new();
832
833        // Include this hub itself
834        schemas.push(Activation::plugin_schema(self));
835
836        // Include registered activations
837        for a in self.inner.activations.values() {
838            schemas.push(a.plugin_schema());
839        }
840
841        schemas
842    }
843
844    /// Deprecated: use list_plugin_schemas instead
845    #[deprecated(note = "Use list_plugin_schemas instead")]
846    pub fn list_full_schemas(&self) -> Vec<PluginSchema> {
847        self.list_plugin_schemas()
848    }
849
850    /// Get help for a method
851    pub fn get_method_help(&self, method: &str) -> Option<String> {
852        let (namespace, method_name) = self.parse_method(method).ok()?;
853        let activation = self.inner.activations.get(namespace)?;
854        activation.method_help(method_name)
855    }
856
857    fn parse_method<'a>(&self, method: &'a str) -> Result<(&'a str, &'a str), PlexusError> {
858        let parts: Vec<&str> = method.splitn(2, '.').collect();
859        if parts.len() != 2 {
860            return Err(PlexusError::InvalidParams(format!("Invalid method format: {}", method)));
861        }
862        Ok((parts[0], parts[1]))
863    }
864
865    /// Get child activation summaries (for hub functionality)
866    /// Called by hub-macro when `hub` flag is set
867    pub fn plugin_children(&self) -> Vec<ChildSummary> {
868        self.inner.activations.values()
869            .map(|a| {
870                let schema = a.plugin_schema();
871                ChildSummary {
872                    namespace: schema.namespace,
873                    description: schema.description,
874                    hash: schema.hash,
875                }
876            })
877            .collect()
878    }
879
880    /// Convert to RPC module
881    pub fn into_rpc_module(self) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
882        let mut module = RpcModule::new(());
883
884        PlexusContext::init(self.compute_hash());
885
886        // Register hub methods with runtime namespace using dot notation (e.g., "plexus.call" or "hub.call")
887        // Note: we leak these strings to get 'static lifetime required by jsonrpsee
888        let ns = self.runtime_namespace();
889        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
890        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
891        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
892        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
893        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
894        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
895        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
896        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
897        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
898
899        // Register {ns}.call subscription
900        let plexus_for_call = self.clone();
901        module.register_subscription(
902            call_method,
903            PLEXUS_NOTIF_METHOD,
904            call_unsub,
905            move |params, pending, _ctx, _ext| {
906                let plexus = plexus_for_call.clone();
907                Box::pin(async move {
908                    let p: CallParams = params.parse()?;
909                    match plexus.route(&p.method, p.params.unwrap_or_default(), None).await {
910                        Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
911                        Err(e) => {
912                            let sink = pending.accept().await?;
913                            let error_item = super::types::PlexusStreamItem::Error {
914                                metadata: super::types::StreamMetadata::new(
915                                    vec![ns_static.into()],
916                                    PlexusContext::hash(),
917                                ),
918                                message: e.to_string(),
919                                code: Some(plexus_error_code(&e).to_string()),
920                                recoverable: false,
921                            };
922                            if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
923                                let _ = sink.send(raw).await;
924                            }
925                            Ok(())
926                        }
927                    }
928                })
929            }
930        )?;
931
932        // Register {ns}.hash subscription
933        let plexus_for_hash = self.clone();
934        module.register_subscription(
935            hash_method,
936            PLEXUS_NOTIF_METHOD,
937            hash_unsub,
938            move |_params, pending, _ctx, _ext| {
939                let plexus = plexus_for_hash.clone();
940                Box::pin(async move {
941                    let schema = Activation::plugin_schema(&plexus);
942                    let stream = async_stream::stream! {
943                        yield HashEvent::Hash { value: schema.hash };
944                    };
945                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
946                    pipe_stream_to_subscription(pending, wrapped).await
947                })
948            }
949        )?;
950
951        // Register {ns}.schema subscription
952        let plexus_for_schema = self.clone();
953        module.register_subscription(
954            schema_method,
955            PLEXUS_NOTIF_METHOD,
956            schema_unsub,
957            move |params, pending, _ctx, _ext| {
958                let plexus = plexus_for_schema.clone();
959                Box::pin(async move {
960                    let p: SchemaParams = params.parse().unwrap_or_default();
961                    let plugin_schema = Activation::plugin_schema(&plexus);
962
963                    let result = if let Some(ref name) = p.method {
964                        plugin_schema.methods.iter()
965                            .find(|m| m.name == *name)
966                            .map(|m| super::SchemaResult::Method(m.clone()))
967                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
968                                -32602,
969                                format!("Method '{}' not found", name),
970                                None::<()>,
971                            ))?
972                    } else {
973                        super::SchemaResult::Plugin(plugin_schema)
974                    };
975
976                    let stream = async_stream::stream! { yield result; };
977                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
978                    pipe_stream_to_subscription(pending, wrapped).await
979                })
980            }
981        )?;
982
983        // Register _info well-known endpoint (no namespace prefix)
984        // Returns backend name as a single-item stream with automatic Done event
985        let backend_name = self.runtime_namespace().to_string();
986        module.register_subscription(
987            "_info",
988            PLEXUS_NOTIF_METHOD,
989            "_info_unsub",
990            move |_params, pending, _ctx, _ext| {
991                let name = backend_name.clone();
992                Box::pin(async move {
993                    // Create a single-item stream with the info response
994                    let info_stream = futures::stream::once(async move {
995                        serde_json::json!({"backend": name})
996                    });
997
998                    // Wrap to auto-append Done event
999                    let wrapped = super::streaming::wrap_stream(
1000                        info_stream,
1001                        "_info",
1002                        vec![]
1003                    );
1004
1005                    // Pipe to subscription (handles Done automatically)
1006                    pipe_stream_to_subscription(pending, wrapped).await
1007                })
1008            }
1009        )?;
1010
1011        // Add all registered activation RPC methods
1012        let pending = std::mem::take(&mut *self.inner.pending_rpc.lock().unwrap());
1013        for factory in pending {
1014            module.merge(factory())?;
1015        }
1016
1017        Ok(module)
1018    }
1019
1020    /// Convert Arc<DynamicHub> to RPC module while keeping the Arc alive
1021    ///
1022    /// Unlike `into_rpc_module`, this keeps the Arc<DynamicHub> reference alive,
1023    /// which is necessary when activations hold Weak<DynamicHub> references that
1024    /// need to remain upgradeable.
1025    pub fn arc_into_rpc_module(hub: Arc<Self>) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
1026        let mut module = RpcModule::new(());
1027
1028        PlexusContext::init(hub.compute_hash());
1029
1030        // Register hub methods with runtime namespace using dot notation (e.g., "plexus.call" or "hub.call")
1031        // Note: we leak these strings to get 'static lifetime required by jsonrpsee
1032        let ns = hub.runtime_namespace();
1033        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
1034        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
1035        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
1036        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
1037        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
1038        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
1039        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
1040        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
1041        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
1042
1043        // Register {ns}.call subscription - clone Arc to keep reference alive
1044        let hub_for_call = hub.clone();
1045        module.register_subscription(
1046            call_method,
1047            call_method,
1048            call_unsub,
1049            move |params, pending, _ctx, ext| {
1050                let hub = hub_for_call.clone();
1051                Box::pin(async move {
1052                    let p: CallParams = params.parse()?;
1053                    // Extract auth context from Extensions (if present)
1054                    let auth = ext.get::<std::sync::Arc<super::auth::AuthContext>>()
1055                        .map(|arc| arc.as_ref());
1056                    match hub.route(&p.method, p.params.unwrap_or_default(), auth).await {
1057                        Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
1058                        Err(e) => {
1059                            // Accept the subscription, then send the error as a stream item.
1060                            // This preserves the error message and code — returning Err(...)
1061                            // from a subscription handler causes jsonrpsee to wrap it as
1062                            // generic -32603, discarding our semantic error code.
1063                            let sink = pending.accept().await?;
1064                            let error_item = super::types::PlexusStreamItem::Error {
1065                                metadata: super::types::StreamMetadata::new(
1066                                    vec![ns_static.into()],
1067                                    PlexusContext::hash(),
1068                                ),
1069                                message: e.to_string(),
1070                                code: Some(plexus_error_code(&e).to_string()),
1071                                recoverable: false,
1072                            };
1073                            if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
1074                                let _ = sink.send(raw).await;
1075                            }
1076                            Ok(())
1077                        }
1078                    }
1079                })
1080            }
1081        )?;
1082
1083        // Register {ns}.hash subscription
1084        let hub_for_hash = hub.clone();
1085        module.register_subscription(
1086            hash_method,
1087            PLEXUS_NOTIF_METHOD,
1088            hash_unsub,
1089            move |_params, pending, _ctx, _ext| {
1090                let hub = hub_for_hash.clone();
1091                Box::pin(async move {
1092                    let schema = Activation::plugin_schema(&*hub);
1093                    let stream = async_stream::stream! {
1094                        yield HashEvent::Hash { value: schema.hash };
1095                    };
1096                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
1097                    pipe_stream_to_subscription(pending, wrapped).await
1098                })
1099            }
1100        )?;
1101
1102        // Register {ns}.schema subscription
1103        let hub_for_schema = hub.clone();
1104        module.register_subscription(
1105            schema_method,
1106            PLEXUS_NOTIF_METHOD,
1107            schema_unsub,
1108            move |params, pending, _ctx, _ext| {
1109                let hub = hub_for_schema.clone();
1110                Box::pin(async move {
1111                    let p: SchemaParams = params.parse().unwrap_or_default();
1112                    let plugin_schema = Activation::plugin_schema(&*hub);
1113
1114                    let result = if let Some(ref name) = p.method {
1115                        plugin_schema.methods.iter()
1116                            .find(|m| m.name == *name)
1117                            .map(|m| super::SchemaResult::Method(m.clone()))
1118                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
1119                                -32602,
1120                                format!("Method '{}' not found", name),
1121                                None::<()>,
1122                            ))?
1123                    } else {
1124                        super::SchemaResult::Plugin(plugin_schema)
1125                    };
1126
1127                    let stream = async_stream::stream! {
1128                        yield result;
1129                    };
1130                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
1131                    pipe_stream_to_subscription(pending, wrapped).await
1132                })
1133            }
1134        )?;
1135
1136        // Register _info well-known endpoint (no namespace prefix)
1137        // Returns backend name as a single-item stream with automatic Done event
1138        let backend_name = hub.runtime_namespace().to_string();
1139        module.register_subscription(
1140            "_info",
1141            PLEXUS_NOTIF_METHOD,
1142            "_info_unsub",
1143            move |_params, pending, _ctx, _ext| {
1144                let name = backend_name.clone();
1145                Box::pin(async move {
1146                    // Create a single-item stream with the info response
1147                    let info_stream = futures::stream::once(async move {
1148                        serde_json::json!({"backend": name})
1149                    });
1150
1151                    // Wrap to auto-append Done event
1152                    let wrapped = super::streaming::wrap_stream(
1153                        info_stream,
1154                        "_info",
1155                        vec![]
1156                    );
1157
1158                    // Pipe to subscription (handles Done automatically)
1159                    pipe_stream_to_subscription(pending, wrapped).await
1160                })
1161            }
1162        )?;
1163
1164        // Register {ns}.respond method for WebSocket bidirectional responses
1165        // This allows clients to respond to server-initiated requests (like confirmations/prompts)
1166        let respond_method: &'static str = Box::leak(format!("{}.respond", ns).into_boxed_str());
1167        module.register_async_method(respond_method, |params, _ctx, _ext| async move {
1168            use super::bidirectional::{handle_pending_response, BidirError};
1169
1170            let p: RespondParams = params.parse()?;
1171
1172            tracing::debug!(
1173                request_id = %p.request_id,
1174                "Handling {}.respond via WebSocket",
1175                "plexus"
1176            );
1177
1178            match handle_pending_response(&p.request_id, p.response_data) {
1179                Ok(()) => Ok(serde_json::json!({"success": true})),
1180                Err(BidirError::UnknownRequest) => {
1181                    tracing::warn!(request_id = %p.request_id, "Unknown request ID in respond");
1182                    Err(jsonrpsee::types::ErrorObject::owned(
1183                        -32602,
1184                        format!("Unknown request ID: {}. The request may have timed out or been cancelled.", p.request_id),
1185                        None::<()>,
1186                    ))
1187                }
1188                Err(BidirError::ChannelClosed) => {
1189                    tracing::warn!(request_id = %p.request_id, "Channel closed in respond");
1190                    Err(jsonrpsee::types::ErrorObject::owned(
1191                        -32000,
1192                        "Response channel was closed (request may have timed out)",
1193                        None::<()>,
1194                    ))
1195                }
1196                Err(e) => {
1197                    tracing::error!(request_id = %p.request_id, error = ?e, "Error in respond");
1198                    Err(jsonrpsee::types::ErrorObject::owned(
1199                        -32000,
1200                        format!("Failed to deliver response: {}", e),
1201                        None::<()>,
1202                    ))
1203                }
1204            }
1205        })?;
1206
1207        // Register pending RPC methods from activations
1208        let pending = std::mem::take(&mut *hub.inner.pending_rpc.lock().unwrap());
1209        tracing::trace!(factories = pending.len(), "merging activation RPC factories");
1210        for (idx, factory) in pending.into_iter().enumerate() {
1211            tracing::trace!(factory_idx = idx, "calling factory to get Methods");
1212            let methods = factory();
1213            let method_count = methods.method_names().count();
1214            tracing::trace!(factory_idx = idx, methods = method_count, "factory returned Methods; merging into module");
1215            module.merge(methods)?;
1216            tracing::trace!(factory_idx = idx, "successfully merged factory methods");
1217        }
1218        tracing::trace!("all activations merged successfully");
1219
1220        Ok(module)
1221    }
1222}
1223
1224/// Params for {ns}.call
1225#[derive(Debug, serde::Deserialize)]
1226struct CallParams {
1227    method: String,
1228    #[serde(default)]
1229    params: Option<Value>,
1230}
1231
1232/// Params for {ns}.schema
1233#[derive(Debug, Default, serde::Deserialize)]
1234struct SchemaParams {
1235    method: Option<String>,
1236}
1237
1238/// Params for {ns}.respond (WebSocket bidirectional response)
1239#[derive(Debug, serde::Deserialize)]
1240struct RespondParams {
1241    request_id: String,
1242    response_data: Value,
1243}
1244
1245/// Helper to pipe a PlexusStream to a subscription sink.
1246///
1247/// Notifications are sent with `method: PLEXUS_NOTIF_METHOD` on the wire,
1248/// as set by the `notif_method_name` arg in each `register_subscription` call.
1249async fn pipe_stream_to_subscription(
1250    pending: jsonrpsee::PendingSubscriptionSink,
1251    mut stream: PlexusStream,
1252) -> jsonrpsee::core::SubscriptionResult {
1253    use futures::StreamExt;
1254
1255    let sink = pending.accept().await?;
1256    while let Some(item) = stream.next().await {
1257        let json = serde_json::value::to_raw_value(&item)?;
1258        sink.send(json).await?;
1259    }
1260    Ok(())
1261}
1262
1263// ============================================================================
1264// DynamicHub RPC Methods (via plexus-macros)
1265// ============================================================================
1266
1267#[plexus_macros::activation(
1268    namespace = "plexus",
1269    version = "1.0.0",
1270    description = "Central routing and introspection",
1271    hub,
1272    namespace_fn = "runtime_namespace"
1273)]
1274#[allow(deprecated)]
1275impl DynamicHub {
1276    /// Route a call to a registered activation
1277    #[plexus_macros::method(
1278        streaming,
1279        description = "Route a call to a registered activation",
1280        params(
1281            method = "The method to call (format: namespace.method)",
1282            params = "Parameters to pass to the method (optional, defaults to {})"
1283        )
1284    )]
1285    async fn call(
1286        &self,
1287        method: String,
1288        params: Option<Value>,
1289    ) -> impl Stream<Item = super::types::PlexusStreamItem> + Send + 'static {
1290        use super::context::PlexusContext;
1291        use super::types::{PlexusStreamItem, StreamMetadata};
1292
1293        let result = self.route(&method, params.unwrap_or_default(), None).await;
1294
1295        match result {
1296            Ok(plexus_stream) => {
1297                // Forward the routed stream directly - it already contains PlexusStreamItems
1298                plexus_stream
1299            }
1300            Err(e) => {
1301                // Return error as a PlexusStreamItem stream
1302                let metadata = StreamMetadata::new(
1303                    vec![self.inner.namespace.clone()],
1304                    PlexusContext::hash(),
1305                );
1306                Box::pin(futures::stream::once(async move {
1307                    PlexusStreamItem::Error {
1308                        metadata,
1309                        message: e.to_string(),
1310                        code: None,
1311                        recoverable: false,
1312                    }
1313                }))
1314            }
1315        }
1316    }
1317
1318    /// Get Plexus RPC server configuration hash (from the recursive schema)
1319    ///
1320    /// This hash changes whenever any method or child activation changes.
1321    /// It's computed from the method hashes rolled up through the schema tree.
1322    #[plexus_macros::method(description = "Get plexus configuration hash (from the recursive schema)\n\n This hash changes whenever any method or child plugin changes.\n It's computed from the method hashes rolled up through the schema tree.")]
1323    async fn hash(&self) -> impl Stream<Item = HashEvent> + Send + 'static {
1324        let schema = Activation::plugin_schema(self);
1325        stream! { yield HashEvent::Hash { value: schema.hash }; }
1326    }
1327
1328    /// Get plugin hashes for cache validation (lightweight alternative to full schema)
1329    #[plexus_macros::method(description = "Get plugin hashes for cache validation")]
1330    #[allow(deprecated)]
1331    async fn hashes(&self) -> impl Stream<Item = PluginHashes> + Send + 'static {
1332        let schema = Activation::plugin_schema(self);
1333
1334        stream! {
1335            yield PluginHashes {
1336                namespace: schema.namespace.clone(),
1337                self_hash: schema.self_hash.clone(),
1338                children_hash: schema.children_hash.clone(),
1339                hash: schema.hash.clone(),
1340                children: schema.children.as_ref().map(|kids| {
1341                    kids.iter()
1342                        .map(|c| ChildHashes {
1343                            namespace: c.namespace.clone(),
1344                            hash: c.hash.clone(),
1345                        })
1346                        .collect()
1347                }),
1348            };
1349        }
1350    }
1351
1352    // Note: schema() method is auto-generated by hub-macro for all activations
1353}
1354
1355// ============================================================================
1356// HubContext Implementation for Weak<DynamicHub>
1357// ============================================================================
1358
1359use super::hub_context::HubContext;
1360use std::sync::Weak;
1361
1362/// HubContext implementation for Weak<DynamicHub>
1363///
1364/// This enables activations to receive a weak reference to their parent DynamicHub,
1365/// allowing them to resolve handles and route calls through the hub without
1366/// creating reference cycles.
1367#[async_trait]
1368impl HubContext for Weak<DynamicHub> {
1369    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1370        let hub = self.upgrade().ok_or_else(|| {
1371            PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1372        })?;
1373        hub.do_resolve_handle(handle).await
1374    }
1375
1376    async fn call(&self, method: &str, params: serde_json::Value) -> Result<PlexusStream, PlexusError> {
1377        let hub = self.upgrade().ok_or_else(|| {
1378            PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1379        })?;
1380        hub.route(method, params, None).await
1381    }
1382
1383    fn is_valid(&self) -> bool {
1384        self.upgrade().is_some()
1385    }
1386}
1387
1388/// ChildRouter implementation for DynamicHub
1389///
1390/// This enables nested routing through registered activations.
1391/// e.g., hub.call("solar.mercury.info") routes to solar → mercury → info
1392#[async_trait]
1393impl ChildRouter for DynamicHub {
1394    fn router_namespace(&self) -> &str {
1395        &self.inner.namespace
1396    }
1397
1398    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
1399        // DynamicHub routes via its registered activations
1400        // Method format: "activation.method" or "activation.child.method"
1401        self.route_with_ctx(method, params, auth, raw_ctx).await
1402    }
1403
1404    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1405        // Look up registered activations that implement ChildRouter
1406        self.inner.child_routers.get(name)
1407            .map(|router| {
1408                // Clone the Arc and wrap in Box for the trait object
1409                Box::new(ArcChildRouter(router.clone())) as Box<dyn ChildRouter>
1410            })
1411    }
1412}
1413
1414#[cfg(test)]
1415#[allow(deprecated)]
1416mod tests {
1417    use super::*;
1418
1419    #[test]
1420    fn dynamic_hub_implements_activation() {
1421        fn assert_activation<T: Activation>() {}
1422        assert_activation::<DynamicHub>();
1423    }
1424
1425    #[test]
1426    fn dynamic_hub_methods() {
1427        let hub = DynamicHub::new("test");
1428        let methods = hub.methods();
1429        assert!(methods.contains(&"call"));
1430        assert!(methods.contains(&"hash"));
1431        assert!(methods.contains(&"schema"));
1432        // list_activations was removed - use schema() instead
1433    }
1434
1435    #[test]
1436    fn dynamic_hub_hash_stable() {
1437        let h1 = DynamicHub::new("test");
1438        let h2 = DynamicHub::new("test");
1439        assert_eq!(h1.compute_hash(), h2.compute_hash());
1440    }
1441
1442    #[test]
1443    fn dynamic_hub_is_hub() {
1444        use crate::activations::health::Health;
1445        let hub = DynamicHub::new("test").register(Health::new());
1446        let schema = hub.plugin_schema();
1447
1448        // DynamicHub should be a hub (has children)
1449        assert!(schema.is_hub(), "dynamic hub should be a hub");
1450        assert!(!schema.is_leaf(), "dynamic hub should not be a leaf");
1451
1452        // Should have children (as summaries)
1453        let children = schema.children.expect("dynamic hub should have children");
1454        assert!(!children.is_empty(), "dynamic hub should have at least one child");
1455
1456        // Health should be in the children summaries
1457        let health = children.iter().find(|c| c.namespace == "health").expect("should have health child");
1458        assert!(!health.hash.is_empty(), "health should have a hash");
1459    }
1460
/// Prints the hub's schema as pretty JSON and checks its namespace, the
/// presence of the `call` method, and that children are present.
#[test]
fn dynamic_hub_schema_structure() {
    use crate::activations::health::Health;

    let hub = DynamicHub::new("test").register(Health::new());
    let schema = hub.plugin_schema();

    // Dump the schema so it can be inspected in the test output.
    let pretty = serde_json::to_string_pretty(&schema).unwrap();
    println!("DynamicHub schema:\n{}", pretty);

    // Structural checks.
    assert_eq!(schema.namespace, "test");
    let has_call = schema.methods.iter().any(|method| method.name == "call");
    assert!(has_call);
    assert!(schema.children.is_some());
}
1476
1477    // ========================================================================
1478    // INVARIANT: Handle routing - resolves to correct plugin
1479    // ========================================================================
1480
1481    #[tokio::test]
1482    async fn invariant_resolve_handle_unknown_activation() {
1483        use crate::activations::health::Health;
1484        use crate::types::Handle;
1485        use uuid::Uuid;
1486
1487        let hub = DynamicHub::new("test").register(Health::new());
1488
1489        // Handle for an unregistered activation (random UUID)
1490        let unknown_plugin_id = Uuid::new_v4();
1491        let handle = Handle::new(unknown_plugin_id, "1.0.0", "some_method");
1492
1493        let result = hub.do_resolve_handle(&handle).await;
1494
1495        match result {
1496            Err(PlexusError::ActivationNotFound(_)) => {
1497                // Expected - activation not found
1498            }
1499            Err(other) => panic!("Expected ActivationNotFound, got {:?}", other),
1500            Ok(_) => panic!("Expected error for unknown activation"),
1501        }
1502    }
1503
1504    #[tokio::test]
1505    async fn invariant_resolve_handle_unsupported() {
1506        use crate::activations::health::Health;
1507        use crate::types::Handle;
1508
1509        let hub = DynamicHub::new("test").register(Health::new());
1510
1511        // Handle for health activation (which doesn't support handle resolution)
1512        let handle = Handle::new(Health::PLUGIN_ID, "1.0.0", "check");
1513
1514        let result = hub.do_resolve_handle(&handle).await;
1515
1516        match result {
1517            Err(PlexusError::HandleNotSupported(name)) => {
1518                assert_eq!(name, "health");
1519            }
1520            Err(other) => panic!("Expected HandleNotSupported, got {:?}", other),
1521            Ok(_) => panic!("Expected error for unsupported handle"),
1522        }
1523    }
1524
1525    #[tokio::test]
1526    async fn invariant_resolve_handle_routes_by_plugin_id() {
1527        use crate::activations::health::Health;
1528        use crate::activations::echo::Echo;
1529        use crate::types::Handle;
1530        use uuid::Uuid;
1531
1532        let health = Health::new();
1533        let echo = Echo::new();
1534        let health_plugin_id = health.plugin_id();
1535        let echo_plugin_id = echo.plugin_id();
1536
1537        let hub = DynamicHub::new("test")
1538            .register(health)
1539            .register(echo);
1540
1541        // Health handle → health activation
1542        let health_handle = Handle::new(health_plugin_id, "1.0.0", "check");
1543        match hub.do_resolve_handle(&health_handle).await {
1544            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
1545            Err(other) => panic!("health handle should route to health activation, got {:?}", other),
1546            Ok(_) => panic!("health handle should return HandleNotSupported"),
1547        }
1548
1549        // Echo handle → echo activation
1550        let echo_handle = Handle::new(echo_plugin_id, "1.0.0", "echo");
1551        match hub.do_resolve_handle(&echo_handle).await {
1552            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "echo"),
1553            Err(other) => panic!("echo handle should route to echo activation, got {:?}", other),
1554            Ok(_) => panic!("echo handle should return HandleNotSupported"),
1555        }
1556
1557        // Unknown handle → ActivationNotFound (random UUID not registered)
1558        let unknown_handle = Handle::new(Uuid::new_v4(), "1.0.0", "method");
1559        match hub.do_resolve_handle(&unknown_handle).await {
1560            Err(PlexusError::ActivationNotFound(_)) => { /* expected */ },
1561            Err(other) => panic!("unknown handle should return ActivationNotFound, got {:?}", other),
1562            Ok(_) => panic!("unknown handle should return ActivationNotFound"),
1563        }
1564    }
1565
/// Handles built for different activations with identical meta must still
/// carry different plugin ids.
#[test]
fn invariant_handle_plugin_id_determines_routing() {
    use crate::activations::echo::Echo;
    use crate::activations::health::Health;
    use crate::types::Handle;

    let (health, echo) = (Health::new(), Echo::new());

    // Both handles get the same meta; only the owning activation differs.
    let for_health = Handle::new(health.plugin_id(), "1.0.0", "check")
        .with_meta(vec!["msg-123".into(), "user".into()]);
    let for_echo = Handle::new(echo.plugin_id(), "1.0.0", "echo")
        .with_meta(vec!["msg-123".into(), "user".into()]);

    // Distinct plugin ids are what keep the two routing targets apart.
    assert_ne!(for_health.plugin_id, for_echo.plugin_id);
}
1584
1585    // ========================================================================
1586    // Plugin Registry Tests
1587    // ========================================================================
1588
/// Registers one entry in a fresh `PluginRegistry` and checks lookup by id,
/// lookup by path, and the stored entry's fields.
#[test]
fn plugin_registry_basic_operations() {
    let plugin_id = uuid::Uuid::new_v4();
    let mut registry = PluginRegistry::new();

    registry.register(plugin_id, String::from("test_plugin"), String::from("test"));

    // id → path
    assert_eq!(registry.lookup(plugin_id), Some("test_plugin"));

    // path → id
    assert_eq!(registry.lookup_by_path("test_plugin"), Some(plugin_id));

    // Full entry
    let stored = registry.get(plugin_id).expect("should have entry");
    assert_eq!(stored.path, "test_plugin");
    assert_eq!(stored.plugin_type, "test");
}
1608
/// Registering `Health` on a hub must populate the hub's registry so the
/// activation can be found by path and mapped back from its id to `"health"`.
#[test]
fn plugin_registry_populated_on_register() {
    use crate::activations::health::Health;

    let hub = DynamicHub::new("test").register(Health::new());

    let registry = hub.registry();
    assert!(!registry.is_empty(), "registry should not be empty after registration");

    // Path → id. A single `expect` replaces the former `is_some()` assertion
    // followed by `unwrap()`; the failure message is unchanged.
    let health_uuid = registry
        .lookup_by_path("health")
        .expect("health should be registered by path");

    // Id → path must round-trip.
    assert_eq!(registry.lookup(health_uuid), Some("health"));
}
1626
/// Two `Health` instances must report the same plugin id, and that id must
/// equal the v5 UUID of `health@1` in the OID namespace.
#[test]
fn plugin_registry_deterministic_uuid() {
    use crate::activations::health::Health;

    // The id is expected to derive from namespace + major version (semver compatibility).
    let expected = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, b"health@1");

    let (first, second) = (Health::new(), Health::new());

    // Separate instances of the same activation type share one id.
    assert_eq!(
        first.plugin_id(),
        second.plugin_id(),
        "same activation type should have deterministic UUID"
    );

    assert_eq!(
        first.plugin_id(),
        expected,
        "plugin_id should be deterministic from namespace@major_version"
    );
}
1646
1647    // ========================================================================
1648    // CHILD-2: ChildRouter capabilities + opt-in list/search
1649    // ========================================================================
1650
1651    /// A minimal `ChildRouter` that overrides only the required methods.
1652    /// Exercises default implementations of `capabilities`, `list_children`
1653    /// and `search_children`.
1654    struct MinimalRouter;
1655
1656    #[async_trait]
1657    impl ChildRouter for MinimalRouter {
1658        fn router_namespace(&self) -> &str {
1659            "minimal"
1660        }
1661
1662        async fn router_call(
1663            &self,
1664            _method: &str,
1665            _params: Value,
1666            _auth: Option<&super::super::auth::AuthContext>,
1667            _raw_ctx: Option<&crate::request::RawRequestContext>,
1668        ) -> Result<PlexusStream, PlexusError> {
1669            Err(PlexusError::MethodNotFound {
1670                activation: "minimal".into(),
1671                method: "none".into(),
1672            })
1673        }
1674
1675        async fn get_child(&self, _name: &str) -> Option<Box<dyn ChildRouter>> {
1676            None
1677        }
1678    }
1679
1680    #[tokio::test]
1681    async fn child_router_defaults_report_no_capabilities_and_none_streams() {
1682        let router = MinimalRouter;
1683
1684        assert_eq!(
1685            router.capabilities(),
1686            ChildCapabilities::empty(),
1687            "default capabilities should be empty"
1688        );
1689        assert!(
1690            router.list_children().await.is_none(),
1691            "default list_children should be None"
1692        );
1693        assert!(
1694            router.search_children("anything").await.is_none(),
1695            "default search_children should be None"
1696        );
1697    }
1698
1699    /// A `ChildRouter` that opts in to both LIST and SEARCH.
1700    struct ListingRouter {
1701        names: Vec<String>,
1702    }
1703
1704    #[async_trait]
1705    impl ChildRouter for ListingRouter {
1706        fn router_namespace(&self) -> &str {
1707            "listing"
1708        }
1709
1710        async fn router_call(
1711            &self,
1712            _method: &str,
1713            _params: Value,
1714            _auth: Option<&super::super::auth::AuthContext>,
1715            _raw_ctx: Option<&crate::request::RawRequestContext>,
1716        ) -> Result<PlexusStream, PlexusError> {
1717            Err(PlexusError::MethodNotFound {
1718                activation: "listing".into(),
1719                method: "none".into(),
1720            })
1721        }
1722
1723        async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1724            if self.names.iter().any(|n| n == name) {
1725                // Return the same type to keep the test simple; we only care
1726                // that the override compiles and is reachable.
1727                Some(Box::new(ListingRouter { names: vec![] }))
1728            } else {
1729                None
1730            }
1731        }
1732
1733        fn capabilities(&self) -> ChildCapabilities {
1734            ChildCapabilities::LIST | ChildCapabilities::SEARCH
1735        }
1736
1737        async fn list_children(&self) -> Option<BoxStream<'_, String>> {
1738            let stream = futures::stream::iter(self.names.iter().cloned());
1739            Some(Box::pin(stream))
1740        }
1741
1742        async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
1743            let q = query.to_string();
1744            let stream = futures::stream::iter(
1745                self.names
1746                    .iter()
1747                    .filter(move |n| n.contains(&q))
1748                    .cloned()
1749                    .collect::<Vec<_>>(),
1750            );
1751            Some(Box::pin(stream))
1752        }
1753    }
1754
1755    #[tokio::test]
1756    async fn child_router_overrides_report_capabilities_and_yield_streams() {
1757        use futures::StreamExt;
1758
1759        let router = ListingRouter {
1760            names: vec!["alpha".into(), "beta".into(), "alphabet".into()],
1761        };
1762
1763        // Capabilities
1764        let caps = router.capabilities();
1765        assert!(caps.contains(ChildCapabilities::LIST));
1766        assert!(caps.contains(ChildCapabilities::SEARCH));
1767        assert_eq!(caps, ChildCapabilities::LIST | ChildCapabilities::SEARCH);
1768
1769        // list_children yields the full, non-empty, finite sequence.
1770        let list_stream = router
1771            .list_children()
1772            .await
1773            .expect("LIST capability set — expected Some(stream)");
1774        let listed: Vec<String> = list_stream.collect().await;
1775        assert_eq!(listed, vec!["alpha".to_string(), "beta".into(), "alphabet".into()]);
1776
1777        // search_children filters by the query string.
1778        let search_stream = router
1779            .search_children("alpha")
1780            .await
1781            .expect("SEARCH capability set — expected Some(stream)");
1782        let matched: Vec<String> = search_stream.collect().await;
1783        assert_eq!(matched, vec!["alpha".to_string(), "alphabet".into()]);
1784    }
1785}