plexus_core/activations/health/
activation.rs1use super::methods::HealthMethod;
2use super::types::HealthEvent;
3use crate::plexus::{wrap_stream, PlexusError, PlexusStream, Activation, PlexusStreamItem, StreamMetadata, PlexusContext};
4use async_stream::stream;
5use async_trait::async_trait;
6use futures::{Stream, StreamExt};
7use jsonrpsee::core::{server::Methods, SubscriptionResult};
8use jsonrpsee::{proc_macros::rpc, PendingSubscriptionSink};
9use serde_json::Value;
10use std::pin::Pin;
11use std::time::Instant;
12
13#[rpc(server, namespace = "health")]
15pub trait HealthRpc {
16 #[subscription(name = "check", unsubscribe = "unsubscribe_check", item = serde_json::Value)]
18 async fn check(&self) -> SubscriptionResult;
19}
20
/// State of the health activation.
///
/// The only state is the instant recorded at construction; uptime is derived
/// from it whenever a health check is produced.
#[derive(Clone, Debug)]
pub struct Health {
    /// Reference point from which uptime is measured.
    start_time: Instant,
}
30
31impl Health {
32 pub const NAMESPACE: &'static str = "health";
34 pub const VERSION: &'static str = "1.0.0";
36 pub const PLUGIN_ID: uuid::Uuid = uuid::uuid!("dc560257-b7c5-575b-b893-b448c87ca797");
39
40 pub fn new() -> Self {
41 Self {
42 start_time: Instant::now(),
43 }
44 }
45
46 fn check_stream(
48 &self,
49 ) -> Pin<Box<dyn Stream<Item = HealthEvent> + Send + 'static>> {
50 let uptime = self.start_time.elapsed().as_secs();
51
52 Box::pin(stream! {
53 yield HealthEvent::Status {
54 status: "healthy".to_string(),
55 uptime_seconds: uptime,
56 timestamp: chrono::Utc::now().timestamp(),
57 };
58 })
59 }
60}
61
62impl Default for Health {
63 fn default() -> Self {
64 Self::new()
65 }
66}
67
68#[async_trait]
70impl HealthRpcServer for Health {
71 async fn check(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
72 let sink = pending.accept().await?;
73
74 let stream = self.check_stream();
76 let wrapped = wrap_stream(stream, "health.status", vec!["health".into()]);
77
78 tokio::spawn(async move {
80 let mut stream = wrapped;
81 while let Some(item) = stream.next().await {
82 if let Ok(raw_value) = serde_json::value::to_raw_value(&item) {
83 if sink.send(raw_value).await.is_err() {
84 break;
85 }
86 }
87 }
88 let done = PlexusStreamItem::Done {
90 metadata: StreamMetadata::new(vec!["health".into()], PlexusContext::hash()),
91 };
92 if let Ok(raw_value) = serde_json::value::to_raw_value(&done) {
93 let _ = sink.send(raw_value).await;
94 }
95 });
96
97 Ok(())
98 }
99}
100
101#[async_trait]
103impl Activation for Health {
104 type Methods = HealthMethod;
105
106 fn namespace(&self) -> &str {
107 "health"
108 }
109
110 fn version(&self) -> &str {
111 "1.0.0"
112 }
113
114 fn plugin_id(&self) -> uuid::Uuid {
115 Self::PLUGIN_ID
116 }
117
118 fn description(&self) -> &str {
119 "Check hub health and uptime"
120 }
121
122 fn methods(&self) -> Vec<&str> {
123 vec!["check", "schema"]
124 }
125
126 fn method_help(&self, method: &str) -> Option<String> {
127 match method {
128 "schema" => Some("Get plugin or method schema. Pass {\"method\": \"name\"} for a specific method.".to_string()),
129 _ => HealthMethod::description(method).map(|s| s.to_string()),
130 }
131 }
132
133 async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
134 match method {
135 "check" => {
136 let stream = self.check_stream();
137 Ok(wrap_stream(stream, "health.status", vec!["health".into()]))
138 }
139 "schema" => {
140 use crate::plexus::SchemaResult;
141
142 let method_name: Option<String> = params.get("method")
144 .and_then(|v| v.as_str())
145 .map(|s| s.to_string());
146
147 let plugin_schema = self.plugin_schema();
148
149 let result = if let Some(ref name) = method_name {
150 plugin_schema.methods.iter()
152 .find(|m| m.name == *name)
153 .map(|m| SchemaResult::Method(m.clone()))
154 .ok_or_else(|| PlexusError::MethodNotFound {
155 activation: "health".to_string(),
156 method: name.clone(),
157 })?
158 } else {
159 SchemaResult::Plugin(plugin_schema)
161 };
162
163 Ok(wrap_stream(
164 futures::stream::once(async move { result }),
165 "health.schema",
166 vec!["health".into()]
167 ))
168 }
169 _ => {
170 if let Some(method_name) = method.strip_suffix(".schema") {
173 use crate::plexus::SchemaResult;
174
175 let plugin_schema = self.plugin_schema();
176 if let Some(m) = plugin_schema.methods.iter().find(|m| m.name == method_name) {
177 let result = SchemaResult::Method(m.clone());
178 return Ok(wrap_stream(
179 futures::stream::once(async move { result }),
180 "health.method_schema",
181 vec!["health".into()]
182 ));
183 }
184 }
185
186 Err(PlexusError::MethodNotFound {
187 activation: "health".to_string(),
188 method: method.to_string(),
189 })
190 }
191 }
192 }
193
194 fn into_rpc_methods(self) -> Methods {
195 self.into_rpc().into()
197 }
198}