Skip to main content

plexus_substrate/activations/health/
activation.rs

1use super::methods::HealthMethod;
2use super::types::HealthEvent;
3use crate::plexus::{wrap_stream, PlexusError, PlexusStream, Activation, PlexusStreamItem, StreamMetadata, PlexusContext, MethodSchema, PluginSchema, SchemaResult};
4use async_stream::stream;
5use async_trait::async_trait;
6use futures::{Stream, StreamExt};
7use jsonrpsee::core::{server::Methods, SubscriptionResult};
8use jsonrpsee::{proc_macros::rpc, PendingSubscriptionSink};
9use serde_json::Value;
10use std::pin::Pin;
11use std::time::Instant;
12
13/// Health RPC interface
14#[rpc(server, namespace = "health")]
15pub trait HealthRpc {
16    /// Check health status (streaming subscription)
17    #[subscription(name = "check", unsubscribe = "unsubscribe_check", item = serde_json::Value)]
18    async fn check(&self) -> SubscriptionResult;
19}
20
21/// Health activation - minimal reference implementation
22///
23/// This activation demonstrates the caller-wraps architecture.
24/// The `check_stream` method returns typed domain events (HealthEvent),
25/// and the `call` method wraps them using `wrap_stream`.
26#[derive(Clone)]
27pub struct Health {
28    start_time: Instant,
29}
30
31impl Health {
32    /// Namespace for the health plugin
33    pub const NAMESPACE: &'static str = "health";
34    /// Version of the health plugin
35    pub const VERSION: &'static str = "1.0.0";
36    /// Stable plugin instance ID for handle routing (same formula as hub_methods macro)
37    /// Generated from "health@1" (namespace + major version)
38    pub const PLUGIN_ID: uuid::Uuid = uuid::uuid!("dc560257-b7c5-575b-b893-b448c87ca797");
39
40    pub fn new() -> Self {
41        Self {
42            start_time: Instant::now(),
43        }
44    }
45
46    /// Returns typed stream - caller will wrap with metadata
47    fn check_stream(
48        &self,
49    ) -> Pin<Box<dyn Stream<Item = HealthEvent> + Send + 'static>> {
50        let uptime = self.start_time.elapsed().as_secs();
51
52        Box::pin(stream! {
53            yield HealthEvent::Status {
54                status: "healthy".to_string(),
55                uptime_seconds: uptime,
56                timestamp: chrono::Utc::now().timestamp(),
57            };
58        })
59    }
60}
61
62impl Default for Health {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68/// RPC server implementation
69#[async_trait]
70impl HealthRpcServer for Health {
71    async fn check(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
72        let sink = pending.accept().await?;
73
74        // Get wrapped stream
75        let stream = self.check_stream();
76        let wrapped = wrap_stream(stream, "health.status", vec!["health".into()]);
77
78        // Forward all items to sink
79        tokio::spawn(async move {
80            let mut stream = wrapped;
81            while let Some(item) = stream.next().await {
82                if let Ok(raw_value) = serde_json::value::to_raw_value(&item) {
83                    if sink.send(raw_value).await.is_err() {
84                        break;
85                    }
86                }
87            }
88            // Send done event
89            let done = PlexusStreamItem::Done {
90                metadata: StreamMetadata::new(vec!["health".into()], PlexusContext::hash()),
91            };
92            if let Ok(raw_value) = serde_json::value::to_raw_value(&done) {
93                let _ = sink.send(raw_value).await;
94            }
95        });
96
97        Ok(())
98    }
99}
100
101/// Activation trait implementation - unified interface for Plexus
102#[async_trait]
103impl Activation for Health {
104    type Methods = HealthMethod;
105
106    fn namespace(&self) -> &str {
107        "health"
108    }
109
110    fn version(&self) -> &str {
111        "1.0.0"
112    }
113
114    fn plugin_id(&self) -> uuid::Uuid {
115        Self::PLUGIN_ID
116    }
117
118    fn description(&self) -> &str {
119        "Check hub health and uptime"
120    }
121
122    fn methods(&self) -> Vec<&str> {
123        vec!["check", "schema"]
124    }
125
126    fn method_help(&self, method: &str) -> Option<String> {
127        match method {
128            "schema" => Some("Get plugin or method schema. Pass {\"method\": \"name\"} for a specific method.".to_string()),
129            _ => HealthMethod::description(method).map(|s| s.to_string()),
130        }
131    }
132
133    async fn call(&self, method: &str, params: Value, _auth: Option<&plexus_core::plexus::AuthContext>, _raw_ctx: Option<&plexus_core::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> { match method {
134        "check" => {
135            let stream = self.check_stream();
136            Ok(wrap_stream(stream, "health.status", vec!["health".into()]))
137        }
138        "schema" => {
139            use crate::plexus::SchemaResult;
140    
141            // Check if a specific method was requested
142            let method_name: Option<String> = params.get("method")
143                .and_then(|v| v.as_str())
144                .map(|s| s.to_string());
145    
146            let plugin_schema = self.plugin_schema();
147    
148            let result = if let Some(ref name) = method_name {
149                // Find the specific method
150                plugin_schema.methods.iter()
151                    .find(|m| m.name == *name)
152                    .map(|m| SchemaResult::Method(m.clone()))
153                    .ok_or_else(|| PlexusError::MethodNotFound {
154                        activation: "health".to_string(),
155                        method: name.clone(),
156                    })?
157            } else {
158                // Return full plugin schema
159                SchemaResult::Plugin(plugin_schema)
160            };
161    
162            Ok(wrap_stream(
163                futures::stream::once(async move { result }),
164                "health.schema",
165                vec!["health".into()]
166            ))
167        }
168        _ => {
169            // Check for {method}.schema pattern (e.g., "check.schema")
170            // Only if the prefix is an actual local method
171            if let Some(method_name) = method.strip_suffix(".schema") {
172                use crate::plexus::SchemaResult;
173    
174                let plugin_schema = self.plugin_schema();
175                if let Some(m) = plugin_schema.methods.iter().find(|m| m.name == method_name) {
176                    let result = SchemaResult::Method(m.clone());
177                    return Ok(wrap_stream(
178                        futures::stream::once(async move { result }),
179                        "health.method_schema",
180                        vec!["health".into()]
181                    ));
182                }
183            }
184    
185            Err(PlexusError::MethodNotFound {
186                activation: "health".to_string(),
187                method: method.to_string(),
188            })
189        }
190    } }
191
192    fn into_rpc_methods(self) -> Methods {
193        // Register RPC subscription methods
194        self.into_rpc().into()
195    }
196
197    fn plugin_schema(&self) -> PluginSchema {
198        use std::collections::hash_map::DefaultHasher;
199        use std::hash::{Hash, Hasher};
200
201        // check method
202        let check_desc = "Check the health status of the hub and return uptime";
203        let mut hasher = DefaultHasher::new();
204        "check".hash(&mut hasher);
205        check_desc.hash(&mut hasher);
206        let check_hash = format!("{:016x}", hasher.finish());
207
208        // schema method
209        let schema_desc = "Get plugin or method schema. Pass {\"method\": \"name\"} for a specific method.";
210        let mut hasher = DefaultHasher::new();
211        "schema".hash(&mut hasher);
212        schema_desc.hash(&mut hasher);
213        let schema_hash = format!("{:016x}", hasher.finish());
214
215        let methods = vec![
216            MethodSchema::new("check", check_desc, check_hash)
217                .with_returns(schemars::schema_for!(HealthEvent)),
218            MethodSchema::new("schema", schema_desc, schema_hash)
219                .with_returns(schemars::schema_for!(SchemaResult)),
220        ];
221
222        PluginSchema::leaf(self.namespace(), self.version(), self.description(), methods)
223    }
224}
225
226#[async_trait]
227impl plexus_core::plexus::ChildRouter for Health {
228    fn router_namespace(&self) -> &str {
229        "health"
230    }
231
232    async fn router_call(&self, method: &str, params: Value, auth: Option<&plexus_core::plexus::AuthContext>, raw_ctx: Option<&plexus_core::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
233        Activation::call(self, method, params, auth, raw_ctx).await
234    }
235
236    async fn get_child(&self, _name: &str) -> Option<Box<dyn plexus_core::plexus::ChildRouter>> {
237        None
238    }
239}