Skip to main content

plexus_substrate/activations/health/
activation.rs

1use super::methods::HealthMethod;
2use super::types::HealthEvent;
3use crate::plexus::{wrap_stream, PlexusError, PlexusStream, Activation, PlexusStreamItem, StreamMetadata, PlexusContext, MethodSchema, PluginSchema, SchemaResult};
4use async_stream::stream;
5use async_trait::async_trait;
6use futures::{Stream, StreamExt};
7use jsonrpsee::core::{server::Methods, SubscriptionResult};
8use jsonrpsee::{proc_macros::rpc, PendingSubscriptionSink};
9use serde_json::Value;
10use std::pin::Pin;
11use std::time::Instant;
12
13/// Health RPC interface
14#[rpc(server, namespace = "health")]
15pub trait HealthRpc {
16    /// Check health status (streaming subscription)
17    #[subscription(name = "check", unsubscribe = "unsubscribe_check", item = serde_json::Value)]
18    async fn check(&self) -> SubscriptionResult;
19}
20
21/// Health activation - minimal reference implementation
22///
23/// This activation demonstrates the caller-wraps architecture.
24/// The `check_stream` method returns typed domain events (HealthEvent),
25/// and the `call` method wraps them using `wrap_stream`.
26#[derive(Clone)]
27pub struct Health {
28    start_time: Instant,
29}
30
31impl Health {
32    /// Namespace for the health plugin
33    pub const NAMESPACE: &'static str = "health";
34    /// Version of the health plugin
35    pub const VERSION: &'static str = "1.0.0";
36    /// Stable plugin instance ID for handle routing (same formula as hub_methods macro)
37    /// Generated from "health@1" (namespace + major version)
38    pub const PLUGIN_ID: uuid::Uuid = uuid::uuid!("dc560257-b7c5-575b-b893-b448c87ca797");
39
40    pub fn new() -> Self {
41        Self {
42            start_time: Instant::now(),
43        }
44    }
45
46    /// Returns typed stream - caller will wrap with metadata
47    fn check_stream(
48        &self,
49    ) -> Pin<Box<dyn Stream<Item = HealthEvent> + Send + 'static>> {
50        let uptime = self.start_time.elapsed().as_secs();
51
52        Box::pin(stream! {
53            yield HealthEvent::Status {
54                status: "healthy".to_string(),
55                uptime_seconds: uptime,
56                timestamp: chrono::Utc::now().timestamp(),
57            };
58        })
59    }
60}
61
62impl Default for Health {
63    fn default() -> Self {
64        Self::new()
65    }
66}
67
68/// RPC server implementation
69#[async_trait]
70impl HealthRpcServer for Health {
71    async fn check(&self, pending: PendingSubscriptionSink) -> SubscriptionResult {
72        let sink = pending.accept().await?;
73
74        // Get wrapped stream
75        let stream = self.check_stream();
76        let wrapped = wrap_stream(stream, "health.status", vec!["health".into()]);
77
78        // Forward all items to sink
79        tokio::spawn(async move {
80            let mut stream = wrapped;
81            while let Some(item) = stream.next().await {
82                if let Ok(raw_value) = serde_json::value::to_raw_value(&item) {
83                    if sink.send(raw_value).await.is_err() {
84                        break;
85                    }
86                }
87            }
88            // Send done event
89            let done = PlexusStreamItem::Done {
90                metadata: StreamMetadata::new(vec!["health".into()], PlexusContext::hash()),
91            };
92            if let Ok(raw_value) = serde_json::value::to_raw_value(&done) {
93                let _ = sink.send(raw_value).await;
94            }
95        });
96
97        Ok(())
98    }
99}
100
101/// Activation trait implementation - unified interface for Plexus
102#[async_trait]
103impl Activation for Health {
104    type Methods = HealthMethod;
105
106    fn namespace(&self) -> &str {
107        "health"
108    }
109
110    fn version(&self) -> &str {
111        "1.0.0"
112    }
113
114    fn plugin_id(&self) -> uuid::Uuid {
115        Self::PLUGIN_ID
116    }
117
118    fn description(&self) -> &str {
119        "Check hub health and uptime"
120    }
121
122    fn methods(&self) -> Vec<&str> {
123        vec!["check", "schema"]
124    }
125
126    fn method_help(&self, method: &str) -> Option<String> {
127        match method {
128            "schema" => Some("Get plugin or method schema. Pass {\"method\": \"name\"} for a specific method.".to_string()),
129            _ => HealthMethod::description(method).map(|s| s.to_string()),
130        }
131    }
132
133    async fn call(&self, method: &str, params: Value) -> Result<PlexusStream, PlexusError> {
134        match method {
135            "check" => {
136                let stream = self.check_stream();
137                Ok(wrap_stream(stream, "health.status", vec!["health".into()]))
138            }
139            "schema" => {
140                use crate::plexus::SchemaResult;
141
142                // Check if a specific method was requested
143                let method_name: Option<String> = params.get("method")
144                    .and_then(|v| v.as_str())
145                    .map(|s| s.to_string());
146
147                let plugin_schema = self.plugin_schema();
148
149                let result = if let Some(ref name) = method_name {
150                    // Find the specific method
151                    plugin_schema.methods.iter()
152                        .find(|m| m.name == *name)
153                        .map(|m| SchemaResult::Method(m.clone()))
154                        .ok_or_else(|| PlexusError::MethodNotFound {
155                            activation: "health".to_string(),
156                            method: name.clone(),
157                        })?
158                } else {
159                    // Return full plugin schema
160                    SchemaResult::Plugin(plugin_schema)
161                };
162
163                Ok(wrap_stream(
164                    futures::stream::once(async move { result }),
165                    "health.schema",
166                    vec!["health".into()]
167                ))
168            }
169            _ => {
170                // Check for {method}.schema pattern (e.g., "check.schema")
171                // Only if the prefix is an actual local method
172                if let Some(method_name) = method.strip_suffix(".schema") {
173                    use crate::plexus::SchemaResult;
174
175                    let plugin_schema = self.plugin_schema();
176                    if let Some(m) = plugin_schema.methods.iter().find(|m| m.name == method_name) {
177                        let result = SchemaResult::Method(m.clone());
178                        return Ok(wrap_stream(
179                            futures::stream::once(async move { result }),
180                            "health.method_schema",
181                            vec!["health".into()]
182                        ));
183                    }
184                }
185
186                Err(PlexusError::MethodNotFound {
187                    activation: "health".to_string(),
188                    method: method.to_string(),
189                })
190            }
191        }
192    }
193
194    fn into_rpc_methods(self) -> Methods {
195        // Register RPC subscription methods
196        self.into_rpc().into()
197    }
198
199    fn plugin_schema(&self) -> PluginSchema {
200        use std::collections::hash_map::DefaultHasher;
201        use std::hash::{Hash, Hasher};
202
203        // check method
204        let check_desc = "Check the health status of the hub and return uptime";
205        let mut hasher = DefaultHasher::new();
206        "check".hash(&mut hasher);
207        check_desc.hash(&mut hasher);
208        let check_hash = format!("{:016x}", hasher.finish());
209
210        // schema method
211        let schema_desc = "Get plugin or method schema. Pass {\"method\": \"name\"} for a specific method.";
212        let mut hasher = DefaultHasher::new();
213        "schema".hash(&mut hasher);
214        schema_desc.hash(&mut hasher);
215        let schema_hash = format!("{:016x}", hasher.finish());
216
217        let methods = vec![
218            MethodSchema::new("check", check_desc, check_hash)
219                .with_returns(schemars::schema_for!(HealthEvent)),
220            MethodSchema::new("schema", schema_desc, schema_hash)
221                .with_returns(schemars::schema_for!(SchemaResult)),
222        ];
223
224        PluginSchema::leaf(self.namespace(), self.version(), self.description(), methods)
225    }
226}