Skip to main content

plexus_core/plexus/
plexus.rs

1//! DynamicHub - the central routing layer for activations
2//!
3//! DynamicHub IS an activation that also serves as the registry for other activations.
4//! It implements the Plexus RPC protocol for routing and introspection.
5//! It uses hub-macro for its methods, with the `call` method using the streaming
6//! pattern to forward responses from routed methods.
7
8use super::{
9    context::PlexusContext,
10    method_enum::MethodEnumSchema,
11    schema::{ChildSummary, MethodSchema, PluginSchema, Schema},
12    streaming::PlexusStream,
13};
14use crate::types::Handle;
15use async_stream::stream;
16use async_trait::async_trait;
17use bitflags::bitflags;
18use futures::Stream;
19use futures_core::stream::BoxStream;
20use jsonrpsee::core::server::Methods;
21use jsonrpsee::RpcModule;
22
23/// The JSON-RPC method name used in all plexus subscription notifications.
24///
25/// Every subscription registered by plexus (`.call`, `.hash`, `.schema`, `_info`)
26/// sends notifications with `"method": PLEXUS_NOTIF_METHOD` on the wire.
27/// Clients must match against this value when dispatching raw subscription frames.
28pub const PLEXUS_NOTIF_METHOD: &str = "result";
29use schemars::JsonSchema;
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use std::collections::HashMap;
33use std::sync::Arc;
34
35// ============================================================================
36// Error Types
37// ============================================================================
38
39#[derive(Debug, Clone)]
40pub enum PlexusError {
41    ActivationNotFound(String),
42    MethodNotFound { activation: String, method: String },
43    InvalidParams(String),
44    ExecutionError(String),
45    HandleNotSupported(String),
46    TransportError(TransportErrorKind),
47    Unauthenticated(String),
48}
49
50#[derive(Debug, Clone, Serialize, Deserialize)]
51#[serde(tag = "error_kind", rename_all = "snake_case")]
52pub enum TransportErrorKind {
53    ConnectionRefused { host: String, port: u16 },
54    ConnectionTimeout { host: String, port: u16 },
55    ProtocolError { message: String },
56    NetworkError { message: String },
57}
58
59impl std::fmt::Display for TransportErrorKind {
60    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61        match self {
62            TransportErrorKind::ConnectionRefused { host, port } => {
63                write!(f, "Connection refused to {}:{}", host, port)
64            }
65            TransportErrorKind::ConnectionTimeout { host, port } => {
66                write!(f, "Connection timeout to {}:{}", host, port)
67            }
68            TransportErrorKind::ProtocolError { message } => {
69                write!(f, "Protocol error: {}", message)
70            }
71            TransportErrorKind::NetworkError { message } => {
72                write!(f, "Network error: {}", message)
73            }
74        }
75    }
76}
77
78impl std::fmt::Display for PlexusError {
79    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80        match self {
81            PlexusError::ActivationNotFound(name) => write!(f, "Activation not found: {}", name),
82            PlexusError::MethodNotFound { activation, method } => {
83                write!(f, "Method not found: {}.{}", activation, method)
84            }
85            PlexusError::InvalidParams(msg) => write!(f, "Invalid params: {}", msg),
86            PlexusError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
87            PlexusError::HandleNotSupported(activation) => {
88                write!(f, "Handle resolution not supported by activation: {}", activation)
89            }
90            PlexusError::TransportError(kind) => match kind {
91                TransportErrorKind::ConnectionRefused { host, port } => {
92                    write!(f, "Connection refused to {}:{}", host, port)
93                }
94                TransportErrorKind::ConnectionTimeout { host, port } => {
95                    write!(f, "Connection timeout to {}:{}", host, port)
96                }
97                TransportErrorKind::ProtocolError { message } => {
98                    write!(f, "Protocol error: {}", message)
99                }
100                TransportErrorKind::NetworkError { message } => {
101                    write!(f, "Network error: {}", message)
102                }
103            }
104            PlexusError::Unauthenticated(msg) => write!(f, "Authentication required: {}", msg),
105        }
106    }
107}
108
109impl std::error::Error for PlexusError {}
110
111/// Convert PlexusError to a JSON-RPC ErrorObject with semantic error codes.
112///
113/// Codes:
114/// - `-32001`: Authentication required (custom app-level error)
115/// - `-32601`: Method/activation not found (standard JSON-RPC)
116/// - `-32602`: Invalid parameters (standard JSON-RPC)
117/// - `-32000`: Generic server error (execution, transport, handle errors)
118/// Get the semantic JSON-RPC error code for a PlexusError.
119fn plexus_error_code(e: &PlexusError) -> i32 {
120    match e {
121        PlexusError::Unauthenticated(_) => -32001,
122        PlexusError::InvalidParams(_) => -32602,
123        PlexusError::MethodNotFound { .. } | PlexusError::ActivationNotFound(_) => -32601,
124        _ => -32000,
125    }
126}
127
128/// Convert PlexusError to a JSON-RPC ErrorObject with semantic error codes.
129fn plexus_error_to_jsonrpc(e: &PlexusError) -> jsonrpsee::types::ErrorObjectOwned {
130    jsonrpsee::types::ErrorObject::owned(plexus_error_code(e), e.to_string(), None::<()>)
131}
132
133// ============================================================================
134// Schema Types
135// ============================================================================
136
137#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
138pub struct ActivationInfo {
139    pub namespace: String,
140    pub version: String,
141    pub description: String,
142    pub methods: Vec<String>,
143}
144
145// ============================================================================
146// Activation Trait
147// ============================================================================
148
149#[async_trait]
150pub trait Activation: Send + Sync + 'static {
151    type Methods: MethodEnumSchema;
152
153    fn namespace(&self) -> &str;
154    fn version(&self) -> &str;
155    /// Short description (max 15 words)
156    fn description(&self) -> &str { "No description available" }
157    /// Long description (optional, for detailed documentation)
158    fn long_description(&self) -> Option<&str> { None }
159    fn methods(&self) -> Vec<&str>;
160    fn method_help(&self, _method: &str) -> Option<String> { None }
161    /// Stable activation instance ID for handle routing
162    /// By default generates a deterministic UUID from namespace+major_version
163    /// Using major version only ensures handles survive minor/patch upgrades (semver)
164    fn plugin_id(&self) -> uuid::Uuid {
165        let major_version = self.version().split('.').next().unwrap_or("0");
166        uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, format!("{}@{}", self.namespace(), major_version).as_bytes())
167    }
168
169    async fn call(
170        &self,
171        method: &str,
172        params: Value,
173        auth: Option<&super::auth::AuthContext>,
174        raw_ctx: Option<&crate::request::RawRequestContext>,
175    ) -> Result<PlexusStream, PlexusError>;
176    async fn resolve_handle(&self, _handle: &Handle) -> Result<PlexusStream, PlexusError> {
177        Err(PlexusError::HandleNotSupported(self.namespace().to_string()))
178    }
179
180    fn into_rpc_methods(self) -> Methods where Self: Sized;
181
182    /// Return this activation's schema (methods + optional children)
183    fn plugin_schema(&self) -> PluginSchema {
184        use std::collections::hash_map::DefaultHasher;
185        use std::hash::{Hash, Hasher};
186
187        let methods: Vec<MethodSchema> = self.methods().iter().map(|name| {
188            let desc = self.method_help(name).unwrap_or_default();
189            // Compute a simple hash for methods not using hub-macro
190            let mut hasher = DefaultHasher::new();
191            name.hash(&mut hasher);
192            desc.hash(&mut hasher);
193            let hash = format!("{:016x}", hasher.finish());
194            MethodSchema::new(name.to_string(), desc, hash)
195        }).collect();
196
197        if let Some(long_desc) = self.long_description() {
198            PluginSchema::leaf_with_long_description(
199                self.namespace(),
200                self.version(),
201                self.description(),
202                long_desc,
203                methods,
204            )
205        } else {
206            PluginSchema::leaf(
207                self.namespace(),
208                self.version(),
209                self.description(),
210                methods,
211            )
212        }
213    }
214}
215
216// ============================================================================
217// Child Routing for Hub Plugins
218// ============================================================================
219
220bitflags! {
221    /// Opt-in capability flags advertising which optional `ChildRouter`
222    /// operations a router supports.
223    ///
224    /// The Plexus RPC network is a *graph*, not a tree: children may be
225    /// remote, infinite, or deliberately private. Listing and searching
226    /// children are therefore opt-in — routers must declare them here
227    /// before callers can rely on them.
228    ///
229    /// # Contract
230    ///
231    /// | Condition | Expected |
232    /// |---|---|
233    /// | `capabilities().contains(LIST)` is `true` | `list_children().await` returns `Some(stream)` |
234    /// | `capabilities().contains(LIST)` is `false` | `list_children().await` returns `None` |
235    /// | `capabilities().contains(SEARCH)` is `true` | `search_children(q).await` returns `Some(stream)` for every `q` |
236    /// | `capabilities().contains(SEARCH)` is `false` | `search_children(q).await` returns `None` for every `q` |
237    ///
238    /// These rules are not runtime-enforced; advertising a capability you
239    /// do not implement is a correctness bug in the router.
240    ///
241    /// # Deprecated (IR-4)
242    ///
243    /// This bitflags type is superseded by the `MethodRole::DynamicChild {
244    /// list_method, search_method }` tag on the corresponding gate method.
245    /// Consumers that want to know whether a child router supports list /
246    /// search operations should inspect the gate method's role instead of
247    /// calling `ChildRouter::capabilities()`. The type stays on the wire for
248    /// the 0.5 transition window and is slated for removal in 0.7.
249    #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
250    #[deprecated(
251        since = "0.5",
252        note = "Use MethodRole::DynamicChild { list_method, search_method } instead. Removed in 0.7."
253    )]
254    pub struct ChildCapabilities: u32 {
255        /// The router promises `list_children()` returns `Some(stream)`.
256        const LIST = 0b0000_0001;
257        /// The router promises `search_children(query)` returns
258        /// `Some(stream)` for any query.
259        const SEARCH = 0b0000_0010;
260    }
261}
262
263/// Trait for activations that can route to child activations
264///
265/// Hub activations implement this to support nested method routing.
266/// When a method like "mercury.info" is called on a solar activation,
267/// this trait enables routing to the mercury child.
268///
269/// This trait is separate from Activation to avoid associated type issues
270/// with dynamic dispatch.
271///
272/// # Optional capabilities
273///
274/// In addition to the required `router_namespace` + `get_child` surface,
275/// routers may opt in to advertising enumerable and searchable children
276/// via [`ChildCapabilities`]. When a flag is set, the corresponding
277/// `list_children` / `search_children` method must return `Some(stream)`.
278/// The default implementations report no capabilities and return `None`.
279#[async_trait]
280pub trait ChildRouter: Send + Sync {
281    /// Get the namespace of this router (for error messages)
282    fn router_namespace(&self) -> &str;
283
284    /// Call a method on this router
285    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;
286
287    /// Get a child activation instance by name for nested routing
288    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>>;
289
290    /// Which optional operations (list / search) this router supports.
291    ///
292    /// Defaults to [`ChildCapabilities::empty()`]: a router that only
293    /// exposes `get_child` for exact-name lookup.
294    #[allow(deprecated)]
295    fn capabilities(&self) -> ChildCapabilities {
296        ChildCapabilities::empty()
297    }
298
299    /// Stream every child name the router is willing to enumerate.
300    ///
301    /// Returns `None` when the router does not support listing — callers
302    /// should check [`ChildRouter::capabilities`] first.
303    ///
304    /// Routers that implement this **must** set
305    /// [`ChildCapabilities::LIST`] in [`ChildRouter::capabilities`].
306    async fn list_children(&self) -> Option<BoxStream<'_, String>> {
307        None
308    }
309
310    /// Stream child names matching the router-defined query semantics.
311    ///
312    /// Returns `None` when the router does not support searching — callers
313    /// should check [`ChildRouter::capabilities`] first.
314    ///
315    /// Routers that implement this **must** set
316    /// [`ChildCapabilities::SEARCH`] in [`ChildRouter::capabilities`].
317    async fn search_children(&self, _query: &str) -> Option<BoxStream<'_, String>> {
318        None
319    }
320
321    // AUTHLANG-3 — three default-implemented methods that the framework's
322    // dispatch path (`route_to_child` below) consults. Existing impls keep
323    // compiling unchanged: they inherit the defaults below. Hub-level impls
324    // (DynamicHub) override them to consult the registry/principal/sink the
325    // hub holds.
326
327    /// Look up the forward policy declared for a callee namespace.
328    ///
329    /// Default: returns `None`, which the framework interprets as
330    /// [`plexus_auth_core::IdentityOnly`] — the safe default per
331    /// `AUTHLANG-S01-output` §5. Macro-emitted impls (AUTHLANG-4) override
332    /// this from the `#[plexus::activation(forward_policy = ...)]`
333    /// attribute; the [`DynamicHub`] override consults its
334    /// [`ForwardPolicyRegistry`](super::forward_registry::ForwardPolicyRegistry).
335    fn forward_policy_for(
336        &self,
337        _callee_ns: &str,
338    ) -> Option<std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>> {
339        None
340    }
341
342    /// Framework-stamped immediate-caller [`plexus_auth_core::Principal`] of
343    /// this router.
344    ///
345    /// Default: [`plexus_auth_core::Principal::Anonymous`]. The dispatch
346    /// path passes this into the [`plexus_auth_core::CallSite`] handed to
347    /// the policy so policies can implement callee-and-caller-aware
348    /// decisions (e.g., "PassThrough only when callee is in `audit.*`").
349    /// Hub-level impls override to return the per-connection stamp.
350    fn framework_stamped_principal(&self) -> plexus_auth_core::Principal {
351        plexus_auth_core::Principal::Anonymous
352    }
353}
354
355/// Route a method call to a child activation
356///
357/// This is called from generated code when a hub activation receives
358/// a method that doesn't match its local methods. If the method
359/// contains a dot (e.g., "mercury.info"), it routes to the child.
360///
361/// # AUTHLANG-3 dispatch sequence
362///
363/// Between callee resolution (`get_child`) and the actual dispatch
364/// (`router_call`), the framework runs the forwarding-policy step pinned
365/// in `plans/AUTHLANG/AUTHLANG-S01-output.md` §3:
366///
367/// 1. Resolve the policy registered for the callee namespace via
368///    [`ChildRouter::forward_policy_for`]; default
369///    [`plexus_auth_core::IdentityOnly`] when none is declared.
370/// 2. Build a [`plexus_auth_core::CallSite`] from the parent router's
371///    framework-stamped principal and the callee's [`MethodPath`].
372/// 3. Invoke [`plexus_auth_core::ForwardPolicy::forward`] to obtain a
373///    [`plexus_auth_core::ForwardDerivation`].
374/// 4. *(deferred — PRIVACY-1)* Emit one `AuditRecord` with
375///    `kind: ForwardPolicyApplied` to the configured `AuditSink`.
376/// 5. Mint the callee `AuthContext` via the framework-only constructor
377///    [`plexus_auth_core::AuthContext::derive_callee_context`].
378/// 6. Dispatch to `child.router_call(...)` with the derived context.
379///
380/// The policy step is invisible to activation authors per AUTHZ-0
381/// principle 1 ("trust is structural, not procedural"). The
382/// [`plexus_auth_core::ForwardPolicy::forward`] surface returns
383/// *parameters*, never a constructed `AuthContext`; the framework is the
384/// only entity that can mint one, per the sealed-type pattern.
385pub async fn route_to_child<T: ChildRouter + ?Sized>(
386    parent: &T,
387    method: &str,
388    params: Value,
389    auth: Option<&super::auth::AuthContext>,
390    raw_ctx: Option<&crate::request::RawRequestContext>,
391) -> Result<PlexusStream, PlexusError> {
392    // Try to split on first dot for nested routing
393    if let Some((child_name, rest)) = method.split_once('.') {
394        if let Some(child) = parent.get_child(child_name).await {
395            // ── AUTHLANG-3: forwarding-policy dispatch sequence ───────────
396            // Steps 1–3, 5–6 per the pinned spike §3. Step 4 (audit
397            // emission) is deferred until PRIVACY-1 lands `AuditRecord` /
398            // `AuditSink` / `ForwardPolicyApplied`; the TODO below marks
399            // the exact insertion point. See run-notes on the ticket.
400
401            // Step 1: resolve the policy registered for the callee
402            // namespace; default to IdentityOnly per the spike-pinned safe
403            // default (AUTHLANG-S01-output §5).
404            let policy: std::sync::Arc<dyn plexus_auth_core::ForwardPolicy> = parent
405                .forward_policy_for(child_name)
406                .unwrap_or_else(|| {
407                    std::sync::Arc::new(plexus_auth_core::IdentityOnly)
408                        as std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>
409                });
410
411            // Step 2: build the CallSite. The framework-built path string
412            // is always a valid MethodPath because the caller already
413            // validated the inbound method on its way in; if validation
414            // ever fails here it indicates a framework bug, not a user
415            // input error.
416            let callee_method_str = format!("{}.{}", child_name, rest);
417            let callee_method = plexus_auth_core::MethodPath::try_new(callee_method_str.as_str())
418                .map_err(|e| PlexusError::ExecutionError(format!(
419                    "framework-built MethodPath rejected: {} ({:?})",
420                    callee_method_str, e
421                )))?;
422            let site = plexus_auth_core::CallSite::new(
423                parent.framework_stamped_principal(),
424                callee_method,
425            );
426
427            // Step 3: invoke the policy. When the caller has no
428            // AuthContext (anonymous edge), feed the policy the anonymous
429            // sealed context so the policy contract is honored uniformly.
430            let anonymous_owned;
431            let caller_ctx: &super::auth::AuthContext = match auth {
432                Some(ctx) => ctx,
433                None => {
434                    anonymous_owned = super::auth::AuthContext::anonymous();
435                    &anonymous_owned
436                }
437            };
438            let derivation = policy.forward(caller_ctx, &site);
439
440            // Step 4 (DEFERRED — PRIVACY-1): emit AuditRecord with
441            // kind: ForwardPolicyApplied before dispatch. When PRIVACY-1
442            // lands `AuditRecord`, `AuditSink`, and `ForwardPolicyApplied`
443            // in `plexus_auth_core`, add a `ChildRouter::audit_sink()`
444            // default method (returning a no-op sink) and call:
445            //
446            //     parent.audit_sink().write(
447            //         AuditRecord::for_forward(
448            //             &site.callee_method,
449            //             &site.caller,
450            //             policy.name(),
451            //             derivation,
452            //             auth.and_then(|c| c.verified_user_id()),
453            //         )
454            //     ).await;
455            //
456            // Sink failure must be logged at WARN and NOT propagated
457            // (acceptance-criteria row 4 in AUTHLANG-3 §"Required
458            // behavior"). Until then, log a structured trace event so
459            // operators can confirm the policy step ran:
460            tracing::trace!(
461                target: "plexus::audit",
462                policy = policy.name().as_str(),
463                callee_method = %site.callee_method.as_str(),
464                derivation_keep_verified_user = derivation.keep_verified_user,
465                derivation_keep_roles = derivation.keep_roles,
466                derivation_keep_capabilities = derivation.keep_capabilities,
467                derivation_keep_metadata = derivation.keep_metadata,
468                "forward_policy_applied (audit-record emission stubbed pending PRIVACY-1)"
469            );
470
471            // Step 5+6: framework-blessed derivation of the callee sealed
472            // AuthContext, and dispatch with it. The policy NEVER sees the
473            // constructed value — it returned *parameters*; the framework
474            // consumed them via `with_callee_context`, which scopes the
475            // callee to the dispatch closure (the raw constructor remains
476            // pub(crate) to plexus-auth-core).
477            return match auth {
478                Some(caller_ctx) => {
479                    caller_ctx
480                        .with_callee_context(&derivation, &site.caller, |callee_ctx| async move {
481                            child
482                                .router_call(rest, params, Some(&callee_ctx), raw_ctx)
483                                .await
484                        })
485                        .await
486                }
487                None => child.router_call(rest, params, None, raw_ctx).await,
488            };
489        }
490        return Err(PlexusError::ActivationNotFound(child_name.to_string()));
491    }
492
493    // No dot - method simply not found
494    Err(PlexusError::MethodNotFound {
495        activation: parent.router_namespace().to_string(),
496        method: method.to_string(),
497    })
498}
499
500/// Wrapper to implement ChildRouter for Arc<dyn ChildRouter>
501///
502/// This allows DynamicHub to return its stored Arc<dyn ChildRouter> from get_child()
503struct ArcChildRouter(Arc<dyn ChildRouter>);
504
505#[async_trait]
506impl ChildRouter for ArcChildRouter {
507    fn router_namespace(&self) -> &str {
508        self.0.router_namespace()
509    }
510
511    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
512        self.0.router_call(method, params, auth, raw_ctx).await
513    }
514
515    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
516        self.0.get_child(name).await
517    }
518
519    #[allow(deprecated)]
520    fn capabilities(&self) -> ChildCapabilities {
521        self.0.capabilities()
522    }
523
524    async fn list_children(&self) -> Option<BoxStream<'_, String>> {
525        self.0.list_children().await
526    }
527
528    async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
529        self.0.search_children(query).await
530    }
531
532    // AUTHLANG-3 — forward the new ChildRouter trait methods through the
533    // Arc wrapper so a `DynamicHub` reached via `get_child` keeps its
534    // overrides (especially `forward_policy_for`).
535    fn forward_policy_for(
536        &self,
537        callee_ns: &str,
538    ) -> Option<std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>> {
539        self.0.forward_policy_for(callee_ns)
540    }
541
542    fn framework_stamped_principal(&self) -> plexus_auth_core::Principal {
543        self.0.framework_stamped_principal()
544    }
545}
546
547// ============================================================================
548// Internal Type-Erased Activation
549// ============================================================================
550
551#[async_trait]
552#[allow(dead_code)] // Methods exist for completeness but some aren't called post-erasure yet
553trait ActivationObject: Send + Sync + 'static {
554    fn namespace(&self) -> &str;
555    fn version(&self) -> &str;
556    fn description(&self) -> &str;
557    fn long_description(&self) -> Option<&str>;
558    fn methods(&self) -> Vec<&str>;
559    fn method_help(&self, method: &str) -> Option<String>;
560    fn plugin_id(&self) -> uuid::Uuid;
561    async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;
562    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError>;
563    fn plugin_schema(&self) -> PluginSchema;
564    fn schema(&self) -> Schema;
565}
566
567struct ActivationWrapper<A: Activation> {
568    inner: A,
569}
570
571#[async_trait]
572impl<A: Activation> ActivationObject for ActivationWrapper<A> {
573    fn namespace(&self) -> &str { self.inner.namespace() }
574    fn version(&self) -> &str { self.inner.version() }
575    fn description(&self) -> &str { self.inner.description() }
576    fn long_description(&self) -> Option<&str> { self.inner.long_description() }
577    fn methods(&self) -> Vec<&str> { self.inner.methods() }
578    fn method_help(&self, method: &str) -> Option<String> { self.inner.method_help(method) }
579    fn plugin_id(&self) -> uuid::Uuid { self.inner.plugin_id() }
580
581    async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
582        self.inner.call(method, params, auth, raw_ctx).await
583    }
584
585    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
586        self.inner.resolve_handle(handle).await
587    }
588
589    fn plugin_schema(&self) -> PluginSchema { self.inner.plugin_schema() }
590
591    fn schema(&self) -> Schema {
592        let schema = schemars::schema_for!(A::Methods);
593        serde_json::from_value(serde_json::to_value(schema).expect("serialize"))
594            .expect("parse schema")
595    }
596}
597
598// ============================================================================
599// Plexus Event Types
600// ============================================================================
601
602#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
603#[serde(tag = "event", rename_all = "snake_case")]
604pub enum HashEvent {
605    Hash { value: String },
606}
607
608/// Event for schema() RPC method - returns plugin schema
609#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
610#[serde(tag = "event", rename_all = "snake_case")]
611pub enum SchemaEvent {
612    /// This plugin's schema
613    Schema(PluginSchema),
614}
615
616/// Lightweight hash information for cache validation
617#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
618pub struct PluginHashes {
619    pub namespace: String,
620    pub self_hash: String,
621    #[serde(skip_serializing_if = "Option::is_none")]
622    pub children_hash: Option<String>,
623    pub hash: String,
624    /// Child plugin hashes (for recursive checking)
625    #[serde(skip_serializing_if = "Option::is_none")]
626    pub children: Option<Vec<ChildHashes>>,
627}
628
629/// Hash information for a child plugin
630#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
631pub struct ChildHashes {
632    pub namespace: String,
633    pub hash: String,
634}
635
636
637// ============================================================================
638// Activation Registry
639// ============================================================================
640
641/// Entry in the activation registry
642#[derive(Debug, Clone)]
643pub struct PluginEntry {
644    /// Stable activation instance ID
645    pub id: uuid::Uuid,
646    /// Current path/namespace for this activation
647    pub path: String,
648    /// Activation type (e.g., "cone", "bash", "arbor")
649    pub plugin_type: String,
650}
651
652/// Registry mapping activation UUIDs to their current paths
653///
654/// This enables handle routing without path dependency - handles reference
655/// activations by their stable UUID, and the registry maps to the current path.
656#[derive(Default)]
657pub struct PluginRegistry {
658    /// Lookup by plugin UUID
659    by_id: HashMap<uuid::Uuid, PluginEntry>,
660    /// Lookup by current path (for reverse lookup)
661    by_path: HashMap<String, uuid::Uuid>,
662}
663
664/// Read-only snapshot of the activation registry
665///
666/// Safe to use outside of DynamicHub locks.
667#[derive(Clone)]
668pub struct PluginRegistrySnapshot {
669    by_id: HashMap<uuid::Uuid, PluginEntry>,
670    by_path: HashMap<String, uuid::Uuid>,
671}
672
673impl PluginRegistrySnapshot {
674    /// Look up an activation's path by its UUID
675    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
676        self.by_id.get(&id).map(|e| e.path.as_str())
677    }
678
679    /// Look up an activation's UUID by its path
680    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
681        self.by_path.get(path).copied()
682    }
683
684    /// Get an activation entry by its UUID
685    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
686        self.by_id.get(&id)
687    }
688
689    /// List all registered activations
690    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
691        self.by_id.values()
692    }
693
694    /// Get the number of registered plugins
695    pub fn len(&self) -> usize {
696        self.by_id.len()
697    }
698
699    /// Check if the registry is empty
700    pub fn is_empty(&self) -> bool {
701        self.by_id.is_empty()
702    }
703}
704
705impl PluginRegistry {
706    /// Create a new empty registry
707    pub fn new() -> Self {
708        Self::default()
709    }
710
711    /// Look up an activation's path by its UUID
712    pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
713        self.by_id.get(&id).map(|e| e.path.as_str())
714    }
715
716    /// Look up an activation's UUID by its path
717    pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
718        self.by_path.get(path).copied()
719    }
720
721    /// Get an activation entry by its UUID
722    pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
723        self.by_id.get(&id)
724    }
725
726    /// Register an activation
727    pub fn register(&mut self, id: uuid::Uuid, path: String, plugin_type: String) {
728        let entry = PluginEntry { id, path: path.clone(), plugin_type };
729        self.by_id.insert(id, entry);
730        self.by_path.insert(path, id);
731    }
732
733    /// List all registered activations
734    pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
735        self.by_id.values()
736    }
737
738    /// Get the number of registered plugins
739    pub fn len(&self) -> usize {
740        self.by_id.len()
741    }
742
743    /// Check if the registry is empty
744    pub fn is_empty(&self) -> bool {
745        self.by_id.is_empty()
746    }
747}
748
749// ============================================================================
750// DynamicHub (formerly Plexus)
751// ============================================================================
752
753/// Build the JSON payload for the `_info` well-known endpoint.
754///
755/// The shape is `{"backend": "<ns>", "auth_capabilities": {…}}` per
756/// AUTHZ-S01-output §2 / AUTHZ-CORE-3. When the backend has not declared its
757/// capabilities via [`DynamicHub::with_auth_capabilities`], the field falls
758/// back to [`plexus_auth_core::BackendAuthCapabilities::anonymous_default`]
759/// (a single `Anonymous` mechanism). The `_info` endpoint itself remains
760/// public — no authentication is required to read it.
761fn build_info_payload(
762    namespace: &str,
763    caps: Option<&plexus_auth_core::BackendAuthCapabilities>,
764) -> serde_json::Value {
765    let advertised = match caps {
766        Some(c) => c.clone(),
767        None => plexus_auth_core::BackendAuthCapabilities::anonymous_default(),
768    };
769    serde_json::json!({
770        "backend": namespace,
771        "auth_capabilities": advertised,
772    })
773}
774
775struct DynamicHubInner {
776    /// Custom namespace for this hub instance (defaults to "plexus")
777    namespace: String,
778    activations: HashMap<String, Arc<dyn ActivationObject>>,
779    /// Child routers for direct nested routing (e.g., hub.solar.mercury.info)
780    child_routers: HashMap<String, Arc<dyn ChildRouter>>,
781    /// Activation registry mapping UUIDs to paths
782    registry: std::sync::RwLock<PluginRegistry>,
783    pending_rpc: std::sync::Mutex<Vec<Box<dyn FnOnce() -> Methods + Send>>>,
784    /// What this backend advertises at `_info`'s `auth_capabilities` field.
785    ///
786    /// `None` means the backend has not called
787    /// [`DynamicHub::with_auth_capabilities`]; `_info` falls back to
788    /// [`plexus_auth_core::BackendAuthCapabilities::anonymous_default`]
789    /// (a single `Anonymous` mechanism, no default). This preserves today's
790    /// no-auth substrate behavior while signaling "no auth wired" to
791    /// capability-aware clients.
792    ///
793    /// Per AUTHZ-CORE-3 and AUTHZ-S01-output §2.
794    auth_capabilities: Option<plexus_auth_core::BackendAuthCapabilities>,
795    /// AUTHLANG-3 — per-hub mapping from callee namespace to the
796    /// [`plexus_auth_core::ForwardPolicy`] consulted at every
797    /// cross-boundary call routed through this hub. Populated declaratively
798    /// (by the AUTHLANG-4 macro emission) or imperatively (via
799    /// [`DynamicHub::with_forward_policy`]). When the registry has no entry
800    /// for a callee namespace, the framework falls back to
801    /// [`plexus_auth_core::IdentityOnly`] per the spike-pinned safe
802    /// default. See `plans/AUTHLANG/AUTHLANG-S01-output.md` §3.
803    forward_policies: super::forward_registry::ForwardPolicyRegistry,
804}
805
806/// DynamicHub - an activation that routes to dynamically registered child activations
807///
808/// Unlike hub activations with hardcoded children (like Solar),
809/// DynamicHub allows registering activations at runtime via `.register()`.
810///
811/// # Direct Hosting
812///
813/// For a single activation, host it directly:
814/// ```ignore
815/// let solar = Arc::new(Solar::new());
816/// TransportServer::builder(solar, converter).serve().await?;
817/// ```
818///
819/// # Composition
820///
821/// For multiple top-level activations, use DynamicHub:
822/// ```ignore
823/// let hub = DynamicHub::with_namespace("myapp")
824///     .register(Solar::new())
825///     .register(Echo::new());
826/// ```
827#[derive(Clone)]
828pub struct DynamicHub {
829    inner: Arc<DynamicHubInner>,
830}
831
832// ============================================================================
833// DynamicHub Infrastructure (non-RPC methods)
834// ============================================================================
835
836impl DynamicHub {
837    /// Create a new DynamicHub with explicit namespace
838    ///
839    /// Unlike single activations which have fixed namespaces, DynamicHub is a
840    /// composition tool that can be named based on your application. Common choices:
841    /// - "hub" - generic default
842    /// - "substrate" - for substrate server
843    /// - "myapp" - for your application name
844    ///
845    /// The namespace appears in method calls: `{namespace}.call`, `{namespace}.schema`
846    pub fn new(namespace: impl Into<String>) -> Self {
847        Self {
848            inner: Arc::new(DynamicHubInner {
849                namespace: namespace.into(),
850                activations: HashMap::new(),
851                child_routers: HashMap::new(),
852                registry: std::sync::RwLock::new(PluginRegistry::new()),
853                pending_rpc: std::sync::Mutex::new(Vec::new()),
854                auth_capabilities: None,
855                forward_policies: super::forward_registry::ForwardPolicyRegistry::new(),
856            }),
857        }
858    }
859
860    /// Register a [`plexus_auth_core::ForwardPolicy`] for a callee
861    /// namespace.
862    ///
863    /// AUTHLANG-3 — every cross-boundary call through this hub consults
864    /// the registry at dispatch time. When `callee_ns` has no entry, the
865    /// framework falls back to [`plexus_auth_core::IdentityOnly`].
866    ///
867    /// AUTHLANG-4's `#[plexus::activation(forward_policy = ...)]`
868    /// attribute is the declarative path; this builder is the imperative
869    /// escape hatch used by integration tests and hand-rolled wiring.
870    ///
871    /// # Example
872    ///
873    /// ```ignore
874    /// use plexus_auth_core::PassThrough;
875    /// use std::sync::Arc;
876    ///
877    /// let hub = DynamicHub::new("my-backend")
878    ///     .with_forward_policy("solar", Arc::new(PassThrough));
879    /// ```
880    pub fn with_forward_policy(
881        mut self,
882        callee_ns: impl Into<String>,
883        policy: std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>,
884    ) -> Self {
885        let inner = Arc::get_mut(&mut self.inner)
886            .expect("Cannot register forward policy: DynamicHub has multiple references");
887        inner.forward_policies.register(callee_ns, policy);
888        self
889    }
890
891    /// Read-only view of the registered forward policies.
892    ///
893    /// Test-side accessor; production dispatch consults the registry via
894    /// the [`ChildRouter::forward_policy_for`] override.
895    pub fn forward_policies(&self) -> &super::forward_registry::ForwardPolicyRegistry {
896        &self.inner.forward_policies
897    }
898
899    /// Declare the backend's authentication capabilities, served at `_info`.
900    ///
901    /// Backends call this at builder time to advertise which auth mechanisms
902    /// they support (Bearer, Cookie, OIDC, Anonymous). Generic clients
903    /// (synapse CLI, gamma, generated SDKs) read the advertisement to decide
904    /// which authentication flow to drive.
905    ///
906    /// Without calling this method, `_info` emits the
907    /// [`plexus_auth_core::BackendAuthCapabilities::anonymous_default`]
908    /// fallback: a single `Anonymous` mechanism, no default. This preserves
909    /// today's no-auth substrate behavior.
910    ///
911    /// Per AUTHZ-CORE-3 / AUTHZ-S01-output §2.
912    ///
913    /// # Example
914    ///
915    /// ```no_run
916    /// # use plexus_core::DynamicHub;
917    /// use plexus_auth_core::{
918    ///     AuthMechanism, BackendAuthCapabilities, CookieName, MethodPath,
919    /// };
920    ///
921    /// let caps = BackendAuthCapabilities::new(
922    ///     vec![AuthMechanism::Cookie {
923    ///         cookie: CookieName::try_new("plexus_session").unwrap(),
924    ///         login: MethodPath::try_new("auth.login").unwrap(),
925    ///         refresh: None,
926    ///         logout: None,
927    ///     }],
928    ///     Some(0),
929    /// )
930    /// .unwrap();
931    ///
932    /// let hub = DynamicHub::new("my-backend").with_auth_capabilities(caps);
933    /// ```
934    pub fn with_auth_capabilities(
935        mut self,
936        caps: plexus_auth_core::BackendAuthCapabilities,
937    ) -> Self {
938        let inner = Arc::get_mut(&mut self.inner)
939            .expect("Cannot set auth_capabilities: DynamicHub has multiple references");
940        inner.auth_capabilities = Some(caps);
941        self
942    }
943
944    /// Returns the configured [`BackendAuthCapabilities`], or `None` if the
945    /// backend has not called [`Self::with_auth_capabilities`].
946    ///
947    /// Test-side accessor; production code reads the advertisement off `_info`.
948    ///
949    /// [`BackendAuthCapabilities`]: plexus_auth_core::BackendAuthCapabilities
950    pub fn auth_capabilities(&self) -> Option<&plexus_auth_core::BackendAuthCapabilities> {
951        self.inner.auth_capabilities.as_ref()
952    }
953
954    /// Deprecated: Use new() with explicit namespace instead
955    #[deprecated(since = "0.3.0", note = "Use DynamicHub::new(namespace) instead")]
956    pub fn with_namespace(namespace: impl Into<String>) -> Self {
957        Self::new(namespace)
958    }
959
960    /// Get the runtime namespace for this DynamicHub instance
961    pub fn runtime_namespace(&self) -> &str {
962        &self.inner.namespace
963    }
964
965    /// Get access to the activation registry
966    pub fn registry(&self) -> std::sync::RwLockReadGuard<'_, PluginRegistry> {
967        self.inner.registry.read().unwrap()
968    }
969
970    /// Register an activation
971    pub fn register<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
972        let namespace = activation.namespace().to_string();
973        let plugin_id = activation.plugin_id();
974        let activation_for_rpc = activation.clone();
975        let activation_for_router = activation.clone();
976
977        let inner = Arc::get_mut(&mut self.inner)
978            .expect("Cannot register: DynamicHub has multiple references");
979
980        // Register in the activation registry
981        inner.registry.write().unwrap().register(
982            plugin_id,
983            namespace.clone(),
984            namespace.clone(), // Use namespace as plugin_type for now
985        );
986
987        inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
988        inner.child_routers.insert(namespace.clone(), Arc::new(activation_for_router));
989        inner.pending_rpc.lock().unwrap()
990            .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
991        self
992    }
993
994    /// Register a hub activation that supports nested routing
995    ///
996    /// Hub activations implement `ChildRouter`, enabling direct nested method calls
997    /// like `hub.solar.mercury.info` at the RPC layer (no hub.call indirection).
998    #[deprecated(since = "0.5.0", note = "Use register() — it now handles both leaf and hub activations")]
999    pub fn register_hub<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
1000        let namespace = activation.namespace().to_string();
1001        let plugin_id = activation.plugin_id();
1002        let activation_for_rpc = activation.clone();
1003        let activation_for_router = activation.clone();
1004
1005        let inner = Arc::get_mut(&mut self.inner)
1006            .expect("Cannot register: DynamicHub has multiple references");
1007
1008        // Register in the activation registry
1009        inner.registry.write().unwrap().register(
1010            plugin_id,
1011            namespace.clone(),
1012            namespace.clone(), // Use namespace as plugin_type for now
1013        );
1014
1015        inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
1016        inner.child_routers.insert(namespace, Arc::new(activation_for_router));
1017        inner.pending_rpc.lock().unwrap()
1018            .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
1019        self
1020    }
1021
1022    /// List all methods across all activations
1023    pub fn list_methods(&self) -> Vec<String> {
1024        let mut methods = Vec::new();
1025
1026        // Include hub's own methods
1027        for m in Activation::methods(self) {
1028            methods.push(format!("{}.{}", self.inner.namespace, m));
1029        }
1030
1031        // Include registered activation methods
1032        for (ns, act) in &self.inner.activations {
1033            for m in act.methods() {
1034                methods.push(format!("{}.{}", ns, m));
1035            }
1036        }
1037        methods.sort();
1038        methods
1039    }
1040
1041    /// List all activations (including this hub itself)
1042    pub fn list_activations_info(&self) -> Vec<ActivationInfo> {
1043        let mut activations = Vec::new();
1044
1045        // Include this hub itself
1046        activations.push(ActivationInfo {
1047            namespace: Activation::namespace(self).to_string(),
1048            version: Activation::version(self).to_string(),
1049            description: Activation::description(self).to_string(),
1050            methods: Activation::methods(self).iter().map(|s| s.to_string()).collect(),
1051        });
1052
1053        // Include registered activations
1054        for a in self.inner.activations.values() {
1055            activations.push(ActivationInfo {
1056                namespace: a.namespace().to_string(),
1057                version: a.version().to_string(),
1058                description: a.description().to_string(),
1059                methods: a.methods().iter().map(|s| s.to_string()).collect(),
1060            });
1061        }
1062
1063        activations
1064    }
1065
1066    /// Compute hash for cache invalidation
1067    ///
1068    /// Returns the hash from the recursive plugin schema. This hash changes
1069    /// whenever any method definition or child plugin changes.
1070    pub fn compute_hash(&self) -> String {
1071        Activation::plugin_schema(self).hash
1072    }
1073
1074    /// Route a call to the appropriate activation
1075    pub async fn route(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>) -> Result<PlexusStream, PlexusError> {
1076        self.route_with_ctx(method, params, auth, None).await
1077    }
1078
1079    /// Route a call to the appropriate activation, with optional raw HTTP request context.
1080    pub async fn route_with_ctx(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
1081        let (namespace, method_name) = self.parse_method(method)?;
1082
1083        // Handle plexus's own methods
1084        if namespace == self.inner.namespace {
1085            return Activation::call(self, method_name, params, auth, raw_ctx).await;
1086        }
1087
1088        let activation = self.inner.activations.get(namespace)
1089            .ok_or_else(|| PlexusError::ActivationNotFound(namespace.to_string()))?;
1090
1091        activation.call(method_name, params, auth, raw_ctx).await
1092    }
1093
1094    /// Resolve a handle using the activation registry
1095    ///
1096    /// Looks up the activation by its UUID in the registry.
1097    pub async fn do_resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1098        let path = self.inner.registry.read().unwrap()
1099            .lookup(handle.plugin_id)
1100            .map(|s| s.to_string())
1101            .ok_or_else(|| PlexusError::ActivationNotFound(handle.plugin_id.to_string()))?;
1102
1103        let activation = self.inner.activations.get(&path)
1104            .ok_or_else(|| PlexusError::ActivationNotFound(path.clone()))?;
1105        activation.resolve_handle(handle).await
1106    }
1107
1108    /// Get activation schema
1109    pub fn get_activation_schema(&self, namespace: &str) -> Option<Schema> {
1110        self.inner.activations.get(namespace).map(|a| a.schema())
1111    }
1112
1113    /// Get a snapshot of the activation registry (safe to use outside locks)
1114    pub fn registry_snapshot(&self) -> PluginRegistrySnapshot {
1115        let guard = self.inner.registry.read().unwrap();
1116        PluginRegistrySnapshot {
1117            by_id: guard.by_id.clone(),
1118            by_path: guard.by_path.clone(),
1119        }
1120    }
1121
1122    /// Look up an activation path by its UUID
1123    pub fn lookup_plugin(&self, id: uuid::Uuid) -> Option<String> {
1124        self.inner.registry.read().unwrap().lookup(id).map(|s| s.to_string())
1125    }
1126
1127    /// Look up an activation UUID by its path
1128    pub fn lookup_plugin_by_path(&self, path: &str) -> Option<uuid::Uuid> {
1129        self.inner.registry.read().unwrap().lookup_by_path(path)
1130    }
1131
1132    /// Get activation schemas for all activations (including this hub itself)
1133    pub fn list_plugin_schemas(&self) -> Vec<PluginSchema> {
1134        let mut schemas = Vec::new();
1135
1136        // Include this hub itself
1137        schemas.push(Activation::plugin_schema(self));
1138
1139        // Include registered activations
1140        for a in self.inner.activations.values() {
1141            schemas.push(a.plugin_schema());
1142        }
1143
1144        schemas
1145    }
1146
1147    /// Deprecated: use list_plugin_schemas instead
1148    #[deprecated(note = "Use list_plugin_schemas instead")]
1149    pub fn list_full_schemas(&self) -> Vec<PluginSchema> {
1150        self.list_plugin_schemas()
1151    }
1152
1153    /// Get help for a method
1154    pub fn get_method_help(&self, method: &str) -> Option<String> {
1155        let (namespace, method_name) = self.parse_method(method).ok()?;
1156        let activation = self.inner.activations.get(namespace)?;
1157        activation.method_help(method_name)
1158    }
1159
1160    fn parse_method<'a>(&self, method: &'a str) -> Result<(&'a str, &'a str), PlexusError> {
1161        let parts: Vec<&str> = method.splitn(2, '.').collect();
1162        if parts.len() != 2 {
1163            return Err(PlexusError::InvalidParams(format!("Invalid method format: {}", method)));
1164        }
1165        Ok((parts[0], parts[1]))
1166    }
1167
1168    /// Get child activation summaries (for hub functionality)
1169    /// Called by hub-macro when `hub` flag is set
1170    pub fn plugin_children(&self) -> Vec<ChildSummary> {
1171        self.inner.activations.values()
1172            .map(|a| {
1173                let schema = a.plugin_schema();
1174                ChildSummary {
1175                    namespace: schema.namespace,
1176                    description: schema.description,
1177                    hash: schema.hash,
1178                }
1179            })
1180            .collect()
1181    }
1182
1183    /// Convert to RPC module
1184    pub fn into_rpc_module(self) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
1185        let mut module = RpcModule::new(());
1186
1187        PlexusContext::init(self.compute_hash());
1188
1189        // Register hub methods with runtime namespace using dot notation (e.g., "plexus.call" or "hub.call")
1190        // Note: we leak these strings to get 'static lifetime required by jsonrpsee
1191        let ns = self.runtime_namespace();
1192        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
1193        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
1194        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
1195        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
1196        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
1197        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
1198        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
1199        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
1200        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
1201
1202        // Register {ns}.call subscription
1203        let plexus_for_call = self.clone();
1204        module.register_subscription(
1205            call_method,
1206            PLEXUS_NOTIF_METHOD,
1207            call_unsub,
1208            move |params, pending, _ctx, _ext| {
1209                let plexus = plexus_for_call.clone();
1210                Box::pin(async move {
1211                    let p: CallParams = params.parse()?;
1212                    match plexus.route(&p.method, p.params.unwrap_or_default(), None).await {
1213                        Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
1214                        Err(e) => {
1215                            let sink = pending.accept().await?;
1216                            let error_item = super::types::PlexusStreamItem::Error {
1217                                metadata: super::types::StreamMetadata::new(
1218                                    vec![ns_static.into()],
1219                                    PlexusContext::hash(),
1220                                ),
1221                                message: e.to_string(),
1222                                code: Some(plexus_error_code(&e).to_string()),
1223                                recoverable: false,
1224                            };
1225                            if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
1226                                let _ = sink.send(raw).await;
1227                            }
1228                            Ok(())
1229                        }
1230                    }
1231                })
1232            }
1233        )?;
1234
1235        // Register {ns}.hash subscription
1236        let plexus_for_hash = self.clone();
1237        module.register_subscription(
1238            hash_method,
1239            PLEXUS_NOTIF_METHOD,
1240            hash_unsub,
1241            move |_params, pending, _ctx, _ext| {
1242                let plexus = plexus_for_hash.clone();
1243                Box::pin(async move {
1244                    let schema = Activation::plugin_schema(&plexus);
1245                    let stream = async_stream::stream! {
1246                        yield HashEvent::Hash { value: schema.hash };
1247                    };
1248                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
1249                    pipe_stream_to_subscription(pending, wrapped).await
1250                })
1251            }
1252        )?;
1253
1254        // Register {ns}.schema subscription
1255        let plexus_for_schema = self.clone();
1256        module.register_subscription(
1257            schema_method,
1258            PLEXUS_NOTIF_METHOD,
1259            schema_unsub,
1260            move |params, pending, _ctx, _ext| {
1261                let plexus = plexus_for_schema.clone();
1262                Box::pin(async move {
1263                    let p: SchemaParams = params.parse().unwrap_or_default();
1264                    let plugin_schema = Activation::plugin_schema(&plexus);
1265
1266                    let result = if let Some(ref name) = p.method {
1267                        plugin_schema.methods.iter()
1268                            .find(|m| m.name == *name)
1269                            .map(|m| super::SchemaResult::Method(m.clone()))
1270                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
1271                                -32602,
1272                                format!("Method '{}' not found", name),
1273                                None::<()>,
1274                            ))?
1275                    } else {
1276                        super::SchemaResult::Plugin(plugin_schema)
1277                    };
1278
1279                    let stream = async_stream::stream! { yield result; };
1280                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
1281                    pipe_stream_to_subscription(pending, wrapped).await
1282                })
1283            }
1284        )?;
1285
1286        // Register _info well-known endpoint (no namespace prefix).
1287        // Returns backend name + auth_capabilities (AUTHZ-CORE-3) as a
1288        // single-item stream with automatic Done event. Backends that have not
1289        // called with_auth_capabilities get the anonymous-default fallback so
1290        // capability-aware clients can still discover the auth surface.
1291        let info_payload = build_info_payload(
1292            self.runtime_namespace(),
1293            self.inner.auth_capabilities.as_ref(),
1294        );
1295        module.register_subscription(
1296            "_info",
1297            PLEXUS_NOTIF_METHOD,
1298            "_info_unsub",
1299            move |_params, pending, _ctx, _ext| {
1300                let payload = info_payload.clone();
1301                Box::pin(async move {
1302                    // Create a single-item stream with the info response
1303                    let info_stream = futures::stream::once(async move { payload });
1304
1305                    // Wrap to auto-append Done event
1306                    let wrapped = super::streaming::wrap_stream(
1307                        info_stream,
1308                        "_info",
1309                        vec![]
1310                    );
1311
1312                    // Pipe to subscription (handles Done automatically)
1313                    pipe_stream_to_subscription(pending, wrapped).await
1314                })
1315            }
1316        )?;
1317
1318        // Add all registered activation RPC methods
1319        let pending = std::mem::take(&mut *self.inner.pending_rpc.lock().unwrap());
1320        for factory in pending {
1321            module.merge(factory())?;
1322        }
1323
1324        // CHILD-WIRE: for each registered child router with capability bits set,
1325        // register {ns}.list_children / {ns}.search_children as subscriptions.
1326        // Per-activation namespaced (not top-level _list_children).
1327        for (ns, router) in self.inner.child_routers.iter() {
1328            register_child_capability_methods(&mut module, ns, router.clone())?;
1329        }
1330
1331        Ok(module)
1332    }
1333
1334    /// Convert Arc<DynamicHub> to RPC module while keeping the Arc alive
1335    ///
1336    /// Unlike `into_rpc_module`, this keeps the Arc<DynamicHub> reference alive,
1337    /// which is necessary when activations hold Weak<DynamicHub> references that
1338    /// need to remain upgradeable.
1339    pub fn arc_into_rpc_module(hub: Arc<Self>) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
1340        let mut module = RpcModule::new(());
1341
1342        PlexusContext::init(hub.compute_hash());
1343
1344        // Register hub methods with runtime namespace using dot notation (e.g., "plexus.call" or "hub.call")
1345        // Note: we leak these strings to get 'static lifetime required by jsonrpsee
1346        let ns = hub.runtime_namespace();
1347        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
1348        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
1349        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
1350        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
1351        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
1352        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
1353        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
1354        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
1355        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
1356
1357        // Register {ns}.call subscription - clone Arc to keep reference alive
1358        let hub_for_call = hub.clone();
1359        module.register_subscription(
1360            call_method,
1361            call_method,
1362            call_unsub,
1363            move |params, pending, _ctx, ext| {
1364                let hub = hub_for_call.clone();
1365                Box::pin(async move {
1366                    let p: CallParams = params.parse()?;
1367                    // Extract auth context from Extensions (if present)
1368                    let auth = ext.get::<std::sync::Arc<super::auth::AuthContext>>()
1369                        .map(|arc| arc.as_ref());
1370                    match hub.route(&p.method, p.params.unwrap_or_default(), auth).await {
1371                        Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
1372                        Err(e) => {
1373                            // Accept the subscription, then send the error as a stream item.
1374                            // This preserves the error message and code — returning Err(...)
1375                            // from a subscription handler causes jsonrpsee to wrap it as
1376                            // generic -32603, discarding our semantic error code.
1377                            let sink = pending.accept().await?;
1378                            let error_item = super::types::PlexusStreamItem::Error {
1379                                metadata: super::types::StreamMetadata::new(
1380                                    vec![ns_static.into()],
1381                                    PlexusContext::hash(),
1382                                ),
1383                                message: e.to_string(),
1384                                code: Some(plexus_error_code(&e).to_string()),
1385                                recoverable: false,
1386                            };
1387                            if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
1388                                let _ = sink.send(raw).await;
1389                            }
1390                            Ok(())
1391                        }
1392                    }
1393                })
1394            }
1395        )?;
1396
1397        // Register {ns}.hash subscription
1398        let hub_for_hash = hub.clone();
1399        module.register_subscription(
1400            hash_method,
1401            PLEXUS_NOTIF_METHOD,
1402            hash_unsub,
1403            move |_params, pending, _ctx, _ext| {
1404                let hub = hub_for_hash.clone();
1405                Box::pin(async move {
1406                    let schema = Activation::plugin_schema(&*hub);
1407                    let stream = async_stream::stream! {
1408                        yield HashEvent::Hash { value: schema.hash };
1409                    };
1410                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
1411                    pipe_stream_to_subscription(pending, wrapped).await
1412                })
1413            }
1414        )?;
1415
1416        // Register {ns}.schema subscription
1417        let hub_for_schema = hub.clone();
1418        module.register_subscription(
1419            schema_method,
1420            PLEXUS_NOTIF_METHOD,
1421            schema_unsub,
1422            move |params, pending, _ctx, _ext| {
1423                let hub = hub_for_schema.clone();
1424                Box::pin(async move {
1425                    let p: SchemaParams = params.parse().unwrap_or_default();
1426                    let plugin_schema = Activation::plugin_schema(&*hub);
1427
1428                    let result = if let Some(ref name) = p.method {
1429                        plugin_schema.methods.iter()
1430                            .find(|m| m.name == *name)
1431                            .map(|m| super::SchemaResult::Method(m.clone()))
1432                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
1433                                -32602,
1434                                format!("Method '{}' not found", name),
1435                                None::<()>,
1436                            ))?
1437                    } else {
1438                        super::SchemaResult::Plugin(plugin_schema)
1439                    };
1440
1441                    let stream = async_stream::stream! {
1442                        yield result;
1443                    };
1444                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
1445                    pipe_stream_to_subscription(pending, wrapped).await
1446                })
1447            }
1448        )?;
1449
1450        // Register _info well-known endpoint (no namespace prefix).
1451        // Returns backend name + auth_capabilities (AUTHZ-CORE-3) as a
1452        // single-item stream with automatic Done event. Same payload shape as
1453        // the sibling registration in into_rpc_module.
1454        let info_payload = build_info_payload(
1455            hub.runtime_namespace(),
1456            hub.inner.auth_capabilities.as_ref(),
1457        );
1458        module.register_subscription(
1459            "_info",
1460            PLEXUS_NOTIF_METHOD,
1461            "_info_unsub",
1462            move |_params, pending, _ctx, _ext| {
1463                let payload = info_payload.clone();
1464                Box::pin(async move {
1465                    // Create a single-item stream with the info response
1466                    let info_stream = futures::stream::once(async move { payload });
1467
1468                    // Wrap to auto-append Done event
1469                    let wrapped = super::streaming::wrap_stream(
1470                        info_stream,
1471                        "_info",
1472                        vec![]
1473                    );
1474
1475                    // Pipe to subscription (handles Done automatically)
1476                    pipe_stream_to_subscription(pending, wrapped).await
1477                })
1478            }
1479        )?;
1480
1481        // Register {ns}.respond method for WebSocket bidirectional responses
1482        // This allows clients to respond to server-initiated requests (like confirmations/prompts)
1483        let respond_method: &'static str = Box::leak(format!("{}.respond", ns).into_boxed_str());
1484        module.register_async_method(respond_method, |params, _ctx, _ext| async move {
1485            use super::bidirectional::{handle_pending_response, BidirError};
1486
1487            let p: RespondParams = params.parse()?;
1488
1489            tracing::debug!(
1490                request_id = %p.request_id,
1491                "Handling {}.respond via WebSocket",
1492                "plexus"
1493            );
1494
1495            match handle_pending_response(&p.request_id, p.response_data) {
1496                Ok(()) => Ok(serde_json::json!({"success": true})),
1497                Err(BidirError::UnknownRequest) => {
1498                    tracing::warn!(request_id = %p.request_id, "Unknown request ID in respond");
1499                    Err(jsonrpsee::types::ErrorObject::owned(
1500                        -32602,
1501                        format!("Unknown request ID: {}. The request may have timed out or been cancelled.", p.request_id),
1502                        None::<()>,
1503                    ))
1504                }
1505                Err(BidirError::ChannelClosed) => {
1506                    tracing::warn!(request_id = %p.request_id, "Channel closed in respond");
1507                    Err(jsonrpsee::types::ErrorObject::owned(
1508                        -32000,
1509                        "Response channel was closed (request may have timed out)",
1510                        None::<()>,
1511                    ))
1512                }
1513                Err(e) => {
1514                    tracing::error!(request_id = %p.request_id, error = ?e, "Error in respond");
1515                    Err(jsonrpsee::types::ErrorObject::owned(
1516                        -32000,
1517                        format!("Failed to deliver response: {}", e),
1518                        None::<()>,
1519                    ))
1520                }
1521            }
1522        })?;
1523
1524        // Register pending RPC methods from activations
1525        let pending = std::mem::take(&mut *hub.inner.pending_rpc.lock().unwrap());
1526        tracing::trace!(factories = pending.len(), "merging activation RPC factories");
1527        for (idx, factory) in pending.into_iter().enumerate() {
1528            tracing::trace!(factory_idx = idx, "calling factory to get Methods");
1529            let methods = factory();
1530            let method_count = methods.method_names().count();
1531            tracing::trace!(factory_idx = idx, methods = method_count, "factory returned Methods; merging into module");
1532            module.merge(methods)?;
1533            tracing::trace!(factory_idx = idx, "successfully merged factory methods");
1534        }
1535        tracing::trace!("all activations merged successfully");
1536
1537        // CHILD-WIRE: for each registered child router with capability bits set,
1538        // register {ns}.list_children / {ns}.search_children as subscriptions.
1539        for (ns, router) in hub.inner.child_routers.iter() {
1540            register_child_capability_methods(&mut module, ns, router.clone())?;
1541        }
1542
1543        Ok(module)
1544    }
1545}
1546
1547/// CHILD-WIRE: register per-activation namespaced `<ns>.list_children` and
1548/// `<ns>.search_children` as subscription methods when the router advertises
1549/// the corresponding capability bits.
1550///
1551/// Each name returned by `ChildRouter::list_children` / `search_children` is
1552/// emitted as a `data` envelope with `content_type` set to the method name
1553/// (`"list_children"` or `"search_children"`) and `content` carrying the name
1554/// string. Termination is `done`. Mirrors the standard `wrap_stream` shape
1555/// used by every other framework subscription.
1556///
1557/// Activations that advertise neither bit produce no registrations — calling
1558/// the methods returns standard `methodNotFound`. That's the wire-level
1559/// signal that the activation doesn't support enumeration / search.
1560#[allow(deprecated)] // ChildCapabilities is deprecated by IR-4 but still the wire-level signal
1561fn register_child_capability_methods(
1562    module: &mut RpcModule<()>,
1563    namespace: &str,
1564    router: Arc<dyn ChildRouter>,
1565) -> Result<(), jsonrpsee::core::RegisterMethodError> {
1566    let caps = router.capabilities();
1567    if caps.is_empty() {
1568        return Ok(());
1569    }
1570
1571    let ns_static: &'static str = Box::leak(namespace.to_string().into_boxed_str());
1572
1573    if caps.contains(ChildCapabilities::LIST) {
1574        let list_method: &'static str =
1575            Box::leak(format!("{}.list_children", namespace).into_boxed_str());
1576        let list_unsub: &'static str =
1577            Box::leak(format!("{}.list_children_unsub", namespace).into_boxed_str());
1578        let router_for_list = router.clone();
1579        module.register_subscription(
1580            list_method,
1581            PLEXUS_NOTIF_METHOD,
1582            list_unsub,
1583            move |_params, pending, _ctx, _ext| {
1584                let router = router_for_list.clone();
1585                Box::pin(async move {
1586                    // Collect names eagerly so the BoxStream's borrow on the
1587                    // router doesn't outlive list_children's call. For v1 this
1588                    // matches the typical pattern (small finite child sets like
1589                    // Solar's eight planets). A future variant could keep the
1590                    // Arc-borrow alive across the stream by binding the BoxStream
1591                    // to the Arc directly — out of scope here.
1592                    let collected: Vec<String> = match router.list_children().await {
1593                        Some(mut s) => {
1594                            use futures::StreamExt;
1595                            let mut acc = Vec::new();
1596                            while let Some(name) = s.next().await {
1597                                acc.push(name);
1598                            }
1599                            acc
1600                        }
1601                        None => Vec::new(),
1602                    };
1603                    let stream = async_stream::stream! {
1604                        for name in collected {
1605                            yield name;
1606                        }
1607                    };
1608                    let wrapped = super::streaming::wrap_stream(
1609                        stream,
1610                        "list_children",
1611                        vec![ns_static.into()],
1612                    );
1613                    pipe_stream_to_subscription(pending, wrapped).await
1614                })
1615            },
1616        )?;
1617    }
1618
1619    if caps.contains(ChildCapabilities::SEARCH) {
1620        let search_method: &'static str =
1621            Box::leak(format!("{}.search_children", namespace).into_boxed_str());
1622        let search_unsub: &'static str =
1623            Box::leak(format!("{}.search_children_unsub", namespace).into_boxed_str());
1624        let router_for_search = router.clone();
1625        module.register_subscription(
1626            search_method,
1627            PLEXUS_NOTIF_METHOD,
1628            search_unsub,
1629            move |params, pending, _ctx, _ext| {
1630                let router = router_for_search.clone();
1631                Box::pin(async move {
1632                    let p: SearchChildrenParams = params.parse()?;
1633                    let collected: Vec<String> = match router.search_children(&p.query).await {
1634                        Some(mut s) => {
1635                            use futures::StreamExt;
1636                            let mut acc = Vec::new();
1637                            while let Some(name) = s.next().await {
1638                                acc.push(name);
1639                            }
1640                            acc
1641                        }
1642                        None => Vec::new(),
1643                    };
1644                    let stream = async_stream::stream! {
1645                        for name in collected {
1646                            yield name;
1647                        }
1648                    };
1649                    let wrapped = super::streaming::wrap_stream(
1650                        stream,
1651                        "search_children",
1652                        vec![ns_static.into()],
1653                    );
1654                    pipe_stream_to_subscription(pending, wrapped).await
1655                })
1656            },
1657        )?;
1658    }
1659
1660    Ok(())
1661}
1662
1663/// Params for `<ns>.search_children`
1664#[derive(Debug, serde::Deserialize)]
1665struct SearchChildrenParams {
1666    query: String,
1667}
1668
1669/// Params for {ns}.call
1670#[derive(Debug, serde::Deserialize)]
1671struct CallParams {
1672    method: String,
1673    #[serde(default)]
1674    params: Option<Value>,
1675}
1676
1677/// Params for {ns}.schema
1678#[derive(Debug, Default, serde::Deserialize)]
1679struct SchemaParams {
1680    method: Option<String>,
1681}
1682
1683/// Params for {ns}.respond (WebSocket bidirectional response)
1684#[derive(Debug, serde::Deserialize)]
1685struct RespondParams {
1686    request_id: String,
1687    response_data: Value,
1688}
1689
1690/// Helper to pipe a PlexusStream to a subscription sink.
1691///
1692/// Notifications are sent with `method: PLEXUS_NOTIF_METHOD` on the wire,
1693/// as set by the `notif_method_name` arg in each `register_subscription` call.
1694async fn pipe_stream_to_subscription(
1695    pending: jsonrpsee::PendingSubscriptionSink,
1696    mut stream: PlexusStream,
1697) -> jsonrpsee::core::SubscriptionResult {
1698    use futures::StreamExt;
1699
1700    let sink = pending.accept().await?;
1701    while let Some(item) = stream.next().await {
1702        let json = serde_json::value::to_raw_value(&item)?;
1703        sink.send(json).await?;
1704    }
1705    Ok(())
1706}
1707
1708// ============================================================================
1709// DynamicHub RPC Methods (via plexus-macros)
1710// ============================================================================
1711
1712#[plexus_macros::activation(
1713    namespace = "plexus",
1714    version = "1.0.0",
1715    description = "Central routing and introspection",
1716    hub,
1717    namespace_fn = "runtime_namespace"
1718)]
1719#[allow(deprecated)]
1720impl DynamicHub {
1721    /// Route a call to a registered activation
1722    #[plexus_macros::method(
1723        streaming,
1724        description = "Route a call to a registered activation",
1725        params(
1726            method = "The method to call (format: namespace.method)",
1727            params = "Parameters to pass to the method (optional, defaults to {})"
1728        )
1729    )]
1730    async fn call(
1731        &self,
1732        method: String,
1733        params: Option<Value>,
1734    ) -> impl Stream<Item = super::types::PlexusStreamItem> + Send + 'static {
1735        use super::context::PlexusContext;
1736        use super::types::{PlexusStreamItem, StreamMetadata};
1737
1738        let result = self.route(&method, params.unwrap_or_default(), None).await;
1739
1740        match result {
1741            Ok(plexus_stream) => {
1742                // Forward the routed stream directly - it already contains PlexusStreamItems
1743                plexus_stream
1744            }
1745            Err(e) => {
1746                // Return error as a PlexusStreamItem stream
1747                let metadata = StreamMetadata::new(
1748                    vec![self.inner.namespace.clone()],
1749                    PlexusContext::hash(),
1750                );
1751                Box::pin(futures::stream::once(async move {
1752                    PlexusStreamItem::Error {
1753                        metadata,
1754                        message: e.to_string(),
1755                        code: None,
1756                        recoverable: false,
1757                    }
1758                }))
1759            }
1760        }
1761    }
1762
1763    /// Get Plexus RPC server configuration hash (from the recursive schema)
1764    ///
1765    /// This hash changes whenever any method or child activation changes.
1766    /// It's computed from the method hashes rolled up through the schema tree.
1767    #[plexus_macros::method(description = "Get plexus configuration hash (from the recursive schema)\n\n This hash changes whenever any method or child plugin changes.\n It's computed from the method hashes rolled up through the schema tree.")]
1768    async fn hash(&self) -> impl Stream<Item = HashEvent> + Send + 'static {
1769        let schema = Activation::plugin_schema(self);
1770        stream! { yield HashEvent::Hash { value: schema.hash }; }
1771    }
1772
1773    /// Get plugin hashes for cache validation (lightweight alternative to full schema)
1774    #[plexus_macros::method(description = "Get plugin hashes for cache validation")]
1775    #[allow(deprecated)]
1776    async fn hashes(&self) -> impl Stream<Item = PluginHashes> + Send + 'static {
1777        let schema = Activation::plugin_schema(self);
1778
1779        stream! {
1780            yield PluginHashes {
1781                namespace: schema.namespace.clone(),
1782                self_hash: schema.self_hash.clone(),
1783                children_hash: schema.children_hash.clone(),
1784                hash: schema.hash.clone(),
1785                children: schema.children.as_ref().map(|kids| {
1786                    kids.iter()
1787                        .map(|c| ChildHashes {
1788                            namespace: c.namespace.clone(),
1789                            hash: c.hash.clone(),
1790                        })
1791                        .collect()
1792                }),
1793            };
1794        }
1795    }
1796
1797    // Note: schema() method is auto-generated by hub-macro for all activations
1798}
1799
1800// ============================================================================
1801// HubContext Implementation for Weak<DynamicHub>
1802// ============================================================================
1803
1804use super::hub_context::HubContext;
1805use std::sync::Weak;
1806
1807/// HubContext implementation for Weak<DynamicHub>
1808///
1809/// This enables activations to receive a weak reference to their parent DynamicHub,
1810/// allowing them to resolve handles and route calls through the hub without
1811/// creating reference cycles.
1812#[async_trait]
1813impl HubContext for Weak<DynamicHub> {
1814    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1815        let hub = self.upgrade().ok_or_else(|| {
1816            PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1817        })?;
1818        hub.do_resolve_handle(handle).await
1819    }
1820
1821    async fn call(&self, method: &str, params: serde_json::Value) -> Result<PlexusStream, PlexusError> {
1822        let hub = self.upgrade().ok_or_else(|| {
1823            PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1824        })?;
1825        hub.route(method, params, None).await
1826    }
1827
1828    fn is_valid(&self) -> bool {
1829        self.upgrade().is_some()
1830    }
1831}
1832
1833/// ChildRouter implementation for DynamicHub
1834///
1835/// This enables nested routing through registered activations.
1836/// e.g., hub.call("solar.mercury.info") routes to solar → mercury → info
1837#[async_trait]
1838impl ChildRouter for DynamicHub {
1839    fn router_namespace(&self) -> &str {
1840        &self.inner.namespace
1841    }
1842
1843    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
1844        // DynamicHub routes via its registered activations
1845        // Method format: "activation.method" or "activation.child.method"
1846        self.route_with_ctx(method, params, auth, raw_ctx).await
1847    }
1848
1849    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1850        // Look up registered activations that implement ChildRouter
1851        self.inner.child_routers.get(name)
1852            .map(|router| {
1853                // Clone the Arc and wrap in Box for the trait object
1854                Box::new(ArcChildRouter(router.clone())) as Box<dyn ChildRouter>
1855            })
1856    }
1857
1858    /// AUTHLANG-3 — consult the hub's
1859    /// [`ForwardPolicyRegistry`](super::forward_registry::ForwardPolicyRegistry).
1860    fn forward_policy_for(
1861        &self,
1862        callee_ns: &str,
1863    ) -> Option<std::sync::Arc<dyn plexus_auth_core::ForwardPolicy>> {
1864        self.inner.forward_policies.get(callee_ns)
1865    }
1866
1867    // `framework_stamped_principal` retains the trait default
1868    // (`Principal::Anonymous`) for now. AUTHLANG-3 wires the dispatch path
1869    // to read this; populating it with the per-connection stamp lands
1870    // when the principal-minting service (post-AUTHZ-0 / future
1871    // CRED-CORE) is wired into the WS upgrade path. The current
1872    // anonymous return value is correct under today's no-auth substrate.
1873}
1874
1875#[cfg(test)]
1876#[allow(deprecated)]
1877mod tests {
1878    use super::*;
1879
1880    #[test]
1881    fn dynamic_hub_implements_activation() {
1882        fn assert_activation<T: Activation>() {}
1883        assert_activation::<DynamicHub>();
1884    }
1885
1886    #[test]
1887    fn dynamic_hub_methods() {
1888        let hub = DynamicHub::new("test");
1889        let methods = hub.methods();
1890        assert!(methods.contains(&"call"));
1891        assert!(methods.contains(&"hash"));
1892        assert!(methods.contains(&"schema"));
1893        // list_activations was removed - use schema() instead
1894    }
1895
1896    #[test]
1897    fn dynamic_hub_hash_stable() {
1898        let h1 = DynamicHub::new("test");
1899        let h2 = DynamicHub::new("test");
1900        assert_eq!(h1.compute_hash(), h2.compute_hash());
1901    }
1902
1903    #[test]
1904    fn dynamic_hub_is_hub() {
1905        use crate::activations::health::Health;
1906        let hub = DynamicHub::new("test").register(Health::new());
1907        let schema = hub.plugin_schema();
1908
1909        // DynamicHub should be a hub (has children)
1910        assert!(schema.is_hub(), "dynamic hub should be a hub");
1911        assert!(!schema.is_leaf(), "dynamic hub should not be a leaf");
1912
1913        // Should have children (as summaries)
1914        let children = schema.children.expect("dynamic hub should have children");
1915        assert!(!children.is_empty(), "dynamic hub should have at least one child");
1916
1917        // Health should be in the children summaries
1918        let health = children.iter().find(|c| c.namespace == "health").expect("should have health child");
1919        assert!(!health.hash.is_empty(), "health should have a hash");
1920    }
1921
1922    #[test]
1923    fn dynamic_hub_schema_structure() {
1924        use crate::activations::health::Health;
1925        let hub = DynamicHub::new("test").register(Health::new());
1926        let schema = hub.plugin_schema();
1927
1928        // Pretty print the schema
1929        let json = serde_json::to_string_pretty(&schema).unwrap();
1930        println!("DynamicHub schema:\n{}", json);
1931
1932        // Verify structure
1933        assert_eq!(schema.namespace, "test");
1934        assert!(schema.methods.iter().any(|m| m.name == "call"));
1935        assert!(schema.children.is_some());
1936    }
1937
1938    // ========================================================================
1939    // INVARIANT: Handle routing - resolves to correct plugin
1940    // ========================================================================
1941
1942    #[tokio::test]
1943    async fn invariant_resolve_handle_unknown_activation() {
1944        use crate::activations::health::Health;
1945        use crate::types::Handle;
1946        use uuid::Uuid;
1947
1948        let hub = DynamicHub::new("test").register(Health::new());
1949
1950        // Handle for an unregistered activation (random UUID)
1951        let unknown_plugin_id = Uuid::new_v4();
1952        let handle = Handle::new(unknown_plugin_id, "1.0.0", "some_method");
1953
1954        let result = hub.do_resolve_handle(&handle).await;
1955
1956        match result {
1957            Err(PlexusError::ActivationNotFound(_)) => {
1958                // Expected - activation not found
1959            }
1960            Err(other) => panic!("Expected ActivationNotFound, got {:?}", other),
1961            Ok(_) => panic!("Expected error for unknown activation"),
1962        }
1963    }
1964
1965    #[tokio::test]
1966    async fn invariant_resolve_handle_unsupported() {
1967        use crate::activations::health::Health;
1968        use crate::types::Handle;
1969
1970        let hub = DynamicHub::new("test").register(Health::new());
1971
1972        // Handle for health activation (which doesn't support handle resolution)
1973        let handle = Handle::new(Health::PLUGIN_ID, "1.0.0", "check");
1974
1975        let result = hub.do_resolve_handle(&handle).await;
1976
1977        match result {
1978            Err(PlexusError::HandleNotSupported(name)) => {
1979                assert_eq!(name, "health");
1980            }
1981            Err(other) => panic!("Expected HandleNotSupported, got {:?}", other),
1982            Ok(_) => panic!("Expected error for unsupported handle"),
1983        }
1984    }
1985
1986    #[tokio::test]
1987    async fn invariant_resolve_handle_routes_by_plugin_id() {
1988        use crate::activations::health::Health;
1989        use crate::activations::echo::Echo;
1990        use crate::types::Handle;
1991        use uuid::Uuid;
1992
1993        let health = Health::new();
1994        let echo = Echo::new();
1995        let health_plugin_id = health.plugin_id();
1996        let echo_plugin_id = echo.plugin_id();
1997
1998        let hub = DynamicHub::new("test")
1999            .register(health)
2000            .register(echo);
2001
2002        // Health handle → health activation
2003        let health_handle = Handle::new(health_plugin_id, "1.0.0", "check");
2004        match hub.do_resolve_handle(&health_handle).await {
2005            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
2006            Err(other) => panic!("health handle should route to health activation, got {:?}", other),
2007            Ok(_) => panic!("health handle should return HandleNotSupported"),
2008        }
2009
2010        // Echo handle → echo activation
2011        let echo_handle = Handle::new(echo_plugin_id, "1.0.0", "echo");
2012        match hub.do_resolve_handle(&echo_handle).await {
2013            Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "echo"),
2014            Err(other) => panic!("echo handle should route to echo activation, got {:?}", other),
2015            Ok(_) => panic!("echo handle should return HandleNotSupported"),
2016        }
2017
2018        // Unknown handle → ActivationNotFound (random UUID not registered)
2019        let unknown_handle = Handle::new(Uuid::new_v4(), "1.0.0", "method");
2020        match hub.do_resolve_handle(&unknown_handle).await {
2021            Err(PlexusError::ActivationNotFound(_)) => { /* expected */ },
2022            Err(other) => panic!("unknown handle should return ActivationNotFound, got {:?}", other),
2023            Ok(_) => panic!("unknown handle should return ActivationNotFound"),
2024        }
2025    }
2026
2027    #[test]
2028    fn invariant_handle_plugin_id_determines_routing() {
2029        use crate::activations::health::Health;
2030        use crate::activations::echo::Echo;
2031        use crate::types::Handle;
2032
2033        let health = Health::new();
2034        let echo = Echo::new();
2035
2036        // Same meta, different activations → different routing targets (by plugin_id)
2037        let health_handle = Handle::new(health.plugin_id(), "1.0.0", "check")
2038            .with_meta(vec!["msg-123".into(), "user".into()]);
2039        let echo_handle = Handle::new(echo.plugin_id(), "1.0.0", "echo")
2040            .with_meta(vec!["msg-123".into(), "user".into()]);
2041
2042        // Different plugin_ids ensure different routing
2043        assert_ne!(health_handle.plugin_id, echo_handle.plugin_id);
2044    }
2045
2046    // ========================================================================
2047    // Plugin Registry Tests
2048    // ========================================================================
2049
2050    #[test]
2051    fn plugin_registry_basic_operations() {
2052        let mut registry = PluginRegistry::new();
2053        let id = uuid::Uuid::new_v4();
2054
2055        // Register an activation
2056        registry.register(id, "test_plugin".to_string(), "test".to_string());
2057
2058        // Lookup by ID
2059        assert_eq!(registry.lookup(id), Some("test_plugin"));
2060
2061        // Lookup by path
2062        assert_eq!(registry.lookup_by_path("test_plugin"), Some(id));
2063
2064        // Get entry
2065        let entry = registry.get(id).expect("should have entry");
2066        assert_eq!(entry.path, "test_plugin");
2067        assert_eq!(entry.plugin_type, "test");
2068    }
2069
2070    #[test]
2071    fn plugin_registry_populated_on_register() {
2072        use crate::activations::health::Health;
2073
2074        let hub = DynamicHub::new("test").register(Health::new());
2075
2076        let registry = hub.registry();
2077        assert!(!registry.is_empty(), "registry should not be empty after registration");
2078
2079        // Health activation should be registered
2080        let health_id = registry.lookup_by_path("health");
2081        assert!(health_id.is_some(), "health should be registered by path");
2082
2083        // Should be able to look up path by ID
2084        let health_uuid = health_id.unwrap();
2085        assert_eq!(registry.lookup(health_uuid), Some("health"));
2086    }
2087
2088    #[test]
2089    fn plugin_registry_deterministic_uuid() {
2090        use crate::activations::health::Health;
2091
2092        // Same activation registered twice should produce same UUID
2093        let health1 = Health::new();
2094        let health2 = Health::new();
2095
2096        assert_eq!(health1.plugin_id(), health2.plugin_id(),
2097            "same activation type should have deterministic UUID");
2098
2099        // UUID should be based on namespace+major_version (semver compatibility)
2100        let expected = uuid::Uuid::new_v5(
2101            &uuid::Uuid::NAMESPACE_OID,
2102            b"health@1"
2103        );
2104        assert_eq!(health1.plugin_id(), expected,
2105            "plugin_id should be deterministic from namespace@major_version");
2106    }
2107
2108    // ========================================================================
2109    // CHILD-2: ChildRouter capabilities + opt-in list/search
2110    // ========================================================================
2111
2112    /// A minimal `ChildRouter` that overrides only the required methods.
2113    /// Exercises default implementations of `capabilities`, `list_children`
2114    /// and `search_children`.
2115    struct MinimalRouter;
2116
2117    #[async_trait]
2118    impl ChildRouter for MinimalRouter {
2119        fn router_namespace(&self) -> &str {
2120            "minimal"
2121        }
2122
2123        async fn router_call(
2124            &self,
2125            _method: &str,
2126            _params: Value,
2127            _auth: Option<&super::super::auth::AuthContext>,
2128            _raw_ctx: Option<&crate::request::RawRequestContext>,
2129        ) -> Result<PlexusStream, PlexusError> {
2130            Err(PlexusError::MethodNotFound {
2131                activation: "minimal".into(),
2132                method: "none".into(),
2133            })
2134        }
2135
2136        async fn get_child(&self, _name: &str) -> Option<Box<dyn ChildRouter>> {
2137            None
2138        }
2139    }
2140
2141    #[tokio::test]
2142    async fn child_router_defaults_report_no_capabilities_and_none_streams() {
2143        let router = MinimalRouter;
2144
2145        assert_eq!(
2146            router.capabilities(),
2147            ChildCapabilities::empty(),
2148            "default capabilities should be empty"
2149        );
2150        assert!(
2151            router.list_children().await.is_none(),
2152            "default list_children should be None"
2153        );
2154        assert!(
2155            router.search_children("anything").await.is_none(),
2156            "default search_children should be None"
2157        );
2158    }
2159
2160    /// A `ChildRouter` that opts in to both LIST and SEARCH.
2161    struct ListingRouter {
2162        names: Vec<String>,
2163    }
2164
2165    #[async_trait]
2166    impl ChildRouter for ListingRouter {
2167        fn router_namespace(&self) -> &str {
2168            "listing"
2169        }
2170
2171        async fn router_call(
2172            &self,
2173            _method: &str,
2174            _params: Value,
2175            _auth: Option<&super::super::auth::AuthContext>,
2176            _raw_ctx: Option<&crate::request::RawRequestContext>,
2177        ) -> Result<PlexusStream, PlexusError> {
2178            Err(PlexusError::MethodNotFound {
2179                activation: "listing".into(),
2180                method: "none".into(),
2181            })
2182        }
2183
2184        async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
2185            if self.names.iter().any(|n| n == name) {
2186                // Return the same type to keep the test simple; we only care
2187                // that the override compiles and is reachable.
2188                Some(Box::new(ListingRouter { names: vec![] }))
2189            } else {
2190                None
2191            }
2192        }
2193
2194        fn capabilities(&self) -> ChildCapabilities {
2195            ChildCapabilities::LIST | ChildCapabilities::SEARCH
2196        }
2197
2198        async fn list_children(&self) -> Option<BoxStream<'_, String>> {
2199            let stream = futures::stream::iter(self.names.iter().cloned());
2200            Some(Box::pin(stream))
2201        }
2202
2203        async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
2204            let q = query.to_string();
2205            let stream = futures::stream::iter(
2206                self.names
2207                    .iter()
2208                    .filter(move |n| n.contains(&q))
2209                    .cloned()
2210                    .collect::<Vec<_>>(),
2211            );
2212            Some(Box::pin(stream))
2213        }
2214    }
2215
2216    #[tokio::test]
2217    async fn child_router_overrides_report_capabilities_and_yield_streams() {
2218        use futures::StreamExt;
2219
2220        let router = ListingRouter {
2221            names: vec!["alpha".into(), "beta".into(), "alphabet".into()],
2222        };
2223
2224        // Capabilities
2225        let caps = router.capabilities();
2226        assert!(caps.contains(ChildCapabilities::LIST));
2227        assert!(caps.contains(ChildCapabilities::SEARCH));
2228        assert_eq!(caps, ChildCapabilities::LIST | ChildCapabilities::SEARCH);
2229
2230        // list_children yields the full, non-empty, finite sequence.
2231        let list_stream = router
2232            .list_children()
2233            .await
2234            .expect("LIST capability set — expected Some(stream)");
2235        let listed: Vec<String> = list_stream.collect().await;
2236        assert_eq!(listed, vec!["alpha".to_string(), "beta".into(), "alphabet".into()]);
2237
2238        // search_children filters by the query string.
2239        let search_stream = router
2240            .search_children("alpha")
2241            .await
2242            .expect("SEARCH capability set — expected Some(stream)");
2243        let matched: Vec<String> = search_stream.collect().await;
2244        assert_eq!(matched, vec!["alpha".to_string(), "alphabet".into()]);
2245    }
2246
2247    // ========================================================================
2248    // CHILD-WIRE: per-activation namespaced wire exposure for
2249    // <ns>.list_children / <ns>.search_children
2250    //
2251    // These tests exercise `register_child_capability_methods` directly with
2252    // hand-built fixtures, then drive the resulting RpcModule through the
2253    // in-process subscription path. Mirrors the existing
2254    // `auth_capabilities_info` integration pattern but verifies the
2255    // child-router wire registration instead of the _info payload.
2256    // ========================================================================
2257
2258    /// Like `EnumerableRouter` above but with configurable capability bits +
2259    /// a fixed name set. Used to drive CHILD-WIRE registration through
2260    /// different capability combinations.
2261    struct WireFixture {
2262        names: Vec<String>,
2263        caps: ChildCapabilities,
2264    }
2265
2266    #[async_trait]
2267    impl ChildRouter for WireFixture {
2268        fn router_namespace(&self) -> &str {
2269            "wirefixture"
2270        }
2271        async fn router_call(
2272            &self,
2273            _method: &str,
2274            _params: Value,
2275            _auth: Option<&super::super::auth::AuthContext>,
2276            _raw_ctx: Option<&crate::request::RawRequestContext>,
2277        ) -> Result<PlexusStream, PlexusError> {
2278            Err(PlexusError::MethodNotFound {
2279                activation: "wirefixture".into(),
2280                method: "none".into(),
2281            })
2282        }
2283        async fn get_child(&self, _name: &str) -> Option<Box<dyn ChildRouter>> {
2284            None
2285        }
2286        fn capabilities(&self) -> ChildCapabilities {
2287            self.caps
2288        }
2289        async fn list_children(&self) -> Option<futures_core::stream::BoxStream<'_, String>> {
2290            if !self.caps.contains(ChildCapabilities::LIST) {
2291                return None;
2292            }
2293            Some(Box::pin(futures::stream::iter(self.names.clone())))
2294        }
2295        async fn search_children(
2296            &self,
2297            query: &str,
2298        ) -> Option<futures_core::stream::BoxStream<'_, String>> {
2299            if !self.caps.contains(ChildCapabilities::SEARCH) {
2300                return None;
2301            }
2302            let q = query.to_lowercase();
2303            let filtered: Vec<String> = self
2304                .names
2305                .iter()
2306                .filter(|n| n.to_lowercase().contains(&q))
2307                .cloned()
2308                .collect();
2309            Some(Box::pin(futures::stream::iter(filtered)))
2310        }
2311    }
2312
2313    fn build_module_for(router: WireFixture, ns: &str) -> RpcModule<()> {
2314        let mut module = RpcModule::new(());
2315        let arc: Arc<dyn ChildRouter> = Arc::new(router);
2316        register_child_capability_methods(&mut module, ns, arc).expect("register");
2317        module
2318    }
2319
2320    #[tokio::test]
2321    async fn child_wire_registers_both_methods_when_both_bits_set() {
2322        let module = build_module_for(
2323            WireFixture {
2324                names: vec!["alpha".into(), "beta".into()],
2325                caps: ChildCapabilities::LIST | ChildCapabilities::SEARCH,
2326            },
2327            "fixture",
2328        );
2329        let names: Vec<String> = module.method_names().map(|s| s.to_string()).collect();
2330        assert!(
2331            names.contains(&"fixture.list_children".to_string()),
2332            "expected fixture.list_children, got: {:?}",
2333            names
2334        );
2335        assert!(
2336            names.contains(&"fixture.search_children".to_string()),
2337            "expected fixture.search_children, got: {:?}",
2338            names
2339        );
2340    }
2341
2342    #[tokio::test]
2343    async fn child_wire_registers_nothing_when_no_bits_set() {
2344        let module = build_module_for(
2345            WireFixture {
2346                names: vec!["alpha".into()],
2347                caps: ChildCapabilities::empty(),
2348            },
2349            "fixture",
2350        );
2351        let names: Vec<String> = module.method_names().map(|s| s.to_string()).collect();
2352        assert!(
2353            !names.contains(&"fixture.list_children".to_string()),
2354            "fixture.list_children should NOT be registered when cap absent"
2355        );
2356        assert!(
2357            !names.contains(&"fixture.search_children".to_string()),
2358            "fixture.search_children should NOT be registered when cap absent"
2359        );
2360    }
2361
2362    #[tokio::test]
2363    async fn child_wire_registers_only_list_when_only_list_bit() {
2364        let module = build_module_for(
2365            WireFixture {
2366                names: vec!["alpha".into()],
2367                caps: ChildCapabilities::LIST,
2368            },
2369            "fixture",
2370        );
2371        let names: Vec<String> = module.method_names().map(|s| s.to_string()).collect();
2372        assert!(names.contains(&"fixture.list_children".to_string()));
2373        assert!(!names.contains(&"fixture.search_children".to_string()));
2374    }
2375
2376    // Live wire-call behavior (subscription stream content, methodNotFound on
2377    // unregistered names, error envelopes) is verified end-to-end against
2378    // running substrate Solar — that's the canonical integration gate per
2379    // the CHILD-WIRE acceptance criteria. The unit-level introspection
2380    // tests above assert the registration shape; the substrate verification
2381    // asserts the live behavior. Splitting it that way avoids forcing the
2382    // unit test to construct a working RpcSubscriptionSink, which is not
2383    // straightforward in the bare jsonrpsee API.
2384}