1use super::{
9 context::PlexusContext,
10 method_enum::MethodEnumSchema,
11 schema::{ChildSummary, MethodSchema, PluginSchema, Schema},
12 streaming::PlexusStream,
13};
14use crate::types::Handle;
15use async_stream::stream;
16use async_trait::async_trait;
17use bitflags::bitflags;
18use futures::Stream;
19use futures_core::stream::BoxStream;
20use jsonrpsee::core::server::Methods;
21use jsonrpsee::RpcModule;
22
/// Notification method name handed to `register_subscription` for the hub's
/// subscription endpoints registered in this file.
pub const PLEXUS_NOTIF_METHOD: &str = "result";
29use schemars::JsonSchema;
30use serde::{Deserialize, Serialize};
31use serde_json::Value;
32use std::collections::HashMap;
33use std::sync::Arc;
34
/// Errors produced while routing a call through the hub or executing it.
#[derive(Debug, Clone)]
pub enum PlexusError {
    /// No activation (or child) could be found; the payload is the name or id that was looked up.
    ActivationNotFound(String),
    /// The requested method could not be found on the named activation.
    MethodNotFound { activation: String, method: String },
    /// The request was malformed, e.g. a method string without a `.` separator.
    InvalidParams(String),
    /// Execution failed; the payload is a free-form message.
    ExecutionError(String),
    /// The named activation does not support handle resolution.
    HandleNotSupported(String),
    /// A transport-level failure; see [`TransportErrorKind`].
    TransportError(TransportErrorKind),
    /// Authentication is required; the payload is a free-form message.
    Unauthenticated(String),
}
49
/// Classification of transport failures.
///
/// Serialized as an internally tagged enum: the variant name is written in
/// `snake_case` under the `error_kind` key.
#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(tag = "error_kind", rename_all = "snake_case")]
pub enum TransportErrorKind {
    /// The connection to `host:port` was refused.
    ConnectionRefused { host: String, port: u16 },
    /// The connection to `host:port` timed out.
    ConnectionTimeout { host: String, port: u16 },
    /// A protocol-level error, described by `message`.
    ProtocolError { message: String },
    /// A network-level error, described by `message`.
    NetworkError { message: String },
}
58
59impl std::fmt::Display for TransportErrorKind {
60 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
61 match self {
62 TransportErrorKind::ConnectionRefused { host, port } => {
63 write!(f, "Connection refused to {}:{}", host, port)
64 }
65 TransportErrorKind::ConnectionTimeout { host, port } => {
66 write!(f, "Connection timeout to {}:{}", host, port)
67 }
68 TransportErrorKind::ProtocolError { message } => {
69 write!(f, "Protocol error: {}", message)
70 }
71 TransportErrorKind::NetworkError { message } => {
72 write!(f, "Network error: {}", message)
73 }
74 }
75 }
76}
77
78impl std::fmt::Display for PlexusError {
79 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
80 match self {
81 PlexusError::ActivationNotFound(name) => write!(f, "Activation not found: {}", name),
82 PlexusError::MethodNotFound { activation, method } => {
83 write!(f, "Method not found: {}.{}", activation, method)
84 }
85 PlexusError::InvalidParams(msg) => write!(f, "Invalid params: {}", msg),
86 PlexusError::ExecutionError(msg) => write!(f, "Execution error: {}", msg),
87 PlexusError::HandleNotSupported(activation) => {
88 write!(f, "Handle resolution not supported by activation: {}", activation)
89 }
90 PlexusError::TransportError(kind) => match kind {
91 TransportErrorKind::ConnectionRefused { host, port } => {
92 write!(f, "Connection refused to {}:{}", host, port)
93 }
94 TransportErrorKind::ConnectionTimeout { host, port } => {
95 write!(f, "Connection timeout to {}:{}", host, port)
96 }
97 TransportErrorKind::ProtocolError { message } => {
98 write!(f, "Protocol error: {}", message)
99 }
100 TransportErrorKind::NetworkError { message } => {
101 write!(f, "Network error: {}", message)
102 }
103 }
104 PlexusError::Unauthenticated(msg) => write!(f, "Authentication required: {}", msg),
105 }
106 }
107}
108
// Marker impl: no methods are overridden, so `source()` keeps its default (`None`).
impl std::error::Error for PlexusError {}
110
111fn plexus_error_code(e: &PlexusError) -> i32 {
120 match e {
121 PlexusError::Unauthenticated(_) => -32001,
122 PlexusError::InvalidParams(_) => -32602,
123 PlexusError::MethodNotFound { .. } | PlexusError::ActivationNotFound(_) => -32601,
124 _ => -32000,
125 }
126}
127
128fn plexus_error_to_jsonrpc(e: &PlexusError) -> jsonrpsee::types::ErrorObjectOwned {
130 jsonrpsee::types::ErrorObject::owned(plexus_error_code(e), e.to_string(), None::<()>)
131}
132
// Summary of one activation: its namespace, version, description and the
// names of the methods it exposes.
//
// NOTE: plain `//` comments are used on purpose; this type derives
// `JsonSchema`, and `///` doc comments would be copied into the generated schema.
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ActivationInfo {
    pub namespace: String,
    pub version: String,
    pub description: String,
    pub methods: Vec<String>,
}
144
145#[async_trait]
150pub trait Activation: Send + Sync + 'static {
151 type Methods: MethodEnumSchema;
152
153 fn namespace(&self) -> &str;
154 fn version(&self) -> &str;
155 fn description(&self) -> &str { "No description available" }
157 fn long_description(&self) -> Option<&str> { None }
159 fn methods(&self) -> Vec<&str>;
160 fn method_help(&self, _method: &str) -> Option<String> { None }
161 fn plugin_id(&self) -> uuid::Uuid {
165 let major_version = self.version().split('.').next().unwrap_or("0");
166 uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, format!("{}@{}", self.namespace(), major_version).as_bytes())
167 }
168
169 async fn call(
170 &self,
171 method: &str,
172 params: Value,
173 auth: Option<&super::auth::AuthContext>,
174 raw_ctx: Option<&crate::request::RawRequestContext>,
175 ) -> Result<PlexusStream, PlexusError>;
176 async fn resolve_handle(&self, _handle: &Handle) -> Result<PlexusStream, PlexusError> {
177 Err(PlexusError::HandleNotSupported(self.namespace().to_string()))
178 }
179
180 fn into_rpc_methods(self) -> Methods where Self: Sized;
181
182 fn plugin_schema(&self) -> PluginSchema {
184 use std::collections::hash_map::DefaultHasher;
185 use std::hash::{Hash, Hasher};
186
187 let methods: Vec<MethodSchema> = self.methods().iter().map(|name| {
188 let desc = self.method_help(name).unwrap_or_default();
189 let mut hasher = DefaultHasher::new();
191 name.hash(&mut hasher);
192 desc.hash(&mut hasher);
193 let hash = format!("{:016x}", hasher.finish());
194 MethodSchema::new(name.to_string(), desc, hash)
195 }).collect();
196
197 if let Some(long_desc) = self.long_description() {
198 PluginSchema::leaf_with_long_description(
199 self.namespace(),
200 self.version(),
201 self.description(),
202 long_desc,
203 methods,
204 )
205 } else {
206 PluginSchema::leaf(
207 self.namespace(),
208 self.version(),
209 self.description(),
210 methods,
211 )
212 }
213 }
214}
215
// Bit flags returned by `ChildRouter::capabilities`.
// Deprecated since 0.5 (see the attribute's note for the replacement).
// NOTE(review): LIST / SEARCH presumably correspond to
// `ChildRouter::list_children` / `ChildRouter::search_children` — confirm.
bitflags! {
    #[derive(Copy, Clone, Debug, Eq, PartialEq, Hash)]
    #[deprecated(
        since = "0.5",
        note = "Use MethodRole::DynamicChild { list_method, search_method } instead. Removed in 0.7."
    )]
    pub struct ChildCapabilities: u32 {
        const LIST = 0b0000_0001;
        const SEARCH = 0b0000_0010;
    }
}
262
/// Routing interface for a node that can execute calls and hand out named
/// child routers.
#[async_trait]
pub trait ChildRouter: Send + Sync {
    /// Namespace of this router; used in `MethodNotFound` errors by `route_to_child`.
    fn router_namespace(&self) -> &str;

    /// Executes `method` on this router with the given params and optional contexts.
    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;

    /// Looks up a child router by name; `None` when no such child exists.
    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>>;

    /// Deprecated capability flags; the default is the empty set.
    #[allow(deprecated)]
    fn capabilities(&self) -> ChildCapabilities {
        ChildCapabilities::empty()
    }

    /// Stream of child names; the default implementation returns `None`.
    async fn list_children(&self) -> Option<BoxStream<'_, String>> {
        None
    }

    /// Stream of child names for `_query`; the default implementation returns `None`.
    async fn search_children(&self, _query: &str) -> Option<BoxStream<'_, String>> {
        None
    }
}
321
322pub async fn route_to_child<T: ChildRouter + ?Sized>(
328 parent: &T,
329 method: &str,
330 params: Value,
331 auth: Option<&super::auth::AuthContext>,
332 raw_ctx: Option<&crate::request::RawRequestContext>,
333) -> Result<PlexusStream, PlexusError> {
334 if let Some((child_name, rest)) = method.split_once('.') {
336 if let Some(child) = parent.get_child(child_name).await {
337 return child.router_call(rest, params, auth, raw_ctx).await;
338 }
339 return Err(PlexusError::ActivationNotFound(child_name.to_string()));
340 }
341
342 Err(PlexusError::MethodNotFound {
344 activation: parent.router_namespace().to_string(),
345 method: method.to_string(),
346 })
347}
348
/// Newtype around `Arc<dyn ChildRouter>` that itself implements
/// [`ChildRouter`] by forwarding every method to the wrapped router.
struct ArcChildRouter(Arc<dyn ChildRouter>);

#[async_trait]
impl ChildRouter for ArcChildRouter {
    fn router_namespace(&self) -> &str {
        self.0.router_namespace()
    }

    async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
        self.0.router_call(method, params, auth, raw_ctx).await
    }

    async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
        self.0.get_child(name).await
    }

    // Forwarded explicitly so the wrapped router's overrides are used rather
    // than the trait defaults.
    #[allow(deprecated)]
    fn capabilities(&self) -> ChildCapabilities {
        self.0.capabilities()
    }

    async fn list_children(&self) -> Option<BoxStream<'_, String>> {
        self.0.list_children().await
    }

    async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
        self.0.search_children(query).await
    }
}
381
/// Private counterpart of [`Activation`] without the associated type or the
/// `Self: Sized` method, so the hub can store activations as
/// `Arc<dyn ActivationObject>`.
#[async_trait]
#[allow(dead_code)] trait ActivationObject: Send + Sync + 'static {
    fn namespace(&self) -> &str;
    fn version(&self) -> &str;
    fn description(&self) -> &str;
    fn long_description(&self) -> Option<&str>;
    fn methods(&self) -> Vec<&str>;
    fn method_help(&self, method: &str) -> Option<String>;
    fn plugin_id(&self) -> uuid::Uuid;
    async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError>;
    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError>;
    fn plugin_schema(&self) -> PluginSchema;
    /// JSON schema generated from the activation's `Methods` type.
    fn schema(&self) -> Schema;
}
401
/// Adapter that owns a concrete [`Activation`] and exposes it through
/// `ActivationObject`.
struct ActivationWrapper<A: Activation> {
    inner: A,
}

#[async_trait]
impl<A: Activation> ActivationObject for ActivationWrapper<A> {
    // Straight delegation to the wrapped activation.
    fn namespace(&self) -> &str { self.inner.namespace() }
    fn version(&self) -> &str { self.inner.version() }
    fn description(&self) -> &str { self.inner.description() }
    fn long_description(&self) -> Option<&str> { self.inner.long_description() }
    fn methods(&self) -> Vec<&str> { self.inner.methods() }
    fn method_help(&self, method: &str) -> Option<String> { self.inner.method_help(method) }
    fn plugin_id(&self) -> uuid::Uuid { self.inner.plugin_id() }

    async fn call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
        self.inner.call(method, params, auth, raw_ctx).await
    }

    async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
        self.inner.resolve_handle(handle).await
    }

    fn plugin_schema(&self) -> PluginSchema { self.inner.plugin_schema() }

    /// Generates the schemars schema for `A::Methods` and converts it into
    /// the project's `Schema` type via a JSON round-trip.
    ///
    /// # Panics
    /// Panics if the generated schema cannot be serialized or cannot be
    /// parsed back into `Schema`.
    fn schema(&self) -> Schema {
        let schema = schemars::schema_for!(A::Methods);
        serde_json::from_value(serde_json::to_value(schema).expect("serialize"))
            .expect("parse schema")
    }
}
432
// Event carrying a hash value. Serialized with an `event` tag whose value is
// the snake_case variant name.
// (`//` rather than `///`: the type derives `JsonSchema`, and doc comments
// would be copied into the generated schema.)
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum HashEvent {
    Hash { value: String },
}
442
// Event carrying a full `PluginSchema`. Serialized with an `event` tag whose
// value is the snake_case variant name.
// (`//` rather than `///`: the type derives `JsonSchema`.)
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
#[serde(tag = "event", rename_all = "snake_case")]
pub enum SchemaEvent {
    Schema(PluginSchema),
}
450
// Hash summary for one plugin.
// `children_hash` and `children` are left out of the serialized form when `None`.
// NOTE(review): presumably `hash` combines `self_hash` and `children_hash` —
// confirm against the code that constructs this type.
// (`//` rather than `///`: the type derives `JsonSchema`.)
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct PluginHashes {
    pub namespace: String,
    pub self_hash: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub children_hash: Option<String>,
    pub hash: String,
    #[serde(skip_serializing_if = "Option::is_none")]
    pub children: Option<Vec<ChildHashes>>,
}
463
// Namespace/hash pair for one child entry in `PluginHashes::children`.
// (`//` rather than `///`: the type derives `JsonSchema`.)
#[derive(Debug, Clone, Serialize, Deserialize, JsonSchema)]
pub struct ChildHashes {
    pub namespace: String,
    pub hash: String,
}
470
471
/// One record in the plugin registry.
#[derive(Debug, Clone)]
pub struct PluginEntry {
    /// Identifier the entry is indexed by.
    pub id: uuid::Uuid,
    /// Path the plugin was registered under (the hub passes the activation namespace).
    pub path: String,
    /// Free-form type label (the hub passes the activation namespace here as well).
    pub plugin_type: String,
}
486
/// Two-way index of registered plugins: id → entry and path → id.
#[derive(Default)]
pub struct PluginRegistry {
    /// Entries keyed by plugin id.
    by_id: HashMap<uuid::Uuid, PluginEntry>,
    /// Reverse index from registration path to plugin id.
    by_path: HashMap<String, uuid::Uuid>,
}
498
/// Owned copy of a [`PluginRegistry`]'s two maps, produced by
/// `DynamicHub::registry_snapshot`, that can be queried without holding the
/// registry lock.
#[derive(Clone)]
pub struct PluginRegistrySnapshot {
    by_id: HashMap<uuid::Uuid, PluginEntry>,
    by_path: HashMap<String, uuid::Uuid>,
}
507
508impl PluginRegistrySnapshot {
509 pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
511 self.by_id.get(&id).map(|e| e.path.as_str())
512 }
513
514 pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
516 self.by_path.get(path).copied()
517 }
518
519 pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
521 self.by_id.get(&id)
522 }
523
524 pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
526 self.by_id.values()
527 }
528
529 pub fn len(&self) -> usize {
531 self.by_id.len()
532 }
533
534 pub fn is_empty(&self) -> bool {
536 self.by_id.is_empty()
537 }
538}
539
540impl PluginRegistry {
541 pub fn new() -> Self {
543 Self::default()
544 }
545
546 pub fn lookup(&self, id: uuid::Uuid) -> Option<&str> {
548 self.by_id.get(&id).map(|e| e.path.as_str())
549 }
550
551 pub fn lookup_by_path(&self, path: &str) -> Option<uuid::Uuid> {
553 self.by_path.get(path).copied()
554 }
555
556 pub fn get(&self, id: uuid::Uuid) -> Option<&PluginEntry> {
558 self.by_id.get(&id)
559 }
560
561 pub fn register(&mut self, id: uuid::Uuid, path: String, plugin_type: String) {
563 let entry = PluginEntry { id, path: path.clone(), plugin_type };
564 self.by_id.insert(id, entry);
565 self.by_path.insert(path, id);
566 }
567
568 pub fn list(&self) -> impl Iterator<Item = &PluginEntry> {
570 self.by_id.values()
571 }
572
573 pub fn len(&self) -> usize {
575 self.by_id.len()
576 }
577
578 pub fn is_empty(&self) -> bool {
580 self.by_id.is_empty()
581 }
582}
583
/// Shared state behind a [`DynamicHub`].
struct DynamicHubInner {
    /// Namespace of the hub itself; calls addressed to it are handled by the hub.
    namespace: String,
    /// Registered activations, keyed by their namespace.
    activations: HashMap<String, Arc<dyn ActivationObject>>,
    /// The same activations as routers, keyed by their namespace.
    child_routers: HashMap<String, Arc<dyn ChildRouter>>,
    /// Plugin id ↔ path index.
    registry: std::sync::RwLock<PluginRegistry>,
    /// Deferred factories producing each activation's RPC methods; drained
    /// when the RPC module is built.
    pending_rpc: std::sync::Mutex<Vec<Box<dyn FnOnce() -> Methods + Send>>>,
}
598
/// Hub that routes calls to registered activations.
///
/// Cloning is cheap: clones share the same inner state through an `Arc`.
/// Registration needs exclusive access to that state, so it must happen
/// before the hub is cloned.
#[derive(Clone)]
pub struct DynamicHub {
    inner: Arc<DynamicHubInner>,
}
624
625impl DynamicHub {
630 pub fn new(namespace: impl Into<String>) -> Self {
640 Self {
641 inner: Arc::new(DynamicHubInner {
642 namespace: namespace.into(),
643 activations: HashMap::new(),
644 child_routers: HashMap::new(),
645 registry: std::sync::RwLock::new(PluginRegistry::new()),
646 pending_rpc: std::sync::Mutex::new(Vec::new()),
647 }),
648 }
649 }
650
    /// Deprecated alias for [`DynamicHub::new`]; kept for backward compatibility.
    #[deprecated(since = "0.3.0", note = "Use DynamicHub::new(namespace) instead")]
    pub fn with_namespace(namespace: impl Into<String>) -> Self {
        Self::new(namespace)
    }
656
    /// Namespace this hub instance was constructed with.
    pub fn runtime_namespace(&self) -> &str {
        &self.inner.namespace
    }
661
    /// Acquires a read guard on the plugin registry.
    ///
    /// # Panics
    /// Panics if the registry lock is poisoned.
    pub fn registry(&self) -> std::sync::RwLockReadGuard<'_, PluginRegistry> {
        self.inner.registry.read().unwrap()
    }
666
667 pub fn register<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
669 let namespace = activation.namespace().to_string();
670 let plugin_id = activation.plugin_id();
671 let activation_for_rpc = activation.clone();
672 let activation_for_router = activation.clone();
673
674 let inner = Arc::get_mut(&mut self.inner)
675 .expect("Cannot register: DynamicHub has multiple references");
676
677 inner.registry.write().unwrap().register(
679 plugin_id,
680 namespace.clone(),
681 namespace.clone(), );
683
684 inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
685 inner.child_routers.insert(namespace.clone(), Arc::new(activation_for_router));
686 inner.pending_rpc.lock().unwrap()
687 .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
688 self
689 }
690
691 #[deprecated(since = "0.5.0", note = "Use register() — it now handles both leaf and hub activations")]
696 pub fn register_hub<A: Activation + ChildRouter + Clone + 'static>(mut self, activation: A) -> Self {
697 let namespace = activation.namespace().to_string();
698 let plugin_id = activation.plugin_id();
699 let activation_for_rpc = activation.clone();
700 let activation_for_router = activation.clone();
701
702 let inner = Arc::get_mut(&mut self.inner)
703 .expect("Cannot register: DynamicHub has multiple references");
704
705 inner.registry.write().unwrap().register(
707 plugin_id,
708 namespace.clone(),
709 namespace.clone(), );
711
712 inner.activations.insert(namespace.clone(), Arc::new(ActivationWrapper { inner: activation }));
713 inner.child_routers.insert(namespace, Arc::new(activation_for_router));
714 inner.pending_rpc.lock().unwrap()
715 .push(Box::new(move || activation_for_rpc.into_rpc_methods()));
716 self
717 }
718
719 pub fn list_methods(&self) -> Vec<String> {
721 let mut methods = Vec::new();
722
723 for m in Activation::methods(self) {
725 methods.push(format!("{}.{}", self.inner.namespace, m));
726 }
727
728 for (ns, act) in &self.inner.activations {
730 for m in act.methods() {
731 methods.push(format!("{}.{}", ns, m));
732 }
733 }
734 methods.sort();
735 methods
736 }
737
738 pub fn list_activations_info(&self) -> Vec<ActivationInfo> {
740 let mut activations = Vec::new();
741
742 activations.push(ActivationInfo {
744 namespace: Activation::namespace(self).to_string(),
745 version: Activation::version(self).to_string(),
746 description: Activation::description(self).to_string(),
747 methods: Activation::methods(self).iter().map(|s| s.to_string()).collect(),
748 });
749
750 for a in self.inner.activations.values() {
752 activations.push(ActivationInfo {
753 namespace: a.namespace().to_string(),
754 version: a.version().to_string(),
755 description: a.description().to_string(),
756 methods: a.methods().iter().map(|s| s.to_string()).collect(),
757 });
758 }
759
760 activations
761 }
762
    /// Returns the `hash` field of the hub's own plugin schema.
    pub fn compute_hash(&self) -> String {
        Activation::plugin_schema(self).hash
    }
770
    /// Routes `method` like [`DynamicHub::route_with_ctx`], without a raw request context.
    pub async fn route(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>) -> Result<PlexusStream, PlexusError> {
        self.route_with_ctx(method, params, auth, None).await
    }
775
776 pub async fn route_with_ctx(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
778 let (namespace, method_name) = self.parse_method(method)?;
779
780 if namespace == self.inner.namespace {
782 return Activation::call(self, method_name, params, auth, raw_ctx).await;
783 }
784
785 let activation = self.inner.activations.get(namespace)
786 .ok_or_else(|| PlexusError::ActivationNotFound(namespace.to_string()))?;
787
788 activation.call(method_name, params, auth, raw_ctx).await
789 }
790
    /// Resolves `handle` by looking up its plugin id in the registry and
    /// delegating to the activation registered under the resulting path.
    ///
    /// # Errors
    /// [`PlexusError::ActivationNotFound`] when the plugin id is unknown or no
    /// activation is stored under its path; otherwise whatever the
    /// activation's `resolve_handle` returns.
    ///
    /// # Panics
    /// Panics if the registry lock is poisoned.
    pub async fn do_resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
        // The path is copied into an owned String so the read guard (a
        // temporary of this statement) is released before the `.await` below.
        let path = self.inner.registry.read().unwrap()
            .lookup(handle.plugin_id)
            .map(|s| s.to_string())
            .ok_or_else(|| PlexusError::ActivationNotFound(handle.plugin_id.to_string()))?;

        let activation = self.inner.activations.get(&path)
            .ok_or_else(|| PlexusError::ActivationNotFound(path.clone()))?;
        activation.resolve_handle(handle).await
    }
804
    /// Returns the method schema of the activation registered under
    /// `namespace`, or `None` if there is no such activation.
    pub fn get_activation_schema(&self, namespace: &str) -> Option<Schema> {
        self.inner.activations.get(namespace).map(|a| a.schema())
    }
809
    /// Clones both registry maps into a [`PluginRegistrySnapshot`] under a read lock.
    ///
    /// # Panics
    /// Panics if the registry lock is poisoned.
    pub fn registry_snapshot(&self) -> PluginRegistrySnapshot {
        let guard = self.inner.registry.read().unwrap();
        PluginRegistrySnapshot {
            by_id: guard.by_id.clone(),
            by_path: guard.by_path.clone(),
        }
    }
818
    /// Returns an owned copy of the path registered for plugin `id`, if any.
    ///
    /// # Panics
    /// Panics if the registry lock is poisoned.
    pub fn lookup_plugin(&self, id: uuid::Uuid) -> Option<String> {
        self.inner.registry.read().unwrap().lookup(id).map(|s| s.to_string())
    }
823
    /// Returns the plugin id registered under `path`, if any.
    ///
    /// # Panics
    /// Panics if the registry lock is poisoned.
    pub fn lookup_plugin_by_path(&self, path: &str) -> Option<uuid::Uuid> {
        self.inner.registry.read().unwrap().lookup_by_path(path)
    }
828
829 pub fn list_plugin_schemas(&self) -> Vec<PluginSchema> {
831 let mut schemas = Vec::new();
832
833 schemas.push(Activation::plugin_schema(self));
835
836 for a in self.inner.activations.values() {
838 schemas.push(a.plugin_schema());
839 }
840
841 schemas
842 }
843
    /// Deprecated alias for [`DynamicHub::list_plugin_schemas`].
    #[deprecated(note = "Use list_plugin_schemas instead")]
    pub fn list_full_schemas(&self) -> Vec<PluginSchema> {
        self.list_plugin_schemas()
    }
849
850 pub fn get_method_help(&self, method: &str) -> Option<String> {
852 let (namespace, method_name) = self.parse_method(method).ok()?;
853 let activation = self.inner.activations.get(namespace)?;
854 activation.method_help(method_name)
855 }
856
857 fn parse_method<'a>(&self, method: &'a str) -> Result<(&'a str, &'a str), PlexusError> {
858 let parts: Vec<&str> = method.splitn(2, '.').collect();
859 if parts.len() != 2 {
860 return Err(PlexusError::InvalidParams(format!("Invalid method format: {}", method)));
861 }
862 Ok((parts[0], parts[1]))
863 }
864
865 pub fn plugin_children(&self) -> Vec<ChildSummary> {
868 self.inner.activations.values()
869 .map(|a| {
870 let schema = a.plugin_schema();
871 ChildSummary {
872 namespace: schema.namespace,
873 description: schema.description,
874 hash: schema.hash,
875 }
876 })
877 .collect()
878 }
879
    /// Consumes the hub and builds the JSON-RPC module that exposes it.
    ///
    /// Registers four subscriptions — `{ns}.call`, `{ns}.hash`, `{ns}.schema`
    /// and `_info` — and then merges the RPC methods of every registered
    /// activation. `PlexusContext` is initialised with the hub hash first.
    ///
    /// # Errors
    /// Returns `RegisterMethodError` if a subscription cannot be registered
    /// or an activation's methods cannot be merged.
    ///
    /// # Panics
    /// Panics if the pending-RPC mutex is poisoned.
    pub fn into_rpc_module(self) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
        let mut module = RpcModule::new(());

        PlexusContext::init(self.compute_hash());

        // The strings are leaked to obtain `&'static str` values for the
        // registration calls and handlers below; each call of this function
        // leaks one set.
        let ns = self.runtime_namespace();
        let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
        let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
        let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
        let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
        let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
        let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
        let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
        let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
        let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());

        // `{ns}.call`: route the request and pipe the resulting stream to the
        // subscriber. No auth context is forwarded on this path (`None`).
        let plexus_for_call = self.clone();
        module.register_subscription(
            call_method,
            PLEXUS_NOTIF_METHOD,
            call_unsub,
            move |params, pending, _ctx, _ext| {
                let plexus = plexus_for_call.clone();
                Box::pin(async move {
                    let p: CallParams = params.parse()?;
                    match plexus.route(&p.method, p.params.unwrap_or_default(), None).await {
                        Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
                        Err(e) => {
                            // Routing failed: accept the subscription anyway and
                            // deliver the failure as a single stream error item.
                            let sink = pending.accept().await?;
                            let error_item = super::types::PlexusStreamItem::Error {
                                metadata: super::types::StreamMetadata::new(
                                    vec![ns_static.into()],
                                    PlexusContext::hash(),
                                ),
                                message: e.to_string(),
                                code: Some(plexus_error_code(&e).to_string()),
                                recoverable: false,
                            };
                            // Best effort: serialization or send failures are ignored.
                            if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
                                let _ = sink.send(raw).await;
                            }
                            Ok(())
                        }
                    }
                })
            }
        )?;

        // `{ns}.hash`: emit the current hub schema hash as a single event.
        let plexus_for_hash = self.clone();
        module.register_subscription(
            hash_method,
            PLEXUS_NOTIF_METHOD,
            hash_unsub,
            move |_params, pending, _ctx, _ext| {
                let plexus = plexus_for_hash.clone();
                Box::pin(async move {
                    let schema = Activation::plugin_schema(&plexus);
                    let stream = async_stream::stream! {
                        yield HashEvent::Hash { value: schema.hash };
                    };
                    let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
                    pipe_stream_to_subscription(pending, wrapped).await
                })
            }
        )?;

        // `{ns}.schema`: emit the whole plugin schema, or one method's schema
        // when a `method` param is supplied.
        let plexus_for_schema = self.clone();
        module.register_subscription(
            schema_method,
            PLEXUS_NOTIF_METHOD,
            schema_unsub,
            move |params, pending, _ctx, _ext| {
                let plexus = plexus_for_schema.clone();
                Box::pin(async move {
                    // Unparseable params fall back to the default (no method filter).
                    let p: SchemaParams = params.parse().unwrap_or_default();
                    let plugin_schema = Activation::plugin_schema(&plexus);

                    let result = if let Some(ref name) = p.method {
                        plugin_schema.methods.iter()
                            .find(|m| m.name == *name)
                            .map(|m| super::SchemaResult::Method(m.clone()))
                            .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
                                -32602,
                                format!("Method '{}' not found", name),
                                None::<()>,
                            ))?
                    } else {
                        super::SchemaResult::Plugin(plugin_schema)
                    };

                    let stream = async_stream::stream! { yield result; };
                    let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
                    pipe_stream_to_subscription(pending, wrapped).await
                })
            }
        )?;

        // `_info`: emit `{"backend": <namespace>}` once.
        let backend_name = self.runtime_namespace().to_string();
        module.register_subscription(
            "_info",
            PLEXUS_NOTIF_METHOD,
            "_info_unsub",
            move |_params, pending, _ctx, _ext| {
                let name = backend_name.clone();
                Box::pin(async move {
                    let info_stream = futures::stream::once(async move {
                        serde_json::json!({"backend": name})
                    });

                    let wrapped = super::streaming::wrap_stream(
                        info_stream,
                        "_info",
                        vec![]
                    );

                    pipe_stream_to_subscription(pending, wrapped).await
                })
            }
        )?;

        // Drain the deferred factories (leaving the list empty) and merge
        // each activation's methods into the module.
        let pending = std::mem::take(&mut *self.inner.pending_rpc.lock().unwrap());
        for factory in pending {
            module.merge(factory())?;
        }

        Ok(module)
    }
1019
1020 pub fn arc_into_rpc_module(hub: Arc<Self>) -> Result<RpcModule<()>, jsonrpsee::core::RegisterMethodError> {
1026 let mut module = RpcModule::new(());
1027
1028 PlexusContext::init(hub.compute_hash());
1029
1030 let ns = hub.runtime_namespace();
1033 let call_method: &'static str = Box::leak(format!("{}.call", ns).into_boxed_str());
1034 let call_unsub: &'static str = Box::leak(format!("{}.call_unsub", ns).into_boxed_str());
1035 let hash_method: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
1036 let hash_unsub: &'static str = Box::leak(format!("{}.hash_unsub", ns).into_boxed_str());
1037 let schema_method: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
1038 let schema_unsub: &'static str = Box::leak(format!("{}.schema_unsub", ns).into_boxed_str());
1039 let hash_content_type: &'static str = Box::leak(format!("{}.hash", ns).into_boxed_str());
1040 let schema_content_type: &'static str = Box::leak(format!("{}.schema", ns).into_boxed_str());
1041 let ns_static: &'static str = Box::leak(ns.to_string().into_boxed_str());
1042
1043 let hub_for_call = hub.clone();
1045 module.register_subscription(
1046 call_method,
1047 call_method,
1048 call_unsub,
1049 move |params, pending, _ctx, ext| {
1050 let hub = hub_for_call.clone();
1051 Box::pin(async move {
1052 let p: CallParams = params.parse()?;
1053 let auth = ext.get::<std::sync::Arc<super::auth::AuthContext>>()
1055 .map(|arc| arc.as_ref());
1056 match hub.route(&p.method, p.params.unwrap_or_default(), auth).await {
1057 Ok(stream) => pipe_stream_to_subscription(pending, stream).await,
1058 Err(e) => {
1059 let sink = pending.accept().await?;
1064 let error_item = super::types::PlexusStreamItem::Error {
1065 metadata: super::types::StreamMetadata::new(
1066 vec![ns_static.into()],
1067 PlexusContext::hash(),
1068 ),
1069 message: e.to_string(),
1070 code: Some(plexus_error_code(&e).to_string()),
1071 recoverable: false,
1072 };
1073 if let Ok(raw) = serde_json::value::to_raw_value(&error_item) {
1074 let _ = sink.send(raw).await;
1075 }
1076 Ok(())
1077 }
1078 }
1079 })
1080 }
1081 )?;
1082
1083 let hub_for_hash = hub.clone();
1085 module.register_subscription(
1086 hash_method,
1087 PLEXUS_NOTIF_METHOD,
1088 hash_unsub,
1089 move |_params, pending, _ctx, _ext| {
1090 let hub = hub_for_hash.clone();
1091 Box::pin(async move {
1092 let schema = Activation::plugin_schema(&*hub);
1093 let stream = async_stream::stream! {
1094 yield HashEvent::Hash { value: schema.hash };
1095 };
1096 let wrapped = super::streaming::wrap_stream(stream, hash_content_type, vec![ns_static.into()]);
1097 pipe_stream_to_subscription(pending, wrapped).await
1098 })
1099 }
1100 )?;
1101
1102 let hub_for_schema = hub.clone();
1104 module.register_subscription(
1105 schema_method,
1106 PLEXUS_NOTIF_METHOD,
1107 schema_unsub,
1108 move |params, pending, _ctx, _ext| {
1109 let hub = hub_for_schema.clone();
1110 Box::pin(async move {
1111 let p: SchemaParams = params.parse().unwrap_or_default();
1112 let plugin_schema = Activation::plugin_schema(&*hub);
1113
1114 let result = if let Some(ref name) = p.method {
1115 plugin_schema.methods.iter()
1116 .find(|m| m.name == *name)
1117 .map(|m| super::SchemaResult::Method(m.clone()))
1118 .ok_or_else(|| jsonrpsee::types::ErrorObject::owned(
1119 -32602,
1120 format!("Method '{}' not found", name),
1121 None::<()>,
1122 ))?
1123 } else {
1124 super::SchemaResult::Plugin(plugin_schema)
1125 };
1126
1127 let stream = async_stream::stream! {
1128 yield result;
1129 };
1130 let wrapped = super::streaming::wrap_stream(stream, schema_content_type, vec![ns_static.into()]);
1131 pipe_stream_to_subscription(pending, wrapped).await
1132 })
1133 }
1134 )?;
1135
1136 let backend_name = hub.runtime_namespace().to_string();
1139 module.register_subscription(
1140 "_info",
1141 PLEXUS_NOTIF_METHOD,
1142 "_info_unsub",
1143 move |_params, pending, _ctx, _ext| {
1144 let name = backend_name.clone();
1145 Box::pin(async move {
1146 let info_stream = futures::stream::once(async move {
1148 serde_json::json!({"backend": name})
1149 });
1150
1151 let wrapped = super::streaming::wrap_stream(
1153 info_stream,
1154 "_info",
1155 vec![]
1156 );
1157
1158 pipe_stream_to_subscription(pending, wrapped).await
1160 })
1161 }
1162 )?;
1163
1164 let respond_method: &'static str = Box::leak(format!("{}.respond", ns).into_boxed_str());
1167 module.register_async_method(respond_method, |params, _ctx, _ext| async move {
1168 use super::bidirectional::{handle_pending_response, BidirError};
1169
1170 let p: RespondParams = params.parse()?;
1171
1172 tracing::debug!(
1173 request_id = %p.request_id,
1174 "Handling {}.respond via WebSocket",
1175 "plexus"
1176 );
1177
1178 match handle_pending_response(&p.request_id, p.response_data) {
1179 Ok(()) => Ok(serde_json::json!({"success": true})),
1180 Err(BidirError::UnknownRequest) => {
1181 tracing::warn!(request_id = %p.request_id, "Unknown request ID in respond");
1182 Err(jsonrpsee::types::ErrorObject::owned(
1183 -32602,
1184 format!("Unknown request ID: {}. The request may have timed out or been cancelled.", p.request_id),
1185 None::<()>,
1186 ))
1187 }
1188 Err(BidirError::ChannelClosed) => {
1189 tracing::warn!(request_id = %p.request_id, "Channel closed in respond");
1190 Err(jsonrpsee::types::ErrorObject::owned(
1191 -32000,
1192 "Response channel was closed (request may have timed out)",
1193 None::<()>,
1194 ))
1195 }
1196 Err(e) => {
1197 tracing::error!(request_id = %p.request_id, error = ?e, "Error in respond");
1198 Err(jsonrpsee::types::ErrorObject::owned(
1199 -32000,
1200 format!("Failed to deliver response: {}", e),
1201 None::<()>,
1202 ))
1203 }
1204 }
1205 })?;
1206
1207 let pending = std::mem::take(&mut *hub.inner.pending_rpc.lock().unwrap());
1209 tracing::trace!(factories = pending.len(), "merging activation RPC factories");
1210 for (idx, factory) in pending.into_iter().enumerate() {
1211 tracing::trace!(factory_idx = idx, "calling factory to get Methods");
1212 let methods = factory();
1213 let method_count = methods.method_names().count();
1214 tracing::trace!(factory_idx = idx, methods = method_count, "factory returned Methods; merging into module");
1215 module.merge(methods)?;
1216 tracing::trace!(factory_idx = idx, "successfully merged factory methods");
1217 }
1218 tracing::trace!("all activations merged successfully");
1219
1220 Ok(module)
1221 }
1222}
1223
/// Parameters accepted by the `{ns}.call` subscription.
#[derive(Debug, serde::Deserialize)]
struct CallParams {
    /// Target in `namespace.method` form.
    method: String,
    /// Arguments for the target; may be omitted.
    #[serde(default)]
    params: Option<Value>,
}
1231
/// Parameters accepted by the `{ns}.schema` subscription.
#[derive(Debug, Default, serde::Deserialize)]
struct SchemaParams {
    /// When set, only the schema of this method is returned.
    method: Option<String>,
}
1237
/// Parameters accepted by the `{ns}.respond` method.
#[derive(Debug, serde::Deserialize)]
struct RespondParams {
    /// Id of the pending request being answered.
    request_id: String,
    /// Payload handed to `handle_pending_response`.
    response_data: Value,
}
1244
1245async fn pipe_stream_to_subscription(
1250 pending: jsonrpsee::PendingSubscriptionSink,
1251 mut stream: PlexusStream,
1252) -> jsonrpsee::core::SubscriptionResult {
1253 use futures::StreamExt;
1254
1255 let sink = pending.accept().await?;
1256 while let Some(item) = stream.next().await {
1257 let json = serde_json::value::to_raw_value(&item)?;
1258 sink.send(json).await?;
1259 }
1260 Ok(())
1261}
1262
1263#[plexus_macros::activation(
1268 namespace = "plexus",
1269 version = "1.0.0",
1270 description = "Central routing and introspection",
1271 hub,
1272 namespace_fn = "runtime_namespace"
1273)]
1274#[allow(deprecated)]
1275impl DynamicHub {
1276 #[plexus_macros::method(
1278 streaming,
1279 description = "Route a call to a registered activation",
1280 params(
1281 method = "The method to call (format: namespace.method)",
1282 params = "Parameters to pass to the method (optional, defaults to {})"
1283 )
1284 )]
1285 async fn call(
1286 &self,
1287 method: String,
1288 params: Option<Value>,
1289 ) -> impl Stream<Item = super::types::PlexusStreamItem> + Send + 'static {
1290 use super::context::PlexusContext;
1291 use super::types::{PlexusStreamItem, StreamMetadata};
1292
1293 let result = self.route(&method, params.unwrap_or_default(), None).await;
1294
1295 match result {
1296 Ok(plexus_stream) => {
1297 plexus_stream
1299 }
1300 Err(e) => {
1301 let metadata = StreamMetadata::new(
1303 vec![self.inner.namespace.clone()],
1304 PlexusContext::hash(),
1305 );
1306 Box::pin(futures::stream::once(async move {
1307 PlexusStreamItem::Error {
1308 metadata,
1309 message: e.to_string(),
1310 code: None,
1311 recoverable: false,
1312 }
1313 }))
1314 }
1315 }
1316 }
1317
1318 #[plexus_macros::method(description = "Get plexus configuration hash (from the recursive schema)\n\n This hash changes whenever any method or child plugin changes.\n It's computed from the method hashes rolled up through the schema tree.")]
1323 async fn hash(&self) -> impl Stream<Item = HashEvent> + Send + 'static {
1324 let schema = Activation::plugin_schema(self);
1325 stream! { yield HashEvent::Hash { value: schema.hash }; }
1326 }
1327
1328 #[plexus_macros::method(description = "Get plugin hashes for cache validation")]
1330 #[allow(deprecated)]
1331 async fn hashes(&self) -> impl Stream<Item = PluginHashes> + Send + 'static {
1332 let schema = Activation::plugin_schema(self);
1333
1334 stream! {
1335 yield PluginHashes {
1336 namespace: schema.namespace.clone(),
1337 self_hash: schema.self_hash.clone(),
1338 children_hash: schema.children_hash.clone(),
1339 hash: schema.hash.clone(),
1340 children: schema.children.as_ref().map(|kids| {
1341 kids.iter()
1342 .map(|c| ChildHashes {
1343 namespace: c.namespace.clone(),
1344 hash: c.hash.clone(),
1345 })
1346 .collect()
1347 }),
1348 };
1349 }
1350 }
1351
1352 }
1354
1355use super::hub_context::HubContext;
1360use std::sync::Weak;
1361
1362#[async_trait]
1368impl HubContext for Weak<DynamicHub> {
1369 async fn resolve_handle(&self, handle: &Handle) -> Result<PlexusStream, PlexusError> {
1370 let hub = self.upgrade().ok_or_else(|| {
1371 PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1372 })?;
1373 hub.do_resolve_handle(handle).await
1374 }
1375
1376 async fn call(&self, method: &str, params: serde_json::Value) -> Result<PlexusStream, PlexusError> {
1377 let hub = self.upgrade().ok_or_else(|| {
1378 PlexusError::ExecutionError("Parent hub has been dropped".to_string())
1379 })?;
1380 hub.route(method, params, None).await
1381 }
1382
1383 fn is_valid(&self) -> bool {
1384 self.upgrade().is_some()
1385 }
1386}
1387
1388#[async_trait]
1393impl ChildRouter for DynamicHub {
1394 fn router_namespace(&self) -> &str {
1395 &self.inner.namespace
1396 }
1397
1398 async fn router_call(&self, method: &str, params: Value, auth: Option<&super::auth::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
1399 self.route_with_ctx(method, params, auth, raw_ctx).await
1402 }
1403
1404 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1405 self.inner.child_routers.get(name)
1407 .map(|router| {
1408 Box::new(ArcChildRouter(router.clone())) as Box<dyn ChildRouter>
1410 })
1411 }
1412}
1413
1414#[cfg(test)]
1415#[allow(deprecated)]
1416mod tests {
1417 use super::*;
1418
/// Compile-time check that `DynamicHub` satisfies the `Activation` bound.
#[test]
fn dynamic_hub_implements_activation() {
    // Instantiating the generic is the whole test; nothing runs inside it.
    fn requires_activation<A>()
    where
        A: Activation,
    {
    }

    requires_activation::<DynamicHub>();
}
1424
/// A fresh hub must list `call`, `hash` and `schema` among its methods.
#[test]
fn dynamic_hub_methods() {
    let hub = DynamicHub::new("test");
    let available = hub.methods();
    let has = |name: &str| available.iter().any(|m| *m == name);

    assert!(has("call"));
    assert!(has("hash"));
    assert!(has("schema"));
}
1434
/// Two hubs created with the same namespace must compute the same hash.
#[test]
fn dynamic_hub_hash_stable() {
    let first = DynamicHub::new("test");
    let second = DynamicHub::new("test");

    let first_hash = first.compute_hash();
    let second_hash = second.compute_hash();

    assert_eq!(first_hash, second_hash);
}
1441
/// A hub with `Health` registered reports itself as a hub (not a leaf) and
/// lists a `health` child that carries a non-empty hash.
#[test]
fn dynamic_hub_is_hub() {
    use crate::activations::health::Health;

    let hub = DynamicHub::new("test").register(Health::new());
    let schema = hub.plugin_schema();

    assert!(schema.is_hub(), "dynamic hub should be a hub");
    assert!(!schema.is_leaf(), "dynamic hub should not be a leaf");

    let kids = schema.children.expect("dynamic hub should have children");
    assert!(!kids.is_empty(), "dynamic hub should have at least one child");

    let health_entry = kids
        .iter()
        .find(|kid| kid.namespace == "health")
        .expect("should have health child");
    assert!(!health_entry.hash.is_empty(), "health should have a hash");
}
1460
/// Prints the hub schema as pretty JSON and checks its top-level shape:
/// namespace `test`, a `call` method, and a children list.
#[test]
fn dynamic_hub_schema_structure() {
    use crate::activations::health::Health;

    let hub = DynamicHub::new("test").register(Health::new());
    let schema = hub.plugin_schema();

    // Printed for inspection; test output is shown with `--nocapture`.
    println!(
        "DynamicHub schema:\n{}",
        serde_json::to_string_pretty(&schema).unwrap()
    );

    assert_eq!(schema.namespace, "test");
    assert!(schema.methods.iter().any(|m| m.name == "call"));
    assert!(schema.children.is_some());
}
1476
1477 #[tokio::test]
1482 async fn invariant_resolve_handle_unknown_activation() {
1483 use crate::activations::health::Health;
1484 use crate::types::Handle;
1485 use uuid::Uuid;
1486
1487 let hub = DynamicHub::new("test").register(Health::new());
1488
1489 let unknown_plugin_id = Uuid::new_v4();
1491 let handle = Handle::new(unknown_plugin_id, "1.0.0", "some_method");
1492
1493 let result = hub.do_resolve_handle(&handle).await;
1494
1495 match result {
1496 Err(PlexusError::ActivationNotFound(_)) => {
1497 }
1499 Err(other) => panic!("Expected ActivationNotFound, got {:?}", other),
1500 Ok(_) => panic!("Expected error for unknown activation"),
1501 }
1502 }
1503
1504 #[tokio::test]
1505 async fn invariant_resolve_handle_unsupported() {
1506 use crate::activations::health::Health;
1507 use crate::types::Handle;
1508
1509 let hub = DynamicHub::new("test").register(Health::new());
1510
1511 let handle = Handle::new(Health::PLUGIN_ID, "1.0.0", "check");
1513
1514 let result = hub.do_resolve_handle(&handle).await;
1515
1516 match result {
1517 Err(PlexusError::HandleNotSupported(name)) => {
1518 assert_eq!(name, "health");
1519 }
1520 Err(other) => panic!("Expected HandleNotSupported, got {:?}", other),
1521 Ok(_) => panic!("Expected error for unsupported handle"),
1522 }
1523 }
1524
1525 #[tokio::test]
1526 async fn invariant_resolve_handle_routes_by_plugin_id() {
1527 use crate::activations::health::Health;
1528 use crate::activations::echo::Echo;
1529 use crate::types::Handle;
1530 use uuid::Uuid;
1531
1532 let health = Health::new();
1533 let echo = Echo::new();
1534 let health_plugin_id = health.plugin_id();
1535 let echo_plugin_id = echo.plugin_id();
1536
1537 let hub = DynamicHub::new("test")
1538 .register(health)
1539 .register(echo);
1540
1541 let health_handle = Handle::new(health_plugin_id, "1.0.0", "check");
1543 match hub.do_resolve_handle(&health_handle).await {
1544 Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "health"),
1545 Err(other) => panic!("health handle should route to health activation, got {:?}", other),
1546 Ok(_) => panic!("health handle should return HandleNotSupported"),
1547 }
1548
1549 let echo_handle = Handle::new(echo_plugin_id, "1.0.0", "echo");
1551 match hub.do_resolve_handle(&echo_handle).await {
1552 Err(PlexusError::HandleNotSupported(name)) => assert_eq!(name, "echo"),
1553 Err(other) => panic!("echo handle should route to echo activation, got {:?}", other),
1554 Ok(_) => panic!("echo handle should return HandleNotSupported"),
1555 }
1556
1557 let unknown_handle = Handle::new(Uuid::new_v4(), "1.0.0", "method");
1559 match hub.do_resolve_handle(&unknown_handle).await {
1560 Err(PlexusError::ActivationNotFound(_)) => { },
1561 Err(other) => panic!("unknown handle should return ActivationNotFound, got {:?}", other),
1562 Ok(_) => panic!("unknown handle should return ActivationNotFound"),
1563 }
1564 }
1565
/// Handles built for different activations keep distinct plugin ids even
/// when their meta payloads are identical.
#[test]
fn invariant_handle_plugin_id_determines_routing() {
    use crate::activations::echo::Echo;
    use crate::activations::health::Health;
    use crate::types::Handle;

    let health = Health::new();
    let echo = Echo::new();
    let (health_id, echo_id) = (health.plugin_id(), echo.plugin_id());

    // Same meta on both handles: only the plugin id tells them apart.
    let health_handle = Handle::new(health_id, "1.0.0", "check")
        .with_meta(vec!["msg-123".into(), "user".into()]);
    let echo_handle = Handle::new(echo_id, "1.0.0", "echo")
        .with_meta(vec!["msg-123".into(), "user".into()]);

    assert_ne!(health_handle.plugin_id, echo_handle.plugin_id);
}
1584
/// Registers one entry and checks lookup by id, lookup by path, and the
/// stored entry's fields.
#[test]
fn plugin_registry_basic_operations() {
    let plugin_id = uuid::Uuid::new_v4();
    let mut registry = PluginRegistry::new();

    registry.register(plugin_id, String::from("test_plugin"), String::from("test"));

    // id -> path and path -> id must agree.
    assert_eq!(registry.lookup(plugin_id), Some("test_plugin"));
    assert_eq!(registry.lookup_by_path("test_plugin"), Some(plugin_id));

    let stored = registry.get(plugin_id).expect("should have entry");
    assert_eq!(stored.path, "test_plugin");
    assert_eq!(stored.plugin_type, "test");
}
1608
/// Registering an activation on the hub must also add it to the hub's
/// plugin registry, reachable both by path and by id.
#[test]
fn plugin_registry_populated_on_register() {
    use crate::activations::health::Health;

    let hub = DynamicHub::new("test").register(Health::new());
    let registry = hub.registry();

    assert!(
        !registry.is_empty(),
        "registry should not be empty after registration"
    );

    let health_uuid = registry
        .lookup_by_path("health")
        .expect("health should be registered by path");

    // The reverse lookup must map the id back to the same path.
    assert_eq!(registry.lookup(health_uuid), Some("health"));
}
1626
/// Two `Health` instances share one plugin id, and that id equals the v5
/// UUID of `health@1` under the OID namespace.
#[test]
fn plugin_registry_deterministic_uuid() {
    use crate::activations::health::Health;

    let (first, second) = (Health::new(), Health::new());

    assert_eq!(
        first.plugin_id(),
        second.plugin_id(),
        "same activation type should have deterministic UUID"
    );

    let expected = uuid::Uuid::new_v5(&uuid::Uuid::NAMESPACE_OID, b"health@1");
    assert_eq!(
        first.plugin_id(),
        expected,
        "plugin_id should be deterministic from namespace@major_version"
    );
}
1646
1647 struct MinimalRouter;
1655
1656 #[async_trait]
1657 impl ChildRouter for MinimalRouter {
1658 fn router_namespace(&self) -> &str {
1659 "minimal"
1660 }
1661
1662 async fn router_call(
1663 &self,
1664 _method: &str,
1665 _params: Value,
1666 _auth: Option<&super::super::auth::AuthContext>,
1667 _raw_ctx: Option<&crate::request::RawRequestContext>,
1668 ) -> Result<PlexusStream, PlexusError> {
1669 Err(PlexusError::MethodNotFound {
1670 activation: "minimal".into(),
1671 method: "none".into(),
1672 })
1673 }
1674
1675 async fn get_child(&self, _name: &str) -> Option<Box<dyn ChildRouter>> {
1676 None
1677 }
1678 }
1679
1680 #[tokio::test]
1681 async fn child_router_defaults_report_no_capabilities_and_none_streams() {
1682 let router = MinimalRouter;
1683
1684 assert_eq!(
1685 router.capabilities(),
1686 ChildCapabilities::empty(),
1687 "default capabilities should be empty"
1688 );
1689 assert!(
1690 router.list_children().await.is_none(),
1691 "default list_children should be None"
1692 );
1693 assert!(
1694 router.search_children("anything").await.is_none(),
1695 "default search_children should be None"
1696 );
1697 }
1698
1699 struct ListingRouter {
1701 names: Vec<String>,
1702 }
1703
1704 #[async_trait]
1705 impl ChildRouter for ListingRouter {
1706 fn router_namespace(&self) -> &str {
1707 "listing"
1708 }
1709
1710 async fn router_call(
1711 &self,
1712 _method: &str,
1713 _params: Value,
1714 _auth: Option<&super::super::auth::AuthContext>,
1715 _raw_ctx: Option<&crate::request::RawRequestContext>,
1716 ) -> Result<PlexusStream, PlexusError> {
1717 Err(PlexusError::MethodNotFound {
1718 activation: "listing".into(),
1719 method: "none".into(),
1720 })
1721 }
1722
1723 async fn get_child(&self, name: &str) -> Option<Box<dyn ChildRouter>> {
1724 if self.names.iter().any(|n| n == name) {
1725 Some(Box::new(ListingRouter { names: vec![] }))
1728 } else {
1729 None
1730 }
1731 }
1732
1733 fn capabilities(&self) -> ChildCapabilities {
1734 ChildCapabilities::LIST | ChildCapabilities::SEARCH
1735 }
1736
1737 async fn list_children(&self) -> Option<BoxStream<'_, String>> {
1738 let stream = futures::stream::iter(self.names.iter().cloned());
1739 Some(Box::pin(stream))
1740 }
1741
1742 async fn search_children(&self, query: &str) -> Option<BoxStream<'_, String>> {
1743 let q = query.to_string();
1744 let stream = futures::stream::iter(
1745 self.names
1746 .iter()
1747 .filter(move |n| n.contains(&q))
1748 .cloned()
1749 .collect::<Vec<_>>(),
1750 );
1751 Some(Box::pin(stream))
1752 }
1753 }
1754
1755 #[tokio::test]
1756 async fn child_router_overrides_report_capabilities_and_yield_streams() {
1757 use futures::StreamExt;
1758
1759 let router = ListingRouter {
1760 names: vec!["alpha".into(), "beta".into(), "alphabet".into()],
1761 };
1762
1763 let caps = router.capabilities();
1765 assert!(caps.contains(ChildCapabilities::LIST));
1766 assert!(caps.contains(ChildCapabilities::SEARCH));
1767 assert_eq!(caps, ChildCapabilities::LIST | ChildCapabilities::SEARCH);
1768
1769 let list_stream = router
1771 .list_children()
1772 .await
1773 .expect("LIST capability set — expected Some(stream)");
1774 let listed: Vec<String> = list_stream.collect().await;
1775 assert_eq!(listed, vec!["alpha".to_string(), "beta".into(), "alphabet".into()]);
1776
1777 let search_stream = router
1779 .search_children("alpha")
1780 .await
1781 .expect("SEARCH capability set — expected Some(stream)");
1782 let matched: Vec<String> = search_stream.collect().await;
1783 assert_eq!(matched, vec!["alpha".to_string(), "alphabet".into()]);
1784 }
1785}