use super::methods::HealthMethod;
use super::types::HealthEvent;
use crate::plexus::{wrap_stream, PlexusError, PlexusStream, Activation, ChildRouter, PlexusStreamItem, StreamMetadata, PlexusContext};
use async_stream::stream;
use async_trait::async_trait;
use futures::{Stream, StreamExt};
use jsonrpsee::core::{server::Methods, SubscriptionResult};
use jsonrpsee::{proc_macros::rpc, PendingSubscriptionSink};
use serde_json::Value;
use std::pin::Pin;
use std::time::Instant;
#[rpc(server, namespace = "health")]
pub trait HealthRpc {
#[subscription(name = "check", unsubscribe = "unsubscribe_check", item = serde_json::Value)]
async fn check(&self) -> SubscriptionResult;
}
#[derive(Clone)]
pub struct Health {
start_time: Instant,
}
impl Health {
pub const NAMESPACE: &'static str = "health";
pub const VERSION: &'static str = "1.0.0";
pub const PLUGIN_ID: uuid::Uuid = uuid::uuid!("dc560257-b7c5-575b-b893-b448c87ca797");
pub fn new() -> Self {
Self {
start_time: Instant::now(),
}
}
fn check_stream(
&self,
) -> Pin<Box<dyn Stream<Item = HealthEvent> + Send + 'static>> {
let uptime = self.start_time.elapsed().as_secs();
Box::pin(stream! {
yield HealthEvent::Status {
status: "healthy".to_string(),
uptime_seconds: uptime,
timestamp: chrono::Utc::now().timestamp(),
};
})
}
}
impl Default for Health {
fn default() -> Self {
Self::new()
}
}
#[async_trait]
impl HealthRpcServer for Health {
async fn check(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
let sink = pending.accept().await?;
let stream = self.check_stream();
let wrapped = wrap_stream(stream, "health.status", vec!["health".into()]);
tokio::spawn(async move {
let mut stream = wrapped;
while let Some(item) = stream.next().await {
if let Ok(raw_value) = serde_json::value::to_raw_value(&item) {
if sink.send(raw_value).await.is_err() {
break;
}
}
}
let done = PlexusStreamItem::Done {
metadata: StreamMetadata::new(vec!["health".into()], PlexusContext::hash()),
};
if let Ok(raw_value) = serde_json::value::to_raw_value(&done) {
let _ = sink.send(raw_value).await;
}
});
Ok(())
}
}
#[async_trait]
impl Activation for Health {
type Methods = HealthMethod;
fn namespace(&self) -> &str {
"health"
}
fn version(&self) -> &str {
"1.0.0"
}
fn plugin_id(&self) -> uuid::Uuid {
Self::PLUGIN_ID
}
fn description(&self) -> &str {
"Check hub health and uptime"
}
fn methods(&self) -> Vec<&str> {
vec!["check", "schema"]
}
fn method_help(&self, method: &str) -> Option<String> {
match method {
"schema" => Some("Get plugin or method schema. Pass {\"method\": \"name\"} for a specific method.".to_string()),
_ => HealthMethod::description(method).map(|s| s.to_string()),
}
}
async fn call(&self, method: &str, params: Value, _auth: Option<&crate::plexus::AuthContext>, _raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
match method {
"check" => {
let stream = self.check_stream();
Ok(wrap_stream(stream, "health.status", vec!["health".into()]))
}
"schema" => {
use crate::plexus::SchemaResult;
let method_name: Option<String> = params.get("method")
.and_then(|v| v.as_str())
.map(|s| s.to_string());
let plugin_schema = self.plugin_schema();
let result = if let Some(ref name) = method_name {
plugin_schema.methods.iter()
.find(|m| m.name == *name)
.map(|m| SchemaResult::Method(m.clone()))
.ok_or_else(|| PlexusError::MethodNotFound {
activation: "health".to_string(),
method: name.clone(),
})?
} else {
SchemaResult::Plugin(plugin_schema)
};
Ok(wrap_stream(
futures::stream::once(async move { result }),
"health.schema",
vec!["health".into()]
))
}
_ => {
if let Some(method_name) = method.strip_suffix(".schema") {
use crate::plexus::SchemaResult;
let plugin_schema = self.plugin_schema();
if let Some(m) = plugin_schema.methods.iter().find(|m| m.name == method_name) {
let result = SchemaResult::Method(m.clone());
return Ok(wrap_stream(
futures::stream::once(async move { result }),
"health.method_schema",
vec!["health".into()]
));
}
}
Err(PlexusError::MethodNotFound {
activation: "health".to_string(),
method: method.to_string(),
})
}
}
}
fn into_rpc_methods(self) -> Methods {
self.into_rpc().into()
}
}
#[async_trait]
impl ChildRouter for Health {
fn router_namespace(&self) -> &str {
"health"
}
async fn router_call(&self, method: &str, params: Value, auth: Option<&crate::plexus::AuthContext>, raw_ctx: Option<&crate::request::RawRequestContext>) -> Result<PlexusStream, PlexusError> {
self.call(method, params, auth, raw_ctx).await
}
async fn get_child(&self, _name: &str) -> Option<Box<dyn ChildRouter>> {
None
}
}