plexus_substrate/activations/health/
activation.rs1use super::methods::HealthMethod;
2use super::types::HealthEvent;
3use crate::plexus::{wrap_stream, PlexusError, PlexusStream, Activation, PlexusStreamItem, StreamMetadata, PlexusContext, MethodSchema, PluginSchema, SchemaResult};
4use async_stream::stream;
5use async_trait::async_trait;
6use futures::{Stream, StreamExt};
7use jsonrpsee::core::{server::Methods, SubscriptionResult};
8use jsonrpsee::{proc_macros::rpc, PendingSubscriptionSink};
9use serde_json::Value;
10use std::pin::Pin;
11use std::time::Instant;
12
13#[rpc(server, namespace = "health")]
15pub trait HealthRpc {
16 #[subscription(name = "check", unsubscribe = "unsubscribe_check", item = serde_json::Value)]
18 async fn check(&self) -> SubscriptionResult;
19}
20
/// Health activation state.
///
/// Holds only the instant at which the activation was created; uptime is
/// derived from it whenever a health check is served.
#[derive(Debug, Clone)]
pub struct Health {
    /// Monotonic instant recorded when the value was constructed.
    start_time: Instant,
}
30
31impl Health {
32 pub const NAMESPACE: &'static str = "health";
34 pub const VERSION: &'static str = "1.0.0";
36 pub const PLUGIN_ID: uuid::Uuid = uuid::uuid!("dc560257-b7c5-575b-b893-b448c87ca797");
39
40 pub fn new() -> Self {
41 Self {
42 start_time: Instant::now(),
43 }
44 }
45
46 fn check_stream(
48 &self,
49 ) -> Pin<Box<dyn Stream<Item = HealthEvent> + Send + 'static>> {
50 let uptime = self.start_time.elapsed().as_secs();
51
52 Box::pin(stream! {
53 yield HealthEvent::Status {
54 status: "healthy".to_string(),
55 uptime_seconds: uptime,
56 timestamp: chrono::Utc::now().timestamp(),
57 };
58 })
59 }
60}
61
62impl Default for Health {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68#[async_trait]
70impl HealthRpcServer for Health {
71 async fn check(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
72 let sink = pending.accept().await?;
73
74 let stream = self.check_stream();
76 let wrapped = wrap_stream(stream, "health.status", vec!["health".into()]);
77
78 tokio::spawn(async move {
80 let mut stream = wrapped;
81 while let Some(item) = stream.next().await {
82 if let Ok(raw_value) = serde_json::value::to_raw_value(&item) {
83 if sink.send(raw_value).await.is_err() {
84 break;
85 }
86 }
87 }
88 let done = PlexusStreamItem::Done {
90 metadata: StreamMetadata::new(vec!["health".into()], PlexusContext::hash()),
91 };
92 if let Ok(raw_value) = serde_json::value::to_raw_value(&done) {
93 let _ = sink.send(raw_value).await;
94 }
95 });
96
97 Ok(())
98 }
99}
100
101#[async_trait]
103impl Activation for Health {
104 type Methods = HealthMethod;
105
106 fn namespace(&self) -> &str {
107 "health"
108 }
109
110 fn version(&self) -> &str {
111 "1.0.0"
112 }
113
114 fn plugin_id(&self) -> uuid::Uuid {
115 Self::PLUGIN_ID
116 }
117
118 fn description(&self) -> &str {
119 "Check hub health and uptime"
120 }
121
122 fn methods(&self) -> Vec<&str> {
123 vec!["check", "schema"]
124 }
125
126 fn method_help(&self, method: &str) -> Option<String> {
127 match method {
128 "schema" => Some("Get plugin or method schema. Pass {\"method\": \"name\"} for a specific method.".to_string()),
129 _ => HealthMethod::description(method).map(|s| s.to_string()),
130 }
131 }
132
133 async fn call(&self, method: &str, params: Value, _auth: Option<&plexus_core::plexus::AuthContext>, _raw_ctx: Option<&plexus_core::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> { match method {
134 "check" => {
135 let stream = self.check_stream();
136 Ok(wrap_stream(stream, "health.status", vec!["health".into()]))
137 }
138 "schema" => {
139 use crate::plexus::SchemaResult;
140
141 let method_name: Option<String> = params.get("method")
143 .and_then(|v| v.as_str())
144 .map(|s| s.to_string());
145
146 let plugin_schema = self.plugin_schema();
147
148 let result = if let Some(ref name) = method_name {
149 plugin_schema.methods.iter()
151 .find(|m| m.name == *name)
152 .map(|m| SchemaResult::Method(m.clone()))
153 .ok_or_else(|| PlexusError::MethodNotFound {
154 activation: "health".to_string(),
155 method: name.clone(),
156 })?
157 } else {
158 SchemaResult::Plugin(plugin_schema)
160 };
161
162 Ok(wrap_stream(
163 futures::stream::once(async move { result }),
164 "health.schema",
165 vec!["health".into()]
166 ))
167 }
168 _ => {
169 if let Some(method_name) = method.strip_suffix(".schema") {
172 use crate::plexus::SchemaResult;
173
174 let plugin_schema = self.plugin_schema();
175 if let Some(m) = plugin_schema.methods.iter().find(|m| m.name == method_name) {
176 let result = SchemaResult::Method(m.clone());
177 return Ok(wrap_stream(
178 futures::stream::once(async move { result }),
179 "health.method_schema",
180 vec!["health".into()]
181 ));
182 }
183 }
184
185 Err(PlexusError::MethodNotFound {
186 activation: "health".to_string(),
187 method: method.to_string(),
188 })
189 }
190 } }
191
192 fn into_rpc_methods(self) -> Methods {
193 self.into_rpc().into()
195 }
196
197 fn plugin_schema(&self) -> PluginSchema {
198 use std::collections::hash_map::DefaultHasher;
199 use std::hash::{Hash, Hasher};
200
201 let check_desc = "Check the health status of the hub and return uptime";
203 let mut hasher = DefaultHasher::new();
204 "check".hash(&mut hasher);
205 check_desc.hash(&mut hasher);
206 let check_hash = format!("{:016x}", hasher.finish());
207
208 let schema_desc = "Get plugin or method schema. Pass {\"method\": \"name\"} for a specific method.";
210 let mut hasher = DefaultHasher::new();
211 "schema".hash(&mut hasher);
212 schema_desc.hash(&mut hasher);
213 let schema_hash = format!("{:016x}", hasher.finish());
214
215 let methods = vec![
216 MethodSchema::new("check", check_desc, check_hash)
217 .with_returns(schemars::schema_for!(HealthEvent)),
218 MethodSchema::new("schema", schema_desc, schema_hash)
219 .with_returns(schemars::schema_for!(SchemaResult)),
220 ];
221
222 PluginSchema::leaf(self.namespace(), self.version(), self.description(), methods)
223 }
224}
225
226#[async_trait]
227impl plexus_core::plexus::ChildRouter for Health {
228 fn router_namespace(&self) -> &str {
229 "health"
230 }
231
232 async fn router_call(&self, method: &str, params: Value, auth: Option<&plexus_core::plexus::AuthContext>, raw_ctx: Option<&plexus_core::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
233 Activation::call(self, method, params, auth, raw_ctx).await
234 }
235
236 async fn get_child(&self, _name: &str) -> Option<Box<dyn plexus_core::plexus::ChildRouter>> {
237 None
238 }
239}